2019独角兽企业重金招聘Python工程师标准>>>

1、 背景

当前互联网、金融、政府等行业,活动流数据几乎无处不在。对这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。活动流数据的这种处理方式对实时性要求越来越高的场景已经不在适用并且这种处理方式也增加了整个系统的复杂性,为了解决这种问题,分布式开源消息系统Kakfa已被多家不同类型的公司 作为多种类型的数据管道和消息系统使用。

Kafka是一种分布式的,基于发布/订阅的消息系统。提供消息持久化能力,支持消息分区,分布式消费,同时保证每个分区内的消息顺序传输,支持在线水平扩展、高吞吐率,同时支持离线数据处理和实时数据处理。

巨杉数据库SequoiaDB支持海量分布式数据存储,并且支持垂直分区和水平分区,利用这些特性可以将Kafka中的消息存储到SequoiaDB中方便业务系统后续数据分析、数据应用。本文主要讲解巨杉数据库SequoiaDB如何消费Kafka中的消息以及将消息存储到SequoiaDB中。

2、 产品介绍

巨杉数据库SequoiaDB是一款分布式非关系型文档数据库,可以被用来存取海量非关系型的数据,其底层主要基于分布式,高可用,高性能与动态数据类型设计,它兼顾了关系型数据库中众多的优秀设计:如索引、动态查询和更新等,同时以文档记录为基础更好地处理了动态灵活的数据类型。PostgreSQL支持标准SQL,巨杉SequoiaDB SSQL套件通过扩展 PostgreSQL功能可以使用标准SQL 语句访问 SequoiaDB 数据库,完成对SequoiaDB 数据库的各种操作。将Kafka中的消息存储到SequoiaDB后,可利用巨杉SequoiaDB SSQL对这些消息数据进行在线实时的数据分析和数据应用。

3、 环境搭建

3.1、软件配置

操作系统:windows 7

JDK:1.7.0_80 64位,下载地址为:http://www.oracle.com/technetwork/java/javase/downloads/java-archive-downloads-javase7-521261.html#jdk-7u80-oth-JPR

eclipse:4.5.2

SequoiaDB:1.12.5或以上版本

Kakfa:0.10.0.0,下载地址为:http://211.162.127.20/files/5115000001D9C0FE/www-us.apache.org/dist/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz

本项目主要实现从Kafka中消费数据并写入到SequoiaDB中来展示Kafka对接SequoiaDB的整个过程。

创建项目工程如下图:

图3-1-1

3.2、kafka启动及topic创建

在kafka启动前启动zookeeper,Kafka启动,执行脚本如下:

./kafka-server-start.sh ../config/server.properties &

Kafka创建topic,执行脚本如下:

./kafka-topics.sh --zookeeper localhost:2181 --create --topic kafkaSdb --partitions 1 --replication-factor 1

执行结果如下图:

图3-2-1

验证Kafka主题,执行脚本如下:

./kafka-topics.sh --zookeeper localhost:2181 –list

执行结果如下图:

图3-2-2

4、 代码演示

4.1、框架搭建代码展示

Kafka分布式系统分为生产者和消费者,生产者主要产生消息数据供消费者消费,消费者主要消费存储在Kafka中的消息数据。本项目主要演示向SequoiaDB中写入Kafka中的消息,故消息的生产只提供演示代码。生产者和消费者各种参数分别放在各自的配置文件中。

Ø 生产端配置文件如下:

kafka-producer.propertiesbootstrap.servers=192.168.1.35:9092retries=0linger.ms=1key.serializer=org.apache.kafka.common.serialization.StringSerializervalue.serializer=org.apache.kafka.common.serialization.StringSerializerpartitioner.class=com.sequoiadb.kafka.DefaultPartitioner

Ø 消费端配置文件如下:

kafka-consumer.propertiesbootstrap.servers=192.168.1.35:9092 enable.auto.commit=true  auto.commit.interval.ms=60000enable.auto.commit=falseauto.offset.reset=earliestsession.timeout.ms=30000key.deserializer=org.apache.kafka.common.serialization.StringDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Ø Kafka主题、SequoiaDB集合、消息分区配置文件如下:

config.json[{topicName:'kafkaSdb',sdbCLName:'kafkaSdb',partitionNum:1,topicGroupName:'kafkaSdb-consumer-group',pollTimeout:5000}]

4.2、业务实现代码展示

4.2.1、配置代码展示

本项目将Kafka的配置放在配置文件中如Kafka的主题,主题的分区数,SequoiaDB集合并用java对象进行封装,利用工具类进行获取。

配置信息java实体类如下:

package com.sequoiadb.kafka.bean;public class KafkaConsumerConfig {private String topicName;private String sdbCLName;private int partitionNum = 1;private String topicGroupName;private long pollTimeout = Long.MAX_VALUE;public String getTopicName() {return topicName;}public void setTopicName(String topicName) {this.topicName = topicName;}public String getSdbCLName() {return sdbCLName;}public void setSdbCLName(String sdbCLName) {this.sdbCLName = sdbCLName;}public int getPartitionNum() {return partitionNum;}public void setPartitionNum(int partitionNum) {this.partitionNum = partitionNum;}public String getTopicGroupName() {return topicGroupName;}public void setTopicGroupName(String topicGroupName) {this.topicGroupName = topicGroupName;}public long getPollTimeout() {return pollTimeout;}public void setPollTimeout(long pollTimeout) {this.pollTimeout = pollTimeout;}public String toString(){return "[topicName="+this.topicName+",sdbCLName="+this.sdbCLName+",partitionNum="+this.partitionNum",topicGroupName="+this.topicGroupName+",pollTimeout="+this.pollTimeout+"]";}}

配置信息获取工具类如下:

package com.sequoiadb.utils;import java.io.IOException;import java.io.InputStream;import java.util.Properties;public class PropertiesUtils {private static Properties prop = null;static{InputStream in = PropertiesUtils.class.getClassLoader().getResourceAsStream("config.properties");prop = new Properties();try {prop.load(in);} catch (IOException e) {e.printStackTrace();}}public static String getProperties(String key){return (String)prop.get(key);}public static void main(String[] argc){System.out.println(PropertiesUtils.getProperties("scm.url"));}}

4.2.2、业务逻辑代码演示

生产者业务逻辑代码展示:

package com.sequoiadb.kafka;import java.io.IOException;import java.io.InputStream;import java.util.Properties;import org.apache.commons.io.IOUtils;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.sequoiadb.utils.Configuration;public class PartitionTest {private static Logger log = LoggerFactory.getLogger(PartitionTest.class);private static String location = "kafka-producer.properties";// 配置文件位置public static void main(String[] args) {Properties props = new Properties();String json = null;try {props.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));InputStream in = Configuration.class.getClassLoader().getResourceAsStream("oracle.json");json = IOUtils.toString(in);} catch (IOException e) {e.printStackTrace();}KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for (int i = 0; i < 1000; i++) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("oracle", json);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e != null) {log.error("the producer has a error:" + e.getMessage());}}});}try {Thread.sleep(1000);producer.close();} catch (InterruptedException e1) {e1.printStackTrace();}}}

消费者业务逻辑采用一线程一主题的方式进行消息的消费,主程序入口代码如下:

package com.sequoiadb.kafka;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.sequoiadb.kafka.bean.KafkaConsumerConfig;import com.sequoiadb.utils.Configuration;import com.sequoiadb.utils.Constants;public class KafkaSdb {private static Logger log = LoggerFactory.getLogger(KafkaSdb.class);private static ExecutorService executor;public static void main(String[] args) {// 获取kafka主题配置List<KafkaConsumerConfig> topicSdbList = Configuration.getConfiguration();if (topicSdbList != null && topicSdbList.size() > 0) {executor = Executors.newFixedThreadPool(topicSdbList.size());final List<ConsumerThread> consumerList = new ArrayList<ConsumerThread>();for (int i = 0; i < topicSdbList.size(); i++) {KafkaConsumerConfig consumerConfig = topicSdbList.get(i);ConsumerThread consumer = new ConsumerThread(consumerConfig);consumerList.add(consumer);executor.submit(consumer);}Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {for (ConsumerThread consumer : consumerList) {consumer.shutdown();}executor.shutdown();try {executor.awaitTermination(5000, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace();}}});} else {log.error("主题为空,请确认主题配置是否正确!");}}}

线程类负责具体的消息的消费,并且将消息数据写入到SequoiaDB中,具体代码如下:

package com.sequoiadb.kafka;import java.io.IOException;import java.util.ArrayList;import java.util.Arrays;import java.util.Iterator;import java.util.List;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.errors.WakeupException;import org.bson.BSONObject;import org.bson.BasicBSONObject;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.sequoiadb.base.CollectionSpace;import com.sequoiadb.base.DBCollection;import com.sequoiadb.base.Sequoiadb;import com.sequoiadb.exception.BaseException;import com.sequoiadb.kafka.bean.KafkaConsumerConfig;import com.sequoiadb.utils.ConnectionPool;import com.sequoiadb.utils.Constants;import net.sf.json.JSONArray;import net.sf.json.JSONObject;public class ConsumerThread implements Runnable {private static Logger log = LoggerFactory.getLogger(ConsumerThread.class);private String location = "kafka-consumer.properties";// 配置文件位置private Sequoiadb sdb = null;private CollectionSpace cs = null;private DBCollection cl = null;private KafkaConsumer<String, String> consumer = null;// private String topicName = null;// private String clName = null;// private String topicGroupName = null;// private long pollTimeout = 1000;private KafkaConsumerConfig consumerConfig;public ConsumerThread(KafkaConsumerConfig consumerConfig) {if (null == sdb) {sdb = ConnectionPool.getInstance().getConnection();}if (sdb.isCollectionSpaceExist(Constants.CS_NAME)) {cs = sdb.getCollectionSpace(Constants.CS_NAME);} else {throw new BaseException("集合空间" + Constants.CS_NAME + "不存在!");}if (null == cs) {throw new BaseException("集合空间不能为null!");} else {this.consumerConfig = consumerConfig;this.cl = cs.getCollection(this.consumerConfig.getSdbCLName());}Properties props = new Properties();try {props.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));} catch (IOException e) {e.printStackTrace();}props.put("group.id", this.consumerConfig.getTopicGroupName());consumer = new KafkaConsumer<>(props);}@Overridepublic void run() {log.info("主题为" + this.consumerConfig.getTopicName() + "的消费者线程启动!");try {// 订阅topicconsumer.subscribe(Arrays.asList(this.consumerConfig.getTopicName()));while (true) {ConsumerRecords<String, String> records = consumer.poll(this.consumerConfig.getPollTimeout());// consumer.seekToBeginning(Arrays.asList(new// TopicPartition(this.topicName, 0)));// consumer.seek(new TopicPartition(this.topicName, 0), 0);List<BSONObject> list = new ArrayList<BSONObject>();for (ConsumerRecord<String, String> record : records) {String value = record.value();JSONObject valueJson = JSONObject.fromObject(value);if (valueJson.containsKey("data")) {JSONArray dataJsonArray = valueJson.getJSONArray("data");for (int i = 0; i < dataJsonArray.size(); i++) {BSONObject httpBson = new BasicBSONObject();JSONObject dataJson = dataJsonArray.getJSONObject(i);Iterator iter = dataJson.keys();while (iter.hasNext()) {String key = (String) iter.next();String bsonValue = dataJson.getString(key);httpBson.put(key, bsonValue);}list.add(httpBson);// clHttp.insert(httpBson);}} else {log.error("消息中不存在data节点!");}}if (list != null && list.size() > 0) {try {this.cl.bulkInsert(list, DBCollection.FLG_INSERT_CONTONDUP);log.info("主题为"+this.consumerConfig.getTopicName()+"的消息插入SDB成功,插入记录数为:"+list.size());} catch (BaseException e) {e.printStackTrace();}}consumer.commitSync();}} catch (WakeupException e) {} finally {consumer.close();}}public void shutdown(){consumer.wakeup();}}

5、 总结

从上述对接过程中,Kafka中的消息写入SequoiaDB难点是Kafka中主题分区的配置以及多线程如何消费各主题分区中的消息,并且处理消息消费失败的情况。

转载于:https://my.oschina.net/wangzhonnew/blog/1559772

【技术教程】SequoiaDB对接Kafka相关推荐

  1. kafka对接mysql_【Canal】利用canal实现mysql实时增量备份并对接kafka

    简介 canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有: kafka: https://github.com/ap ...

  2. 五丶阿东安装部署教程+青龙对接阿东及傻妞实现自动登录

    阿东安装部署教程+青龙对接阿东及傻妞实现自动登录 没有服务器的先自行购买,这里推荐腾讯云2H4G8M首年70–点击购买 青龙面板安装 傻妞安装教程--+命令 QQ交流:1014549449 ----- ...

  3. 鼠标绘图 c语言,c语言高级编程技术教程 图形显示方式与鼠标输入.doc

    c语言高级编程技术教程 图形显示方式与鼠标输入 c语言高级编程技术教程 图形显示方式和鼠标输入 图形显示方式和鼠标输入 问题的提出编写程序,使用鼠标进行如下操作:按住鼠标器的任意键并移动,十字光 标将 ...

  4. java调用easyxml接口_【技术教程】如何通过Java程序调用RTSP拉流协议视频平台EasyNVR程序接口?...

    原标题:[技术教程]如何通过Java程序调用RTSP拉流协议视频平台EasyNVR程序接口? RTSP协议视频平台EasyNVR经过多年的积累,已经是一套成熟且完善的视频平台了,用户可以通过网页直接访 ...

  5. 计算机应用技术教程的答案,计算机应用技术教程第3章办公自动化答案

    计算机应用技术教程参考答案 答案: Word部分 一.单选题 1. 在Word中,如果在英文文章中出现红色波浪下划线,表示( ). A.单词拼写错 B.要全部小写 C.语法错 D.要全部大写 2. 在 ...

  6. Spring MVC Boot Cloud 技术教程汇总

    转载自 Spring MVC & Boot & Cloud 技术教程汇总 昨天我们发布了Java成神之路上的知识汇总,今天继续. Java成神之路技术整理(长期更新) 以下是Java技 ...

  7. 好程序员技术教程分享JavaScript运动框架

    好程序员技术教程分享JavaScript运动框架,有需要的朋友可以参考下. JavaScript的运动,即让某元素的某些属性由一个值变到另一个值的过程.如让div的width属性由200px变到400 ...

  8. springboot 系列技术教程目录

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. 一.教程目录地址: springboot系列技术教程目录 二.教程内容: springboot2.X ...

  9. html5如何新建定义站点,HTML5技术教程:创建新作品_HTML5教程_创建作品_添加元素_课课家...

    我们所熟知的HTM5软件是一款非常强大功能的软件,据我了解HTM5软件的特色也是非常有优势的.我们先来介绍一下它的优势: HTML5可以提供: ①:提高可用性和改进用户的友好体验; ②:有几个新的标签 ...

最新文章

  1. 机房收费系统之uml图——初版
  2. Windows客户端C/C++编程规范“建议”——文件
  3. eScan Internet Security Suite 2006
  4. adc0808温度换算公式_adc0808模数转换电路图及程序
  5. 超强整理!PCB设计之电流与线宽的关系
  6. 计算机视觉、机器学习相关领域论文和源代码大集合--持续更新……(转载)
  7. Citrix XenServer 6.5 发布
  8. Java核心技术 卷II 高级特性 原书第9版pdf
  9. 怎样正确的理解和解决 ORA-01843:not a valid month
  10. MVC传参数给js的时候 如果是数值 变量要进行一下转换才能正确识别 例如var aaa = parseInt('@Model.ClickIndex');...
  11. 计算机机房的荷载,​计算机信息中心机房建设标准
  12. SPSS基础教程:SPSS菜单命令详解(三)
  13. 斐波那契数列各种方法求解
  14. Elastic Stack 开源的大数据解决方案
  15. Matlab 许可证文件过期
  16. 信用社pb通用记账_信用社会计记账采用的是()。A、收付实现制B、权责发生制C、借贷记账法D、单式记账法...
  17. 肝脏~卧则回血,坐立向外供血
  18. 报错信息:Avoid mutating a prop directly since the value will be overwritten
  19. 大数据开发:基于Hadoop的数据分析平台
  20. python 信用卡系统+购物商城见解

热门文章

  1. 安可与普通测评的区别_PRINCE王子TeXtreme Tour 100系列网球拍测评(文末有福利)
  2. php鼠标经过显示文本,CSS_HTML和CSS做网页实例教程:鼠标滑过文字改变,关于HTML+CSS的实例效果很多, - phpStudy...
  3. linux:uabntu日常操作
  4. anconda安装及opencv配置
  5. lazada开店流程图解,及平台类目佣金分享!
  6. TEA加密算法的C/C++实现
  7. chrome插件开发
  8. 动态生成控件的消息处理
  9. c语言查找星期几,新手做的日历表及查找日期是星期几
  10. Lockey的沙雕低错集锦(未完待续~自己提升用)