Spring Cloud Stream
Spring Cloud Stream
基本概念
Source:来源(近义词:Producer、Publisher)
Sink:接收器(近义词:Consumer、Subscriber)
Processor:对于上流而言是Sink,对于下游而言是Source
Binder:kafka
消息大致分为两个部分:
消息头(Headers)
消息体(Body/Payload)
启动Zookeeper
启动kafka
Producer实现
定义发送通道
public interface Source { /** * 需求通道 */ String OUT_PUT_DEMAND = "out_put_demand"; /** * 任务通道 */ String OUT_PUT_TASK = "out_put_task"; /** * 工作日志通道 */ String OUT_PUT_WORK_LOG = "out_put_workLog"; /** * 组织结构信息通道 */ String OUT_PUT_ORG = "out_put_org"; /** * 代码质量通道 */ String OUT_PUT_QUALITY = "out_put_quality"; @Output(Source.OUT_PUT_DEMAND) MessageChannel demand(); @Output(Source.OUT_PUT_TASK) MessageChannel task(); @Output(Source.OUT_PUT_WORK_LOG) MessageChannel workLog(); @Output(Source.OUT_PUT_ORG) MessageChannel org(); @Output(Source.OUT_PUT_QUALITY) MessageChannel quality(); }
生产类
public class Producer { /** * 默认发送消息 * * @param message * @param channel * @return */ public static Boolean send(Object message, MessageChannel channel) { return send(message, channel, 5000L); } /** * 带超时时间 * * @param message * @param timeout * @param channel * @return */ public static Boolean send(Object message, MessageChannel channel, Long timeout) { return channel.send(MessageBuilder.withPayload(message).build(), timeout); } }
Binding
@EnableBinding(Source.class) public class SourceAutoConfiguration { }
策略模式-消息类型
public enum SendType { DEMAND_MESSAGE(new DemandMessage()), TASK_MESSAGE(new TaskMessage()), WORK_LOG_MESSAGE(new WorkLogMessage()), CODE_QUALITY_MESSAGE(new CodeQualityMessage()); private MessageSend messageSend; SendType(MessageSend messageSend){ this.messageSend = messageSend; } public MessageSend get(){ return this.messageSend; } }
消息发送接口
public interface MessageSend { public Boolean send(Object message); }
接口实现
public class DemandMessage implements MessageSend { private static final Source SOURCE = SpringContextHelper.getBean(Source.class); @Override public Boolean send(Object message) { message = MaskMessage.messageHelper(message); if (null == message) { return false; } return Producer.send(message, SOURCE.demand()); } }
生产消息
public class ProduceHelper { /** * 需求消息生产 * @param sendType 发送类型 * @param message 消息内容 * @return boolean */ public static Boolean produce(SendType sendType, Demand message) { return sendType.get().send(message); } /** * 任务消息生产 * @param sendType 发送类型 * @param message 消息内容 * @return boolean */ public static Boolean produce(SendType sendType, Task message) { return sendType.get().send(message); } /** * 工作日志消息生产 * @param sendType 发送类型 * @param message 消息内容 * @return boolean */ public static Boolean produce(SendType sendType, WorkLog message) { return sendType.get().send(message); } /** * 代码质量消息生产 * @param sendType 发送类型 * @param message 消息内容 * @return boolean */ public static Boolean produce(SendType sendType, CodeQuality message) { return sendType.get().send(message); } }
Comsumer实现
定义接收通道
public interface Sink { /** * 需求通道 */ String IN_PUT_DEMAND = "in_put_demand"; /** * 任务通道 */ String IN_PUT_TASK = "in_put_task"; /** * 工作日志通道 */ String IN_PUT_WORK_LOG = "in_put_workLog"; /** * 组织结构信息通道 */ String IN_PUT_ORG = "in_put_org"; /** * 代码质量通道 */ String IN_PUT_QUALITY = "in_put_quality"; @Input(Sink.IN_PUT_DEMAND) SubscribableChannel demand(); @Input(Sink.IN_PUT_TASK) SubscribableChannel task(); @Input(Sink.IN_PUT_WORK_LOG) SubscribableChannel workLog(); @Input(Sink.IN_PUT_ORG) SubscribableChannel org(); @Input(Sink.IN_PUT_QUALITY) SubscribableChannel quality(); }
消费类
public interface Consumer<T> { void onMessage(T message); }
消息监听
@StreamListener方式
@Slf4j @Component public class MessageListener { @Autowired private MessageHandler messageHandler; /** * 监听需求消息 * * @param message */ @StreamListener(Sink.IN_PUT_DEMAND) public void task(Message message) { LOGGER.info("监听到任务信息:{}", message.getPayload()); //调用demand入库 messageHandler.demandSave(message); } /** * 监听任务消息 * * @param message */ @StreamListener(Sink.IN_PUT_TASK) public void bug(Message message) { LOGGER.info("监听到缺陷信息:{}", message.getPayload()); //任务消息入库 messageHandler.taskSave(message); } /** * 监听工作日志消息 * * @param message */ @StreamListener(Sink.IN_PUT_WORK_LOG) public void workLog(Message message) { LOGGER.info("监听到工作日志信息:{}", message.getPayload()); //工作日志消息入库 messageHandler.worklogSave(message); } /** * 监听组织消息 * * @param message */ @StreamListener(Sink.IN_PUT_ORG) public void org(Message message) { LOGGER.info("监听到组织信息:{}", message.getPayload()); //组织消息入库 messageHandler.orgSave(message); } /** * 监听质量消息 * * @param message */ @StreamListener(Sink.IN_PUT_QUALITY) public void quality(Message message) { LOGGER.info("接收到质量信息:{}", message.getPayload()); //质量消息入库 messageHandler.codeQualitySave(message); } }
@ServiceActivator
@ServiceActivator(Sink.IN_PUT_DEMAND) public void onMessage(String message){ System.out.printIn("@ServiceActivator:"+message); }
@PostConstruct
@PostConstruct public void init(){ //实现异步回调 subscribableChannel.subscribe(new MessageHandler){ @Override public void handleMessage(Message<?> message) throws MessagingException{ System.out.printIn("@PostConstruct:"+message); } } }
消息处理
@Slf4j @Component public class MessageHandler { @Autowired private CodeQualityRepository codeQualityRepository; @Autowired private DemandRepository demandRepository; @Autowired private TaskRepository taskRepository; @Autowired private WorkLogRepository workLogRepository; @Autowired private CompanyRepository companyRepository; @Autowired private OrgInfoRepository orgInfoRepository; /** * 需求消息入库 */ public void demandSave(Message message) { Demand demand = JSONObject.parseObject(message.getPayload().toString(), Demand.class); LOGGER.info("demand {}",demand); MongoNameGet.setCompanyId(demand.getCompanyId()); if (null != demand.getId() && null != demand.getCompanyId()) { demand.setGrabDate(new Date()); demandRepository.save(demand); saveCompany(demand.getCompanyId(),""); LOGGER.info("线程名:{}",Thread.currentThread().getName()); LOGGER.info("数据存储完毕"); } } /** * 任务消息入库 */ public void taskSave(Message message) { Task task = JSONObject.parseObject(message.getPayload().toString(), Task.class); MongoNameGet.setCompanyId(task.getCompanyId()); if (null != task.getId() && null != task.getCompanyId() && !StringUtils.isEmpty(task.getDemandId())) { task.setGrabDate(new Date()); //查询部门id和组id 补充数据 Optional<Demand> demand = demandRepository.findById(task.getDemandId()); if(demand.isPresent()){ task.setDepartId(demand.get().getDepartId()); task.setTeamId(demand.get().getTeamId()); } taskRepository.save(task); saveCompany(task.getCompanyId(),""); LOGGER.info("数据存储完毕"); } } /**
转载于:https://www.cnblogs.com/kinglead/p/10979914.html
Spring Cloud Stream相关推荐
- Spring Cloud Stream Binder 实现
Spring Cloud Stream Binder 实现 JMS 实现 ActiveMQ 1.增加Maven依赖 <!-- 整合 Sprig Boot Starter ActiveMQ --& ...
- 【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务
←←←←←←←←←←←← 快!点关注 让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务.首先,Spring Cloud Stream首先有什么好处?因为Spring AM ...
- 一文了解Spring Cloud Stream体系
点击蓝色"程序猿DD"关注我哟 加个"星标",不忘签到哦 来源:阿里巴巴中间件 Spring Cloud Stream 在 Spring Cloud 体系内用于 ...
- Spring Cloud Stream 学习小清单
由于最近一直在写Spring Cloud Stream相关的内容,在2018年最后一天,把之前写过的Spring Cloud Stream内容从基础,到入门,到深入,做一些小清单,方便大家查阅. 如果 ...
- Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑
应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: ...
- Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)
应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍Rab ...
- Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)
应用场景 前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的 ...
- Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑
应用场景 上一篇<Spring Cloud Stream消费失败后的处理策略(一):自动重试>介绍了默认就会生效的消息重试功能.对于一些因环境原因.网络抖动等不稳定因素引发的问题可以起到 ...
- Spring Cloud Stream消费失败后的处理策略(一):自动重试
之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如: 如何处理消息重复消费? 如何消费自己生产的消息? 下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式. 不 ...
- Spring Cloud Stream如何消费自己生产的消息?
在上一篇<Spring Cloud Stream如何处理消息重复消费?>中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题.本文将继续说说在另外一个被经常问到的 ...
最新文章
- 图像读取、转为灰度图像、均值平滑、显示保存操作
- [转] 关于Jmail
- 数据结构大总结系列之B树和R树
- 分享一个剪切板的小软件CopyQ
- C#中判断字符串相等的方法
- ps html css 工具,认识Photoshop(PS)CSS切图必用工具
- 前端 JavaScript 之『节流』的简单代码实现
- USB 2.0 Spec 微缩版
- 华为网络技术大赛模拟题目
- linux下安装杰奇2.4,实现关关采集器远程采集详细教程
- (转)Notepad++删除空白行
- Greenplum 调优--数据分布法则 - 分布列与分区的选择
- 内九外七皇城四,九门八点一口钟_ywyuan_新浪博客
- python语言是干什么的-python语言可以干什么
- 把你问到哑口无言,HR是专业的!
- python编程书在线阅读_Python编程完全入门教程
- PCB设计—AD20和立创EDA设计(1)创建项目
- Sharestation 工作站GPU虚拟化,实现共享GPU办公
- tkinter:Toplevel
- matlab多边形检测_Matlab图像处理学习笔记(四):多边形检测
热门文章
- Torch7框架学习资料整理
- java 拦截器 排除_java – Spring MVC Interceptor排除HTTP方法的路径
- Yii需要php版本,yii框架2.0.9版本发布了
- spark sql练习之join操作
- 20180914 文件和目录的权限以及属性
- windows系统清理磁盘临时文件,及缓冲文件,及离线文件和空闲文件
- 英国通讯服务商与采购“不公平待遇”的较量
- ajax基本概念,方法
- 一位软件工程师的7年总结
- Script:列出Oracle每小时的redo重做日志产生量