MyException - 我的异常网
当前位置:我的异常网» 互联网 » 【技术实验】报表存储Tablestore准实时同步数据到El

【技术实验】报表存储Tablestore准实时同步数据到Elasticsearch

www.MyException.Cn  网友分享于:2013-12-18  浏览:0次
【技术实验】表格存储Tablestore准实时同步数据到Elasticsearch

点击有惊喜

阿里云幸运券分享给你,用券购买或者升级阿里云相应产品会有特惠惊喜哦!把想要买的产品的幸运券都领走吧!快下手,马上就要抢光了。

 

实验背景

图书馆Q是一家大型图书馆,图书馆藏书众多,纸质图书600多万册,电子图书7000多万册,总数有八千多万册,这些图书之前都是人工检索维护的,现在需要做一个系统来存储管理这些图书信息。
需求如下:

  • 图书总量目前八千多万册,考虑到未来二十年的增长,需要系统能支持一亿的存储量。
  • 图书信息很重要,不能接受丢失发生。
  • 图书的名字和作者名字需要支持模糊搜索。
  • 每本书的属性最多有一百多个,且不固定,不同类型的图书的属性列差异较大。且未来可能会新增属性列。

根据上面这些需求特点,要完成这个管理系统,需要两类系统支持:

  • 分布式NoSQL数据库:解决两亿存储量的问题,解决属性列较多且不固定的问题,解决可靠性要求高的问题。
  • 搜索系统:解决固定列模糊搜索的需求。

如果使用阿里云产品,那么对应的产品就是:

  • Table Store:分布式NoSQL数据库。
  • Elasticsearch:搜索系统,支持模糊搜索。

在管理系统中使用上述两个系统的时候,目前需要双写,当新增一本书的时候,需要将详细书本信息写入Table Store,将书本ID和作者,书名写入Elasticsearch,并且对书名,作者建索引。查询的时候,如果是根据书本ID,则直接查询Table Store。如果是根据书名模糊查询,则先查Elasticsearch,获取到匹配的书本的ID后,再到Table Store中查询详细信息。

如果Table Store到Elasticsearch有自动同步通道,那么只需要将新书信息写入Table Store即可,不再需要写Elasticsearch。减少了一次写入操作,且不用再考虑数据一致性问题,系统架构大大简化。那么如何才能实现这个自动同步通道呢?

类似于上面的场景,有很多系统都有这样的需求:拥有PB级海量数据需要持久化存储,同时有一两个字段需要做模糊查询,比如姓名,手机号码等,目前很多解决方案需要双写分布式数据库和Elasticsearch,但这样不仅会带来开发、运维复杂度,而且还有数据不一致的问题。

针对上述问题,Table Store团队联合数据集成(CDP)和Elasticsearch团队上线了近实时的数据同步方案,用户只需要将数据写入Table Store,Table Store会负责将数据在10分钟内自动发送给Elasticsearch建索引。

实验环境

Table Store:阿里云分布式NoSQL数据库,专注于海量数据的存储服务,目前单表可支持10PB级,10万亿行以上的数据量,且数据量增大后性能仍然保持稳定。Table Store Stream功能是一种增量实时通道服务,类似于MySQL的binlog,可以通过Stream接口实时读取到最新的变化数据(Put/Update/Delete)。

数据集成 :阿里云数据管理平台,支持数据同步等众多数据功能。

Elasticsearch :阿里云Elasticsearch是刚推出的一项新服务,提供基于开源Elasticsearch及商业版X-Pack插件,致力于数据分析、数据搜索等场景服务。在开源Elasticsearch基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。

三种产品在新解决方案中的角色如下:

| 产品 | Table Store | 数据集成| Elasticsearch |

角色 数据存储 数据同步通道 查询增强

由于Table Store和Elasticsearch不是完全对等的产品,所以如果需要将数据导入Elasticsearch,那么在使用Table Store的时候有一些注意的地方:

  • Table Store主键列个数:
    目前Table Store最大支持4个主键列,而Elasticsearch只支持一个,所以Table Store的表设计时只能使用一个主键列,如果之前有多个主键列,可以将多个主键列的值转换成String,然后拼接成一个主键列。
  • Table Store数据变化类型:
    仅支持PUT(新增),UPDATE(更新)两种操作。

 

不支持DELETE操作。

  • Table Store多版本:
    仅支持单版本,不支持多版本
  • Elasticsearch:
    版本:支持阿里云和开源的5..版本。

延时:目前使用的是周期调度,每隔5分钟调度一次,再加上插件中有5分钟延迟,同步总延迟在5~10分钟。

实验步骤

开通服务

  • Table Store:
  • 登陆Table Store。首页点击开通
  • 新建实例和表,表需要开通Stream,有效时间可以选择24小时。
  • Table Store支持按预留CU和按量付费两种收费模式,如果创建表时指定读写CU都为0则表示为按量付费,后期如果没有使用则不收费。且目前每月有10GB,1000万CU的免费额度。
  • Table Store中表需要开通Stream功能。
  • 数据集成
  • 登陆DataWorks绑定AK。
  • 然后创建项目即可。
  • 注意:子账号不能创建项目,只能被主账号授权。
  • Elasticsearch
  • 登陆Elasticsearch首页,首页点击开通。购买时的VPC必须和之前购买的ECS在同一个VPC环境内部。根据数据量预估,购买相应的实例大小。

使用方式

  • Table Store:PutRow/BatchWriteRow接口写入数据
  • Elasticsearch:无须写入
  • Elasticsearch:搜索到请求结果后,拿到每个doc的_id字段值。
  • Table Store:Elasticsearch中的_id字段就是Table Store中的主键值,获取到一系列_id值后,使用Table Store的BatchGetRow可以查询到完整数据。

同步流程

  1. 整个同步流程应该包括下面两个步骤:
  1. 导出Table Store的全量数据到Elasticsearch,并且记录开始时间T1。
  2. 等全量导出结束后,再开始同步增量数据,增量数据开始同步的时间是T1。
  3. 对于全量导出,需要使用otsreader插件,配置中的Range使用INF_MIN到INF_MAX,也就是导出所有数据。
  4. 对于增量同步,需要配置起始时间和结束时间为一个变量,在调度周期配置的时候配置起始时间必须小于等于T1,否则可能会有数据丢失发生。

我们下面会以增量同步为例来介绍如何配置增量同步任务。

Table Store配置

无须配置

Elasticsearch配置

无须配置

数据集成配置

  1. 创建数据源(可选)

如果已经创建了Table Store的数据源,则可以跳过这一步。
如果不希望创建数据源,也可以在配置页面配置相应的endpoint,instanceName,AccessKeyID和AccessKeySecret。如果希望创建,则按照下面步骤操作。

登录阿里云大数据开发套件:数据源地址。
单击左侧 离线同步 > 数据源。
在数据源配置页面,选择右上角 新增数据源 ,会有一个弹出框。

按照说明填写:

  • 数据源名称:填写一个数据源标识符,比如车联网。
  • 数据源描述:填入描述符,比如:车联网GPS数据存储。
  • 数据源类型:选择 ots ,ots是Table Store曾用名。
  • OTS Endpoint:填入TableStore 实例页面的实例地址,如果Table Store的实例和目标产品(比如Elasticsearch)在同一个region,则可以填入私网地址,否则需要填入公网地址,不能填入VPC地址。
  • OTS 实例ID:填入Table Store的实例名称。
  • Access Id:填入阿里云网站的AccessKeyID。
  • Access Key:填入阿里云网站AccessKeyID对应的AccessKeySecret。
  • 点击测试连通性 ,如果成功则会在右上角提示:测试连接成功。 如果失败,点击endpoint是否配置正确,如果仍然无法解决,提工单联系数据集成。
  • 填好后的页面类似下面这样:
    Elasticsearch_
  • 单击确定,数据源创建成功,此时在数据源页面会出现一个新的数据源信息;
    Elasticsearch_
  1. 创建导出任务
    (1)单击数据集成地址,进入数据集成的页面,会出现模式选择:

Elasticsearch_
(2)单击 脚本模式 ,弹出一个 导入模板 配置。
(3)在导入模板配置里面:
来源类型:OTS Stream
目标类型:Elasticsearch
Elasticsearch_
(4)单击确认,则进入配置界面。

  1. 完善配置项

在配置界面,已经提前嵌入了OTSStreamReader和ElasticsearchWriter的模板,每一项配置后面都做了解释。

{
  "type": "job",
  "version": "1.0",
  "configuration": {
    "setting": {
      "errorLimit": {
        "record": "0"  # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
      },
      "speed": {
        "mbps": "1",  # 每次同步任务的最大流量。
        "concurrent": "1" # 每次同步任务的并发度。
      }
    },
    "reader": {
      "plugin": "otsstream",  # Reader插件的名称。
      "parameter": {
        "endpoint": "",  # TableStore中实例的endpoint。
        "accessId": "",  # 阿里云的AccessKeyID。
        "accessKey": "",  # 阿里云的AccessKeySecret。
        "instanceName": "",  # TableStore的实例名,如果使用DataSource,则需要新增配置项datasource,不再需要配置endpoint,accessId,accessKey和instanceName。
        "dataTable": "",  # TableStore中的表名。
        "statusTable": "TableStoreStreamReaderStatusTable", # 存储TableStore Stream状态的表,一般不需要修改。
        "startTimestampMillis": "", # 开始导出的时间点,由于是增量导出,需要循环启动此任务,则这里每次启动的时候的时间都不一样,这里需要设置一个变量,比如${start_time}。
        "endTimestampMillis": "",   # 结束导出的时间点。这里也需要设置一个变量,比如${end_time}。
        "date": "yyyyMMdd",  # 导出哪一天的数据,功能和startTimestampMillis、endTimestampMillis重复,这一项需要删除。
        "mode": "single_version_and_update_only", # TableStore Stream导出数据的格式,目前ElasticSearch只能接收这种格式的,这个不需要修改。如果配置模板中没有则需要增加。
        "column":[  # 需要导出TableStore中的哪些列到ElasticSearch中去,如果配置模板中没有则需要增加。
            {"name":"uid"},  
            {"name":"name"},           
            {"name":"phone"}
        ],
        "isExportSequenceInfo": false,  # single_version_and_update_only 模式下只能是false。
        "maxRetries": 30 # 最大重试次数。
      }
    },
    "writer": {
      "plugin": "elasticsearch",  # Writer插件的名称:ElasticSearchWriter,不需要修改。
      "parameter": {
        "endpoint": "",# ElasticSearch的endpoint,控制台上有。
        "accessId": "",# 如果使用了X-PACK插件,则这里需要填写username,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写username。
        "accessKey": "", # 如果使用了X-PACK插件,则这里需要填写password,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写password。  
        "index": "",  # ElasticSearch的索引名称,如果之前没有,插件会自动创建。
        "indexType": "", # ElasticSearch中相应索引下的类型名称
        "cleanup": true, # 是否在每次导入数据到ElasticSearch的时候清空原有数据,全量导入/重建索引的时候需要设置为true,同步增量的时候必须为false,这里因为是同步,则需要设置为false。
        "discovery": false, # 是否自动发现,设置为true
        "batchSize": 1000, # 每批导出的个数
        "splitter": ",",  # 如果插入数据是array,就使用指定分隔符。
        "column": [  # ElasticSearch中的列名,顺序和Reader中的Column顺序一致
          {
            "name": "uid",  # TableStore中的主键列是uid,这里也有同名uid,用type:id表示这一列是主键
            "type": "id"  # id表示这一列是主键,id不是ElasticSearch的内置类型,是ElasticSearchWriter提供的虚拟类型
          },
          {
              "name": "name", # 对应于TableStore中的属性列:name
            "type": "text"  # 文本类型,采用默认分词
          }
        ]
      }
    }
  }
}

其他配置项可以参考:ElasticsearchWriter配置项https://help.aliyun.com/knowledge_detail/61990.html

  1. 保存任务

点击上部的 保存 按钮,则弹出一个对话框,输入任务名称后按确定即可。
Elasticsearch_

  1. 设置调度资源

(1)由于目前数据集成还没办法自动访问VPC环境内的Elasticsearch,所以暂时需要用户自己购买VPC内的ECS机器作为调度资源。
(2)购买一台ECS,新购ECS所在的VPC需要和Elasticsearch的VPC是同一个。
(3)进入 DataWorks > 调度资源列表 ,单击 新增调度资源 。
(4)在弹出的对话框中输入:资源名称,选择:归属项目。
Elasticsearch_
(5)然后点击 确定 ,会提示 添加调度资源成功 。
Elasticsearch_
(6)点击 管理服务器 ,会弹出一个新的弹出框。
Elasticsearch_

  • (7)点击 增加服务器 ,会弹出一个新的弹出框。
  • 选择 专有网络 。
  • 输入刚刚购买的ECS的:ECS UUID。登陆ECS,以root身份执行命令 bash dmidecode | grep UUID 即可获取到UUID。
  • 输入刚刚购买的ECS的内网:机器IP。
  • 单击 添加 。
    Elasticsearch_

(8)如果是添加机器,需要初始化服务器,点击 调度资源列表 页面相应资源名称后面的 服务器初始化 ,会弹出初始化步骤,按这个步骤执行。
(9)执行完成后,再点击 调度资源列表 页面相应资源名称后面的 服务器管理 ,点击 刷新 ,当 服务状态 变为 正常 时,表示调度资源配置成功。
Elasticsearch_

  1. 运行任务(测试)

(1)可以先通过运行任务来测试配置是否正确和符合预期。
(2)回到任务配置页面 数据集成 > 离线同步 > 同步任务 > 数据同步 。
(3)双击刚刚创建的任务:tablestore2es,点击配置内容上部的 运行 。
(4)这时候会弹出一个参数设置框,设置之前配置的变量:
Elasticsearch_

 

点击有惊喜

文章评论

5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
程序员都该阅读的书
程序员都该阅读的书
旅行,写作,编程
旅行,写作,编程
漫画:程序员的工作
漫画:程序员的工作
程序员的鄙视链
程序员的鄙视链
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
写给自己也写给你 自己到底该何去何从
写给自己也写给你 自己到底该何去何从
“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
10个调试和排错的小建议
10个调试和排错的小建议
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
程序员必看的十大电影
程序员必看的十大电影
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
Java程序员必看电影
Java程序员必看电影
 程序员的样子
程序员的样子
2013年中国软件开发者薪资调查报告
2013年中国软件开发者薪资调查报告
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
看13位CEO、创始人和高管如何提高工作效率
看13位CEO、创始人和高管如何提高工作效率
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
中美印日四国程序员比较
中美印日四国程序员比较
当下全球最炙手可热的八位少年创业者
当下全球最炙手可热的八位少年创业者
程序猿的崛起——Growth Hacker
程序猿的崛起——Growth Hacker
每天工作4小时的程序员
每天工作4小时的程序员
不懂技术不要对懂技术的人说这很容易实现
不懂技术不要对懂技术的人说这很容易实现
老程序员的下场
老程序员的下场
我的丈夫是个程序员
我的丈夫是个程序员
一个程序员的时间管理
一个程序员的时间管理
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
2013年美国开发者薪资调查报告
2013年美国开发者薪资调查报告
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
程序员周末都喜欢做什么?
程序员周末都喜欢做什么?
Google伦敦新总部 犹如星级庄园
Google伦敦新总部 犹如星级庄园
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
如何成为一名黑客
如何成为一名黑客
鲜为人知的编程真相
鲜为人知的编程真相
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
我是如何打败拖延症的
我是如何打败拖延症的
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
那些争议最大的编程观点
那些争议最大的编程观点
如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
总结2014中国互联网十大段子
总结2014中国互联网十大段子
为什么程序员都是夜猫子
为什么程序员都是夜猫子
程序员和编码员之间的区别
程序员和编码员之间的区别
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有