本文共 7557 字,大约阅读时间需要 25 分钟。
表格存储是阿里云提供的一个分布式存储系统,可以用来存储海量结构化、半结构化的数据。多元索引(SearchIndex)可以支持基于属性列的丰富查询类型,帮你挖掘出数据的更多潜能。
多元索引会分布式地将数据打散存储在不同机器上。一般情况下,查询无需关心数据被分配到哪里;但通过指定路由,您可以有的放矢地定向搜索,在指定的一个数据分区上执行查询,而不是所有数据分区,有效提升了查询吞吐量,减少长尾对延迟的影响。本文抽丝剥茧,介绍分布式模型中路由的原理,如何使用路由来加速查询。主表和索引数据被分布式地存储到多个分区,分布在多台机器上,为了定位数据被分配到哪个分区,读写时都需要根据路由值进行计算。
海量数据的存储需求、读写延迟和低成本催生了分布式NoSQL存储。购买更大更强的服务器运行数据库作为“纵向扩展(scale up)”,变得越发困难且昂贵;而在服务器集群上进行“横向扩展(scale out)”则备受青睐。分布式系统会对数据分区,分区被分布到多台机器上,根本上解决了单台机器对数据容量的限制。每个分区的数据规模较小,查询性能更高;查询可以并发地在多个分区上执行,提升了吞吐量。
路由,是指定位数据到特定分区的方法。例如,将一张表(Table)的数据划分为3个分区(分区0~2)存储,一个简单的路由规则可以这样设计:PartitionId = PartitionKey % PartitionNum。其中,PartitionId是分区编号;PartitionKey是分区键值;PartitionNum是分区数,在这里为3。表格存储的主表数据就是按分区键(表格存储中的第一个主键列)为路由值,定位某一行到特定分区。多元索引数据默认按“所有主键列的序列化值”为路由值来确定分区,也可以由用户指定路由字段的值来分区索引数据。如果用户创建索引时指定了路由字段,会按照该路由字段写入相应分区;查询时也带上路由值,就可以在指定的一个分区执行查询,而不是所有分区,效率更高。写入一行时,主表数据默认以“分区键”为路由,写入特定分区;索引数据以“所有主键列的序列化值”为路由,写入特定分区。
表格存储中的查询分两种,一种是简单地按照主键列或主键列前缀查询,另一种是按属性列查询。
按主键列/主键列前缀查询:这个过程只涉及主表数据读取。主表数据按照分区键分区,分区内部数据按照主键列的前缀排序。如果按主键列读取(GetRow / BatchGetRow / GetRange),可以直接定位到对应数据分区;按属性列查询:这个过程涉及索引表数据读取。索引数据默认按照“所有主键列的序列化值”分区。如果按属性列查询,并不知道数据分区在哪里,只能全局范围搜索,查询性能低,资源消耗大。如果用户指定了路由字段,写入和读取时都按照路由字段来定位分区,就可以避免全局范围搜索,查询性能高,资源消耗小。本文关心的是多元索引中的路由的作用。
如果创建索引时不指定路由字段,默认按“所有主键列的序列化值”为路由,索引数据虽然会被打散到不同分区实现分布式存储,但是查询的时候必须访问所有的分区,效率是很低的;如果创建索引时由用户指定路由字段,查询时也带上路由字段值,则可以为查询带来额外收益。总结起来,路由有以下作用:别急,事情总有两面。有时候不经过精心设计,会出现严重的数据偏移——某个路由值的行数太多,单个分区的行数太大,查询也变慢,而且可能超过单机容量限制。例如,您为订单表建立多元索引,以商家id(user_id)作为路由字段,普通的商家只会产生少量订单,但有几个“大商家”每天都会产生大量订单,他们的数据所在分区就会急剧膨胀起来。面对这种状况,我们该如何应对?
此外,一旦创建索引时指定了路由,以后的查询时都必须带上正确的路由值,否则可能查不到相关数据。如果某天你不想使用路由进行查询了,只能重建索引。
多元索引的自定义路由使用非常方便,只需要两步,详见后面代码实现:
您在创建多元索引指定了路由字段后,索引数据的读写都会使用该路由字段进行定位,不能动态改变。如果想使用系统默认路由(即主键列路由)或者重新指定其他字段为路由字段,您需要重建索引。
查询时使用路由,定向搜索指定数据分区,可以减少长尾对延迟的影响。一般来说,对于自定义路由的查询请求,都要求用户提供路由字段。如果不指定,虽然查询结果是一样的,但会访问无关的数据分区,白白浪费了系统资源,增加访问延迟。
此外,使用自定义路由也有限制——路由字段只能是表格存储的主键列
由于表格存储半结构化的特性,如果路由字段是属性列,可能出现某行没有该属性列,就不知道路由到哪个数据分片了。假设在订单表上建立多元索引,以商家id(user_id)为路由,查询某商家的订单时也需要指定路由值。代码示例如下:
//1. 创建表 createtableRequest := new(tablestore.CreateTableRequest) tableMeta := new(tablestore.TableMeta) tableMeta.TableName = tableName //设置表名 tableMeta.AddPrimaryKeyColumn("order_id", tablestore.PrimaryKeyType_STRING) //第1主键列: 订单号 tableMeta.AddPrimaryKeyColumn("user_id", tablestore.PrimaryKeyType_STRING) //第2主键列: 商家id tableOption := new(tablestore.TableOption) tableOption.TimeToAlive = -1 //TTL无限 tableOption.MaxVersion = 1 //每列最多1个数据版本 reservedThroughput := new(tablestore.ReservedThroughput) reservedThroughput.Readcap = 0 reservedThroughput.Writecap = 0 createtableRequest.TableMeta = tableMeta createtableRequest.TableOption = tableOption createtableRequest.ReservedThroughput = reservedThroughput //设置预留读写吞吐量 _, err := client.CreateTable(createtableRequest) //创建主表 if err != nil { fmt.Println("Failed to create table with error:", err) return } else { fmt.Println("Create table finished") } //2. 创建多元索引,指定路由字段 request := &tablestore.CreateSearchIndexRequest{} request.TableName = tableName //设置表名 request.IndexName = indexName //设置索引名 //添加索引字段 (根据业务,您还可以添加更多字段) schemas := []*tablestore.FieldSchema{} field1 := &tablestore.FieldSchema{ FieldName: proto.String("product_name"), //字段名: 商品名 FieldType: tablestore.FieldType_KEYWORD, //字段类型: KEYWORD类型 Index: proto.Bool(true), //开启索引 } field2 := &tablestore.FieldSchema{ FieldName: proto.String("order_time"), //字段名: 下单时间 FieldType: tablestore.FieldType_LONG, //字段类型: LONG类型 Index: proto.Bool(true), //开启索引 EnableSortAndAgg: proto.Bool(true), //允许排序和聚合 } field3 := &tablestore.FieldSchema{ FieldName: proto.String("user_id"), //字段名: 商家id FieldType: tablestore.FieldType_KEYWORD, //字段类型: KEYWORD类型 Index: proto.Bool(true), //开启索引 } schemas = append(schemas, field1, field2, field3) indexSetting := &tablestore.IndexSetting{ //设置路由字段 RoutingFields: []string{"user_id"}, } request.IndexSchema = &tablestore.IndexSchema{ FieldSchemas: schemas, //设置索引字段 IndexSetting: indexSetting, //设置索引配置(包含路由字段) } _, err = client.CreateSearchIndex(request) //创建多元索引 if err != nil { fmt.Println("Failed to create index with error:", err) return } else { fmt.Println("Create index finished") } time.Sleep(time.Duration(60) * time.Second) //等待数据表加载 //3. 插入一些测试数据 productNames := []string{"product a", "product b", "product c"} userIds := []string{"00001", "00002", "00003", "00004", "00005"} for i := 0; i < 100; i++ { putRowRequest := new(tablestore.PutRowRequest) putRowChange := new(tablestore.PutRowChange) putRowChange.TableName = tableName putPk := new(tablestore.PrimaryKey) putPk.AddPrimaryKeyColumn("order_id", fmt.Sprintf("%d", i)) putPk.AddPrimaryKeyColumn("user_id", userIds[i%len(userIds)]) putRowChange.PrimaryKey = putPk putRowChange.AddColumn("product_name", productNames[i%len(productNames)]) putRowChange.AddColumn("order_time", int64(time.Now().Second())) putRowChange.SetCondition(tablestore.RowExistenceExpectation_IGNORE) putRowRequest.PutRowChange = putRowChange _, err := client.PutRow(putRowRequest) if err != nil { fmt.Println("putrow failed with error:", err) } } time.Sleep(time.Duration(30) * time.Second) //等待数据同步到多元索引(通常,稳定后延迟在1s~10s级别) //4. 查询时,带上路由字段 searchRequest := &tablestore.SearchRequest{} searchRequest.SetTableName(tableName) //设置主表名 searchRequest.SetIndexName(indexName) //设置多元索引名 query := &search.MatchQuery{} //设置查询类型为MatchQuery query.FieldName = "user_id" //设置要匹配的字段 query.Text = "00002" //设置要匹配的值: 查询user_id="00002"商家的所有订单 searchQuery := search.NewSearchQuery() searchQuery.SetQuery(query) searchQuery.SetGetTotalCount(true) //返回所有匹配到的行 searchRequest.SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAll:true}) //返回所有的列 searchRequest.SetSearchQuery(searchQuery) routingValue := new(tablestore.PrimaryKey) routingValue.AddPrimaryKeyColumn("user_id", "00002") searchRequest.AddRoutingValue(routingValue) searchResponse, err := client.Search(searchRequest) //查询 if err != nil { fmt.Println("Failed to search with error:", err) return } fmt.Println("IsAllSuccess: ", searchResponse.IsAllSuccess) //查看返回结果是否完整 fmt.Println("TotalCount: ", searchResponse.TotalCount) //返回所有匹配到的行数 fmt.Println("RowCount: ", len(searchResponse.Rows)) //返回的行数 for _, row := range searchResponse.Rows { //打印本次返回的行 jsonBody, err := json.Marshal(row) if err != nil { panic(err) } fmt.Println("Row: ", string(jsonBody)) }
转载地址:http://xihna.baihongyu.com/