文章目录

  • 1. 功能介绍
  • 2.快速开始
    • 2.1 启动客户端
    • 2.2 获取DataHub客户端
    • 2.3 写数据
    • 2.4 读数据
  • 3. 核心代码

DataHub 类似于传统大数据解决方案中 Kafka 的角色,提供了一个数据队列功能。
DataHub 除了供了一个缓冲的队列作用。同时由于 DataHub 提供了各种与其他阿里云
上下游产品的对接功能,所以 DataHub 又扮演了一个数据的分发枢纽工作。


datahub提供了开发者生产和消费的sdk,在平时的开发中往往会写很多重复的代码,我们可以利用springboot为我们提供的自定义starter的方式,模仿springboot官方的starter组件实现方式,来封装一个更高效简单易用的starter组件,实现开箱即用。

本文仅提供核心思路实现供学习使用,应根据自己所在公司开发习惯做定制开发

1. 功能介绍

1.无需关心DataHub底层如何操作,安心编写业务代码即可进行数据的获取和上传,

2.类似RabbitMQ的starter,通过注解方式,Listener和Handler方式进行队列消费

3.支持游标的上次记忆功能

<dependency><artifactId>cry-starters-projects</artifactId><groupId>cn.com.cry.starters</groupId><version>2022-1.0.0</version>
</dependency>

2.快速开始

2.1 启动客户端

配置阿里云DataHub的endpoint以及AK信息

aliyun:datahub:# 开启功能havingValue: true#是否为私有云isPrivate: falseaccessId: xxxaccessKey: xxxendpoint: xxx#连接DataHub客户端超时时间conn-timeout: 10000

启动SpringBoot,你会发现datahub客户端已经启动完毕

2.2 获取DataHub客户端

DatahubClient datahubClient=DataHubTemplate.getDataHubClient();

2.3 写数据

public int write(@RequestParam("id") Integer shardId) {List<Student> datas = new ArrayList<>();for (int i = 0; i < 10; i++) {Student s = new Student();s.setAge(i);s.setName("name-" + i);s.setAddress("address-" + i);datas.add(s);}int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);return successNumbers;
}

以上示例代码表示往 projectName为my_test, topicName为student, shardId 为N的hub里写数据,并且返回插入成功的条数

2.4 读数据

读数据开发的逻辑类似RabbitMq的starter,使用@DataHubListener和@DataHubHandler处理器注解进行使用

@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl {@DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)public void handler(Message message) {System.out.println("读取到shardId=0的消息");System.out.println(message.getData());System.out.println(message.getCreateTsime());System.out.println(message.getSize());System.out.println(message.getConfig());System.out.println(message.getMessageId());}
}

以上代码说明: 通过LATEST游标的方式,监听 project=my_test ,topicName=student,shardId=0 ,最终通过Message的包装类拿到dataHub实时写入的数据。

这边可以设置多种游标类型,例如根据最新的系统时间、最早录入的序号等

3. 核心代码

首先需要一个DataHubClient增强类,在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId,根据游标规则来读取当前的cursor进行数据的读取。

public class DataHubClientWrapper implements InitializingBean, DisposableBean {@Autowiredprivate AliyunAccountProperties properties;@Autowiredprivate ApplicationContext context;private DatahubClient datahubClient;public DataHubClientWrapper() {}/*** 执行销毁方法** @throws Exception*/@Overridepublic void destroy() throws Exception {WorkerResourceExecutor.shutdown();}@Overridepublic void afterPropertiesSet() throws Exception {/*** 创建DataHubClient*/this.datahubClient = DataHubClientFactory.create(properties);/*** 打印Banner*/BannerUtil.printBanner();/*** 赋值Template的静态对象dataHubClient*/DataHubTemplate.setDataHubClient(datahubClient);/*** 初始化Worker线程*/WorkerResourceExecutor.initWorkerResource(context);/*** 启动Worker线程*/WorkerResourceExecutor.start();}
}

写数据,构建了一个类似RedisDataTemplate的模板类,封装了write的逻辑,调用时只需要用DataHubTemplate.write调用

public class DataHubTemplate {private static DatahubClient dataHubClient;private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class);/*** 默认不开启重试机制** @param projectName* @param topicName* @param datas* @param shardId* @return*/public static int write(String projectName, String topicName, List<?> datas, Integer shardId) {return write(projectName, topicName, datas, shardId, false);}/*** 往指定的projectName以及topic和shard下面写数据** @param projectName* @param topicName* @param datas* @param shardId* @param retry* @return*/private static int write(String projectName, String topicName, List<?> datas, Integer shardId, boolean retry) {RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();List<RecordEntry> recordEntries = new ArrayList<>();for (Object o : datas) {RecordEntry entry = new RecordEntry();Map<String, Object> data = BeanUtil.beanToMap(o);TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);for (String key : data.keySet()) {tupleRecordData.setField(key, data.get(key));}entry.setRecordData(tupleRecordData);entry.setShardId(String.valueOf(shardId));recordEntries.add(entry);}PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);int failedRecordCount = result.getFailedRecordCount();if (failedRecordCount > 0 && retry) {retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);}return datas.size() - failedRecordCount;}/*** @param client* @param records* @param retryTimes* @param project* @param topic*/private static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {boolean suc = false;List<RecordEntry> failedRecords = records;while (retryTimes != 0) {logger.info("the time to send message has [{}] records failed, is starting retry", records.size());retryTimes = retryTimes - 1;PutRecordsResult result = client.putRecords(project, topic, failedRecords);int failedNum = result.getFailedRecordCount();if (failedNum > 0) {failedRecords = result.getFailedRecords();continue;}suc = true;break;}if (!suc) {logger.error("DataHub send message retry failure");}}public static DatahubClient getDataHubClient() {return dataHubClient;}public static void setDataHubClient(DatahubClient dataHubClient) {DataHubTemplate.dataHubClient = dataHubClient;}
}

读数据,需要在Spring启动时开启一个监听线程DataListenerWorkerThread,执行一个死循环不停轮询DataHub下的对应通道。

public class DataListenerWorkerThread extends Thread {private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);private volatile boolean init = false;private DatahubConfig config;private String workerKey;private int recordLimits;private int sleep;private RecordSchema recordSchema;private RecordHandler recordHandler;private CursorHandler cursorHandler;public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {this.config = new DatahubConfig(projectName, topicName, shardId);this.workerKey = projectName + "-" + topicName + "-" + shardId;this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);this.recordLimits = recordLimits;this.sleep = sleep;this.setName("DataHub-Worker");this.setDaemon(true);}@Overridepublic void run() {initRecordSchema();String cursor = cursorHandler.positioningCursor(config);for (; ; ) {try {GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);if (result.getRecordCount() <= 0) {// 无数据,sleep后读取Thread.sleep(sleep);continue;}List<Map<String, Object>> dataMap = recordHandler.convert2List(result.getRecords());logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId());// 拿到下一个游标cursor = cursorHandler.nextCursor(result);//执行方法WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor);} catch (InvalidParameterException ex) {//非法游标或游标已过期,建议重新定位后开始消费cursor = cursorHandler.resetCursor(config);logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage());} catch (DatahubClientException e) {logger.error("DataHubException:{}", e.getErrorMessage());this.interrupt();} catch (InterruptedException e) {logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId());} catch (Exception e) {this.interrupt();logger.error("receive DataHub records cry.exception:{}", e, e);}}}/*** 终止*/public void shutdown() {if (!interrupted()) {interrupt();}}/*** 初始化topic字段以及recordSchema*/private void initRecordSchema() {try {if (!init) {recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema();List<Field> fields = recordSchema.getFields();this.recordHandler = new RecordHandler(fields);init = true;}} catch (Exception e) {logger.error("initRecordSchema error:{}", e, e);}}
}

read的时候结合了注解开发,通过定义类注解DataHubListener和方法注解DataHubHandler内置属性,来动态的控制需要在哪些方法中处理监听到的数据的逻辑:

DataHubHandler

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubHandler {/*** 话题名称** @return*/String topicName();/*** shardId** @return*/int shardId();/*** 最大数据量限制** @return*/int recordLimit() default 1000;/*** 游标类型** @return*/CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST;/*** 若未监听到数据添加,休眠时间 ms** @return*/int sleep() default 10000;/*** 使用CursorType.SYSTEM_TIME的时候配置 时间偏移量** @return*/String startTime() default "";/*** 使用使用CursorType.SEQUENCE的时候配置,偏移量,必须是正整数** @return*/int sequenceOffset() default 0;
}

DataHubListener

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubListener {String projectName();
}

最后我们需要启动SpringBootStarter的EnableConfigurationProperties 功能,通过配置文件来控制default-bean的开启或者关闭。

启动类:

@Configuration
@EnableConfigurationProperties(value = {AliyunAccountProperties.class})
public class DataHubClientAutoConfiguration {/*** 初始化dataHub装饰bean** @return*/@Beanpublic DataHubClientWrapper dataHubWrapper() {return new DataHubClientWrapper();}}

属性配置类

@ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true")
@Data
public class AliyunAccountProperties implements Properties{/*** http://xxx.aliyuncs.com*/private String endpoint;/*** account*/private String accessId;/*** password*/private String accessKey;/*** private cloud || public cloud*/private boolean isPrivate;/*** unit: ms*/private Integer connTimeout = 10000;
}

最后记得要做成一个starter,在resources下新建一个META-INF文件夹,新建一个spring.factories文件,

org.springframework.boot.autoconfigure.EnableAutoConfiguration= \cry.starter.datahub.DataHubClientAutoConfiguration

大体逻辑就是这样了,你学会了吗? hhhhhhhhh~

【starters】springboot-starter整合阿里云datahub相关推荐

  1. springboot项目整合阿里云oss的内容审核

    springboot项目整合阿里云 内容审核 第一 添加依赖 <dependency><groupId>com.aliyun</groupId><artifa ...

  2. springboot + vue 整合 阿里云 视频点播 功能

    阿里云 视频播放操作 1.1.找到视频点播 1.2.进入管理控制台 1.2.开通服务 1.3.选择"按使用流量计费",开通服务 1.4.开通后,进入管理控制台 1.5.上传音 / ...

  3. springboot + Vue 整合阿里云视频点播 | Spring Boot 20

  4. SpringBoot整合阿里云OSS文件上传、下载、查看、删除

    SpringBoot整合阿里云OSS文件上传.下载.查看.删除 该项目源码地址:https://github.com/ggb2312/springboot-integration-examples ( ...

  5. SpringBoot整合——阿里云对象存储(OSS)

    SpringBoot整合--阿里云对象存储 1 OSS介绍 在开发应用的过程中,我们经常会有用户需要实名认证之后才能访问的需求. 用户认证需要上传证件图片.首页轮播也需要上传图片,因此我们要做文件服务 ...

  6. SpringBoot整合阿里云视频点播

    文章目录 SpringBoot整合阿里云视频点播 1.准备工作 2.服务端SDK的使用 2.1 导入依赖 2.2 初始化类 2.3 创建读取公共常量的工具类 2.4 获取视频播放地址 2.5 获取视频 ...

  7. SpringBoot整合阿里云短信服务详细过程(保证初学者也能实现)

    前言 网上关于实操性的文章普遍大部分都记录不全,要么只记录重点部分,对于刚学习的小伙伴来说看起来是比较困难的 所以,基于这一点. 该文章会详细介绍使用SpringBoot整合阿里云短信服务的每一步过程 ...

  8. SpringBoot整合阿里云OSS

    文章目录 SpringBoot整合阿里云OSS 1.准备工作 1.1 开通"对象存储OSS"服务 1.2 创建Bucket 1.3 创建RAM子用户 2.SpringBoot整合阿 ...

  9. springboot整合阿里云oss上传的方法示例

    这篇文章主要介绍了springboot整合阿里云oss上传的方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 OSS申请和 ...

最新文章

  1. Beam Search
  2. 【Java基础】序列化之serialVersionUID
  3. 共轭梯度法求解线性方程组
  4. mysql fio测试_Linux下 fio磁盘压测笔记
  5. C#中HTML和UBB互相转换的代码
  6. JavaScript——语法
  7. Webpack的基本配置
  8. 通过2048学习自定义view(一)
  9. 迪赛智慧数——柱状图(正负条形图):应届生各行业平均薪酬排行
  10. javascript history对象详解
  11. 基于python的手机号码归属信息查询系统
  12. [RFID]IC卡克隆(四)Proxmark3全卡克隆已解密的IC卡
  13. 《从0到1》读书摘要
  14. 洛谷1717 钓鱼
  15. 中国户外广告业大会 颠覆与革新 我们8月聚首北京
  16. python--定义新矩阵、矩阵赋值、改变类型
  17. 群晖video station 2.4.9-1626申请API后无法连接的解决办法
  18. CentOS6和CentOS7 一键更换内核,一键安装锐速
  19. 地图手绘图生成切片位置进行微调,使图片更精准地贴在地图上
  20. 无线网卡错误代码10(intel 9560)wifi不能使用

热门文章

  1. Hadoop--万恶NameNode各种起不来!!!
  2. 3 种常用校验码「奇偶校验码」「海明校验码」「循环冗余校验码」
  3. 想裸辞的N个瞬间,裸辞后又陷入焦虑!
  4. 蓝牙资讯|苹果AirPods Pro充电盒将换用USB-C接口,还有新功能在测试
  5. 【性能调优】Oracle AWR报告指标全解析
  6. 04、DDR不同类型的区别和硬盘接口分类
  7. 深入浅出计算机组成原理学习笔记:SSD硬盘(上)-如何完成性能优化的KPI?(第46讲)...
  8. Kubernetes 1.23:探索新边界
  9. HDU 4276 The Ghost Blows Light(树形DP)
  10. 给自己起了一个英文名字