【华为云技术分享】GeminiDB for Cassandra 流功能介绍
1 使用GeminiDB for Cassandra流捕获表活动
1.1 功能介绍
当存储在GeminiDB for Cassandra集群中某张表的某项目发生变更时,其他的程序能够做出相应的变更,比如:一个应用程序更改了GeminiDB for Cassandra集群中的某行数据,另一个应用程序能够读取到这行数据变更并做出相应的动作。
GeminiDB for Cassandra支持这种类似的场景。流表捕获原表的变动,并存储24小时之后过期。客户端通过SDK可访问流表并获取数据修改前后的项目数据。
GeminiDB for Cassandra流是一种有关表中的项目修改的有序信息流,当启动某张表的流时,GeminiDB for Cassandra流表捕获原表的数据项目的更改信息。当应用程序在表中插入、更新、或者删除某条数据时,流表都会记录一条修改数据的流记录。流记录包含对表中某条数据所做的数据修改的相关信息,包含修改前后的新旧数据记录。原表中修改的每个项目,流表中的记录将按照对应的实际修改的顺序显示。
流表会实时的监控原表的记录,包括插入、删除、更新操作,以便能够在其他情况使用,但不包含DDL操作记录;流表使用GeminiDB for Cassandra的物化视图实现,遵循物化视图的相关限制,比如有必须先删除物化视图表之后才能删除原表,对应必须要先删除流表才能删除原表。
1.2 使用场景
主要用于Cassandra往大数据/ES(Elasticsearch)同步数据变化的场景,支撑客户对应业务开展。
1.3 功能特色
华为GeminiDB for Cassandra专有能力,原生Cassandra不支持。
2 GeminiDB for Cassandra流使用方法概述
GeminiDB for Cassandra原表与流表维护两个独立的表。在开启原表的流开关之后,访问原表并且有操作原表数据会记录到对应的流表中,要读取处理流表记录,通过访问流表,访问方式与数据库其他表的访问方式相同,见第四、五章节访问方法。
3 开启关闭流
启用流的方式:使用alter table KS.TABLE with stream_enabled=true 语句启用流,当前流支持新旧映像。
关闭流:使用alter table KS.TABLE with stream_enabled=false 可以随时禁用流。关闭流之后当前流表中的数据不会立即删除,24之后删除,原表中数据的变更不会再记录到流表中。举例:
CREATE TABLE ks.table ( id int, name text, addr text,age int,email text,PRIMARY KEY (name, id)); // 创建表Alter TABLE ks.table with stream_enabled=true; // 开启流Desc ks.table; // 查看流表是否创建INSERT INTO ks.table (name , id , addr , age , email ) VALUES ('xiaoxin',31,'beijing1',33,'xiaoxin@163.com'); // 向原表写入数据select * from ks."table$streaming"; // 查看流表是否有相应数据产生Alter TABLE ks.table with stream_enabled=false; // 关闭流表
4 读取和处理流记录
应用程序要读取和处理流时,应用程序需要通过SDK连接到C*流表进行相应操作。
流表中的每条流记录均代表一个原表中数据的修改,每条流记录都会有一个时间信息,标识这条流产生的时间信息。每条流记录会在24小时后自动删除。
流表结构:
CREATE TABLE ks.table$streaming (@shardID text,@eventID timeuuid,pk,ck,@newOldImage boolean,@eventName text,co1,co2, PRIMARY KEY (@shardID, @eventID, pk, ck, @newOldImage) //pk,ck,为原表的pk,ck, co1,co2是原表的普通列);
如上,流表中包含几个特殊的字段:"@shardID"是分区键;"@eventID"是由插入时间生成的timeuuid,代表流数据产生的时间;"@newOldImage"代表新旧映像,0表示旧映像,1表示新映像;"@eventName"代表操作事件如"insert"、"update"、"delete"。
迭代处理流表的数据时请使用流表的分区加上时间戳范围访问。查询分区时使用,返回分区列表:
select stream_shards from system_schema.tables where keyspace_name='ks' and table_name='table';例如:cqlsh:ks> select stream_shards from system_schema.tables where keyspace_name='ks' and table_name='table1';stream_shards--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------['-9223372036854775808', '-6148914691236517206', '-4611686018427387905', '-3843071682022823258', '-1537228672809129303', '-2', '1537228672809129299', '4611686018427387901', '5270498306774157603', '6148914691236517202', '7686143364045646492', '7686143364045646503'](1 rows)
使用分区+时间遍历流表数据,范围查询时使用"@eventID"时间查询,每次默认返回的大小根据数据量大小决定,下次迭代使用时间继续往后迭代。例如:
select * From ks."table$streaming" where "@shardID" = '-9223372036854775808' and "@eventID" > a64a8340-e999-11e9-a7bd-cb5f001f61df limit 50;
5 接口说明
5.1 GetShardIterator
public static List<String> GetShardIterator(Cluster cluster, String keySpace, String tableName)
功能: 获取流表的shard信息
入参:
Cluster cluster:集群的cluster信息,连接数据库使用
String keySpace:要查询流数据的数据库名称
String tableName:要查询流数据的表名称
出参:
返回List<String> 一组shard集合,GetRecords接口中使用
5.2 GetRecords
public static StreamInfo GetRecords(Cluster cluster, String keySpace, TableEvent tableEvent)
功能: 获取流表的具体数据
入参:
Cluster cluster:集群的cluster信息,连接数据库使用
String keySpace:要查询流数据的数据库名称
TableEvent tableEvent:要查询流数据相关信息,TableEvent结构如下:
String table:要查询流数据的表名称
String shardID:要查询流数据的shardID
String eventID:要查询流数据的时间戳信息
int limitRow:要查询流数据的限制条数,没有指定情况下默认是100;
出参:
返回一组StreamInfo数据;具体结构如下:
String shardID:流数据的shardID
String table:流数据的原表名
List<RowInfo> columns:一组流数据的集合,RowInfo具体结构如下:
String eventID:流数据的时间戳信息
String operateType:操作类型,例如: INSERT、UPDATE、DELETE
List<DataItem> Keys: 流数据对应的原表的主键信息
List<DataItem> NewImage: 新映像的信息
List<DataItem> OldImage: 旧映像的信息
5.3 GetShardIterator
public static List<String> GetShardIterator(Session session, String keySpace, String tableName)
功能: 获取流表的shard信息
入参:
Session session:数据库集群的连接session,调用函数之后session需要调用者关闭
String keySpace:要查询流数据的数据库名称
String tableName:要查询流数据的表名称
出参:
返回List<String> 一组shard集合,GetRecords接口中使用
5.4 GetRecords
public static StreamInfo GetRecords(Session session, String keySpace, TableEvent tableEvent)
功能: 获取流表的具体数据
入参:
Session session:数据库集群的连接session,调用函数之后session需要调用者关闭;
String keySpace:要查询流数据的数据库名称;
TableEvent tableEvent:要查询流数据相关信息,TableEvent结构如下:
String table:要查询流数据的表名称;
String shardID:要查询流数据的shardID;
String eventID:要查询流数据的时间戳信息;
int limitRow:要查询流数据的限制条数,没有指定情况下默认是100;
List<ColumnMetadata> primaryKey: 要查询流数据的表的主键名称类型信息
出参:
返回一组StreamInfo数据;具体结构如下:
String shardID:流数据的shardID
String table:流数据的原表名
List<RowInfo> columns:一组流数据的集合,RowInfo具体结构如下:
String eventID:流数据的时间戳信息
String operateType:操作类型,例如: INSERT、UPDATE、DELETE
List<DataItem> Keys: 流数据对应的原表的主键信息
List<DataItem> NewImage: 新映像的信息
List<DataItem> OldImage: 旧映像的信息
5.5 GetRecords返回结果范例
{"ShardID": "-4611686018427387905","Table": "tb1","Records": [{"EventID": "52236080-efb5-11e9-9c62-49626763b3dc","OperateType": "INSERT","Keys": [{"columnName": "name","value": "zhoujielun","type": "varchar"}, {"columnName": "id","value": 31,"type": "int"}],"NewImage": [{"columnName": "name","value": "zhoujielun","type": "varchar"}, {"columnName": "id","value": 31,"type": "int"}, {"columnName": "addr","value": "宇宙中心","type": "varchar"}, {"columnName": "age","value": 33,"type": "int"}, {"columnName": "email","value": "zhoujielun.com","type": "varchar"}],"OldImage": []}, {"EventID": "52255c50-efb5-11e9-9c62-49626763b3dc","OperateType": "UPDATE","Keys": [{"columnName": "name","value": "zhoujielun","type": "varchar"}, {"columnName": "id","value": 32,"type": "int"}],"NewImage": [{"columnName": "name","value": "zhoujielun","type": "varchar"}, {"columnName": "id","value": 32,"type": "int"}, {"columnName": "addr","value": "宇宙中心","type": "varchar"}, {"columnName": "age","value": 33,"type": "int"}, {"columnName": "email","value": "zhoujielun.com","type": "varchar"}],"OldImage": [{"columnName": "name","value": "zhoujielun","type": "varchar"}, {"columnName": "id","value": 32,"type": "int"}, {"columnName": "addr","value": "宇宙中心","type": "varchar"}, {"columnName": "age","value": 33,"type": "int"}, {"columnName": "email","value": "zhoujielun.com","type": "varchar"}]}, {"EventID": "52261fa0-efb5-11e9-9c62-49626763b3dc","OperateType": "UPDATE","Keys": [{"columnName": "name","value": "zhoujielun","type": "varchar"}, {"columnName": "id","value": 33,"type": "int"}],"NewImage": [{"columnName": "name","value": "zhoujielun","type": "varchar"}, {"columnName": "id","value": 33,"type": "int"}, {"columnName": "addr","value": "宇宙中心","type": "varchar"}, {"columnName": "age","value": 33,"type": "int"}, {"columnName": "email","value": "zhoujielun.com","type": "varchar"}],"OldImage": [{"columnName": "name","value": "zhoujielun","type": "varchar"}, {"columnName": "id","value": 33,"type": "int"}, {"columnName": "addr","value": "宇宙中心","type": "varchar"}, {"columnName": "age","value": 33,"type": "int"}, {"columnName": "email","value": "zhoujielun.com","type": "varchar"}]}]}
5.6 接口使用demo1
package com.huawei.hwcloud.stream;import com.datastax.driver.core.Cluster;import com.datastax.driver.core.ColumnMetadata;import com.google.gson.Gson;import com.google.gson.GsonBuilder;import com.huawei.hwcloud.stream.req.RowInfo;import com.huawei.hwcloud.stream.req.StreamInfo;import com.huawei.hwcloud.stream.req.TableEvent;import java.util.List;public class Main {public static void main(String[] args) {Cluster cluster = Cluster.builder().addContactPoint("xxx.95.xxx.201").withPort(9042).build();// Cluster cluster = Cluster.builder().addContactPoint(endpoint).withPort(port).withCredentials(username, password).build();List<ColumnMetadata> pm = cluster.getMetadata().getKeyspace("test").getTable("tb1").getPrimaryKey();System.out.println(pm);List streamShards = null;try {streamShards = StreamFetcher.GetShardIterator(cluster, "test", "tb1");}catch (Exception e) {e.printStackTrace();}System.out.println(streamShards);TableEvent tableEvent = new TableEvent();tableEvent.setEventID("43e0eeb0-ee80-11e9-9c62-49626763b3dc");tableEvent.setShardID("-4611686018427387905");tableEvent.setTable("tb1");tableEvent.setLimitRow(6);StreamInfo streamInfo = null;try {streamInfo = StreamFetcher.GetRecords(cluster, "test", tableEvent);}catch (Exception e) {e.printStackTrace();}Gson gson = new GsonBuilder().create();String line = gson.toJson(streamInfo);System.out.println(line);System.out.println(streamInfo.getColumns().size());for (RowInfo rowInfo: streamInfo.getColumns()) {System.out.println(rowInfo.toString());}System.exit(0);}}
5.7 接口使用demo2
package com.huawei.hwcloud.stream;import com.datastax.driver.core.Cluster;import com.datastax.driver.core.ColumnMetadata;import com.datastax.driver.core.Session;import com.google.gson.Gson;import com.google.gson.GsonBuilder;import com.huawei.hwcloud.stream.req.RowInfo;import com.huawei.hwcloud.stream.req.StreamInfo;import com.huawei.hwcloud.stream.req.TableEvent;import com.huawei.hwcloud.stream.utils.WrapperCassandraSession;import java.util.List;public class Main2{public static void main(String[] args) {Cluster cluster = Cluster.builder().addContactPoint("XXX.95.XXX.201").withPort(9042).build();// Cluster cluster = Cluster.builder().addContactPoint(endpoint).withPort(port).withCredentials(username, password).build();List<ColumnMetadata> pk = cluster.getMetadata().getKeyspace("test").getTable("tb1").getPrimaryKey();System.out.println(pk);Session session = cluster.connect();List<String> streamShards = StreamFetcher.GetShardIterator(session, "test", "tb1");System.out.println(streamShards);TableEvent tableEvent = new TableEvent();tableEvent.setEventID("43e0eeb0-ee80-11e9-9c62-49626763b3dc");tableEvent.setShardID("-4611686018427387905");tableEvent.setTable("tb1");tableEvent.setLimitRow(6);tableEvent.setPrimaryKey(pk);StreamInfo streamInfo = StreamFetcher.GetRecords(session, "test", tableEvent);Gson gson = new GsonBuilder().create();String line = gson.toJson(streamInfo);System.out.println(line);System.out.println(streamInfo.getColumns().size());for (RowInfo rowInfo: streamInfo.getColumns()) {System.out.println(rowInfo.toString());}session.close();System.exit(0);}}
6 功能约束
1)流表中的数据保留24小时。
2)流表中的数据会占用数据库的磁盘空间。
3)通过CQL语句不能创建带有"$streaming"后缀的流表。
4)流表可以通过drop MATERIALIZED VIEW ks."table$streaming";进行删除,流表使用物化视图实现,遵从物化视图的限制要求。
GeminiDB for Cassandra流功能介绍.docx
【华为云技术分享】GeminiDB for Cassandra 流功能介绍相关推荐
- 【华为云技术分享】三大前端技术(React,Vue,Angular)探密(下)
[华为云技术分享]三大前端技术(React,Vue,Angular)探密(上) [Angular] Angular(通常被称为 "Angular 2+"或 "Angula ...
- 【华为云技术分享】“技术-经济范式”视角下的开源软件演进剖析-part 1
前言 以互联网为代表的信息技术的迅猛发展对整个经济体系产生了巨大的影响.信息技术的发展一方面使知识的积累和传播更加迅速,知识爆炸性的增长:另一方面,使信息的获取变得越来越容易,信息交流的强度逐渐增加, ...
- 【华为云技术分享】“技术-经济范式”视角下的开源软件演进剖析-part 3
4. 微观层面 4.1 个体动机 在开源软件发展之初, 商业组织的投入很少甚至没有, 完全是靠Richard Stallman 或者 linus Torvalds 这样的个人在努力推动开源软件艰难前行 ...
- 【华为云技术分享】直播回顾丨激发数据裂变新动能,HDC.Cloud云数据库前沿技术解读
3月24日14:00-17:00,HDC.Cloud开发者沙龙系列云数据库专场直播线上开启,此次华为云数据库通过三场直播从NoSQL数据库新技术.数据库迁移.行业解决方案等方面对云端数据库进行深度解读 ...
- 【华为云技术分享】解析:物联网数据分析服务如何做?
[摘要] 物联网设备正在产生大量的数据,如何为开发者提供简单有效的数据分析服务,简化开发过程,提升开发效率,让IoT数据快速变现是一个摆在我们面前的问题. 没有疑问,我们已经身处物联网时代了,每天都有 ...
- 【华为云技术分享】AI 开发路漫漫,什么才是真正的极客精神?
摘要:AI开发看上去很美,实践起来却不是一件容易的事.一个聪明的开发者知道借助工具提升开发效率,一个智能的平台则会站在开发者的立场,为用户提供贴心服务. "理想很丰满,现实很骨感." ...
- 【华为云技术分享】在 K8S 大规模场景下 Service 性能如何优化?
摘要:Kubernetes 原生的 Service 负载均衡基于 Iptables 实现,其规则链会随 Service 的数量呈线性增长,在大规模场景下对 Service 性能影响严重.本文分享了华为 ...
- 【华为云技术分享】前端工程师必备:从浏览器的渲染到性能优化
摘要:本文主要讲谈及浏览器的渲染原理.流程以及相关的性能问题. 问题前瞻 1. 为什么css需要放在头部?2. js为什么要放在body后面?3. 图片的加载和渲染会阻塞页面DOM构建吗?4. dom ...
- 【华为云技术分享】上亿条数据,如何查询分析简单又高效?
正值618大促,小张遇到了一个棘手的问题,需要在一周内将公司近1年电商部门的营收和线下门店经营数据进行联合分析. 这将产生哪些数据难题呢? 数据孤岛:电商部门的数据存在数仓A.门店经营收入数据存在数仓 ...
最新文章
- mysql中change用法,mysql 中alter的用法以及一些步骤
- Linux的基本指令--服务器
- Lucene.Net(转)
- gpio_request 原形代码
- 如何用C#在Excel中生成图表?
- vscode tab键快捷生成元素html标签
- Java加密与解密的艺术~SM4实现
- python中if控制语句_Python中流程控制语句之IF语句
- java jdbc代码_javajdbc代码解决
- Voice Lab 7- AAR-SRST-Media Resource
- iPhone 12 Pro/Pro Max最新渲染图曝光
- 基于JAVA+SpringMVC+Mybatis+MYSQL的企业计划管理系统
- 第三章 文件过滤及内容编辑处理命令
- Linux 基本操作命令
- 超赞 ! 老外的一种避免递归查询所有子部门的树数据表设计与实现!
- node.js 爬取腾讯地图API全国行政区数据
- ado连接mysql方式_用ADO 连接mysql数据库的方法
- 计算机用户授权原则,涉密信息系统严格设定用户权限,按照什么密级防护和什么授权管理的原则...
- 如何关闭 Linux 中的嘟嘟声
- 【C语言复习】C语言中的文件操作
热门文章
- 微型计算机2020年5月上,2020年小进初微机派位细则出炉,意向民办最多可参加3次派位!...
- c语言用于提示的指令,C语言指令、符号表.doc
- java docx 内存溢出_第2章 Java内存区域与内存溢出异常
- win定时关机_如何让电脑定时自动关机
- 手机端html返回顶部,vue实现移动端返回顶部
- golang xorm框架对应pg数据库字段类型参照表
- service nginx start启动nginx出现Failed to start nginx.service:unit not found
- [bzoj] 2049 洞穴勘探 || LCT
- 算法:位运算加减乘除
- Esfog_UnityShader教程_NormalMap法线贴图