目录

一、rocketmq的使用

1、开启rocketmq

2、添加依赖

3、springboot整合disruptor

4、集成SpringCloudStreamRocketmq


一、rocketmq的使用

1、开启rocketmq

2、添加依赖

<!--        disruptor 高速队列 3.4.2--><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId></dependency><!--        cpu亲和锁 3.1.7--><dependency><groupId>net.openhft</groupId><artifactId>affinity</artifactId><version>${affinity.version}</version></dependency><!--        spring-cloud-stream-rocketmq--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-stream-binder-rocketmq</artifactId></dependency>

3、springboot整合disruptor

package com.dragonwu.disruptor;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;@Data
@ConfigurationProperties(prefix = "spring.disruptor")
public class DisruptorProperties {/*** 缓冲区的大小*/private Integer ringBufferSize = 1024 * 1024 ;/*** 是否支持多生产者*/private boolean isMultiProducer = false ;
}

异常处理:

package com.dragonwu.disruptor;import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import lombok.extern.slf4j.Slf4j;/*** DisruptorHandlerException 的异常处理*/
@Slf4j
public class DisruptorHandlerException implements ExceptionHandler {/*** <p>Strategy for handling uncaught exceptions when processing an event.</p>** <p>If the strategy wishes to terminate further processing by the {@link BatchEventProcessor}* then it should throw a {@link RuntimeException}.</p>** @param ex       the exception that propagated from the {@link EventHandler}.* @param sequence of the event which cause the exception.* @param event    being processed when the exception occurred.  This can be null.*/@Overridepublic void handleEventException(Throwable ex, long sequence, Object event) {log.info("handleEventException Exception===>{} , sequence==> {} ,event===>{}",ex.getMessage(),sequence,event);}/*** Callback to notify of an exception during {@link LifecycleAware#onStart()}** @param ex throw during the starting process.*/@Overridepublic void handleOnStartException(Throwable ex) {log.info("OnStartHandler Exception===>{} ",ex.getMessage());}/*** Callback to notify of an exception during {@link LifecycleAware#onShutdown()}** @param ex throw during the shutdown process.*/@Overridepublic void handleOnShutdownException(Throwable ex) {log.info("OnShutdownHandler Exception===>{} ",ex.getMessage());}
}

处理器:

package com.dragonwu.disruptor;import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import com.dragonwu.model.Order;/*** 在boot里面使用它发送消息*/
public class DisruptorTemplate {private static final EventTranslatorOneArg<OrderEvent, Order> TRANSLATOR = new EventTranslatorOneArg<OrderEvent, Order>() {public void translateTo(OrderEvent event, long sequence, Order input) {event.setSource(input);}};private final RingBuffer<OrderEvent> ringBuffer;public DisruptorTemplate(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}/*** 我们使用DisruptorTemplate 时,就使用它的onData方法* @param input*/public void onData(Order input) {ringBuffer.publishEvent(TRANSLATOR, input);}
}

自动配置装配:

package com.dragonwu.disruptor;import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import net.openhft.affinity.AffinityThreadFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ThreadFactory;@Configuration
@EnableConfigurationProperties(value = DisruptorProperties.class)
public class DisruptorAutoConfiguration {public DisruptorProperties disruptorProperties;public DisruptorAutoConfiguration(DisruptorProperties disruptorProperties) {this.disruptorProperties = disruptorProperties;}@Beanpublic EventFactory<OrderEvent> eventEventFactory() {EventFactory<OrderEvent> orderEventEventFactory = new EventFactory<OrderEvent>() {@Overridepublic OrderEvent newInstance() {return new OrderEvent();}};return orderEventEventFactory;}@Beanpublic ThreadFactory threadFactory() {return new AffinityThreadFactory("Match-Handler:") ;}/*** 无锁高效的等待策略** @return*/@Beanpublic WaitStrategy waitStrategy() {return new YieldingWaitStrategy();}/*** 创建一个RingBuffer* eventFactory: 事件工厂* threadFactory:   我们执行者(消费者)的线程该怎么创建* waitStrategy : 等待策略: 当我们ringBuffer 没有数据时,我们怎么等待*/@Beanpublic RingBuffer<OrderEvent> ringBuffer(EventFactory<OrderEvent> eventFactory,ThreadFactory threadFactory,WaitStrategy waitStrategy,EventHandler<OrderEvent>[] eventHandlers) {/*** 构建disruptor*/Disruptor<OrderEvent> disruptor = null;ProducerType producerType = ProducerType.SINGLE;if (disruptorProperties.isMultiProducer()) {producerType = ProducerType.MULTI;}disruptor = new Disruptor<OrderEvent>(eventFactory, disruptorProperties.getRingBufferSize(), threadFactory, producerType, waitStrategy);disruptor.setDefaultExceptionHandler(new DisruptorHandlerException());// 设置消费者---我们的每个消费者代表我们的一个交易对,有多少个交易对,我们就有多少个eventHandlers ,事件来了后,多个eventHandlers 是并发执行的disruptor.handleEventsWith(eventHandlers);RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();disruptor.start();// 开始监听final Disruptor<OrderEvent> disruptorShutdown = disruptor;// 使用优雅的停机Runtime.getRuntime().addShutdownHook(new Thread(() -> {disruptorShutdown.shutdown();}, "DisruptorShutdownThread"));return ringBuffer;}/*** 创建DisruptorTemplate** @param ringBuffer* @return*/@Beanpublic DisruptorTemplate disruptorTemplate(RingBuffer<OrderEvent> ringBuffer) {return new DisruptorTemplate(ringBuffer);}
}

订单类:

package com.dragonwu.disruptor;import lombok.Data;import java.io.Serializable;@Data
public class OrderEvent implements Serializable {/*** 时间戳*/private final long timestamp;/*** 事件携带的数据*/protected transient Object source;public OrderEvent() {this.timestamp = System.currentTimeMillis();}public OrderEvent(Object source) {this.timestamp = System.currentTimeMillis();this.source = source ;}
}

事件处理器:

package com.dragonwu.disruptor;import com.lmax.disruptor.EventHandler;
import com.dragonwu.match.MatchServiceFactory;
import com.dragonwu.match.MatchStrategy;
import com.dragonwu.model.Order;
import com.dragonwu.model.OrderBooks;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;/*** 该对象 有多个: 和Symbol的数据对应* 针对某一个OrderEventHandler ,只会同一时间有一个线程来执行它*/
@Data
@Slf4j
public class OrderEventHandler implements EventHandler<OrderEvent> {private OrderBooks orderBooks;private String symbol ;public OrderEventHandler(OrderBooks orderBooks) {this.orderBooks = orderBooks;this.symbol =  this.orderBooks.getSymbol() ;}/*** 接收到了某个消息** @param event* @param sequence* @param endOfBatch* @throws Exception*/@Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// 从ringbuffer 里面接收了某个数据Order order = (Order)event.getSource();if(!order.getSymbol().equals(symbol)){ // 我们接收到了一个不属于我们处理的数据,我们不处理return;}//        log.info("开始接收订单事件============>{}", event);MatchServiceFactory.getMatchService(MatchStrategy.LIMIT_PRICE).match(orderBooks ,order);/// 处理逻辑是啥?
//        log.info("处理完成我们的订单事件===================>{}", event);}
}

4、集成SpringCloudStreamRocketmq

配置:

stream:bindings:order_in: {destination: order_in, content-type: application/plain, group: order-group, consumer.maxAttempts: 1}trade_plate_out: {destination: trade_plate_out, content-type: application/plain}completed_orders_out: {destination: completed_orders_out, content-type: application/plain}exchange_trades_out: {destination: exchange_trades_out, content-type: application/plain}cancel_order_out: {destination: cancel_order_out, content-type: application/plain}rocketmq:binder:name-server: rocket-server:9876

1、添加数据收发接口

package com.dragonwu.rocket;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;public interface Sink {@Input("order_in")public MessageChannel messageChannel() ;
}
package com.dragonwu.rocket;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;public interface Source {/*** 盘口数据的输出* @return*/@Output("trade_plate_out")MessageChannel plateOut() ;/*** 完成订单数据的输出* @return*/@Output("completed_orders_out")MessageChannel completedOrdersOut() ;/*** 交易记录的输入* @return*/@Output("exchange_trades_out")MessageChannel exchangeTradesOut() ;/*** 取消单的输出* @return*/@Output("cancel_order_out")MessageChannel cancelOrderOut() ;
}

消息消费者:

package com.dragonwu.rocket;import com.dragonwu.disruptor.DisruptorTemplate;
import com.dragonwu.domain.EntrustOrder;
import com.dragonwu.model.Order;
import com.dragonwu.util.BeanUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class MessageConsumerListener {@Autowiredprivate DisruptorTemplate disruptorTemplate;@StreamListener("order_in")public void handleMessage(EntrustOrder entrustOrder) {Order order = null;if (entrustOrder.getStatus() == 2) { // 该单需要取消order = new Order();order.setOrderId(entrustOrder.getId().toString());order.setCancelOrder(true);} else {order = BeanUtils.entrustOrder2Order(entrustOrder);}log.info("接收到了委托单:{}", order);disruptorTemplate.onData(order);}
}

开启stream的开发:

package com.dragonwu.rocket;import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Configuration;/*** 开启我们的Stream的开发*/
@Configuration
@EnableBinding(value = {Sink.class,Source.class}) //
public class RocketStreamConfig {
}

交易微服务里的数据发送:

spring:cloud:  stream:rocketmq:binder:name-server: rocket-server:9876  #/RocketMQ Message hasn't been sentbinders:order_out: {consumer.orderly: true}bindings:order_out: {destination: order_in, content-type: application/plain}cancel_order_in: {destination: cancel_order_out, content-type: application/plain, group: order-group, consumer.maxAttempts: 1}exchange_trade_in: {destination: exchange_trades_out, content-type: application/plain, group: order-group, consumer.maxAttempts: 1}

1、收发接口

package com.dragonwu.config.rocket;import com.dragonwu.enums.MessageChannel;
import org.springframework.cloud.stream.annotation.Input;/*** 数据的接收*/
public interface Sink {/*** 交易数据的输入* @return*/@Input("exchange_trade_in")MessageChannel exchangeTradeIn() ;/*** 取消订单的输入* @return*/@Input("cancel_order_in")MessageChannel cancelOrderIn() ;
}
package com.dragonwu.config.rocket;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;public interface Source {@Output("order_out")MessageChannel outputMessage() ;
}

配置类:

package com.dragonwu.config.rocket;import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Configuration;@Configuration
@EnableBinding(value = Source.class)
public class RocketMQConfig {
}

服务消费者:

package com.dragonwu.config.rocket;import com.dragonwu.domain.ExchangeTrade;
import com.dragonwu.service.EntrustOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;import java.util.List;/**** 交易数据的监听*/
@Component
@Slf4j
public class ExchangeTradeListener {@Autowiredprivate EntrustOrderService entrustOrderService ;@Transactional@StreamListener("exchange_trade_in")public void receiveExchangeTrade(List<ExchangeTrade> exchangeTrades){if (CollectionUtils.isEmpty(exchangeTrades)){return;}for (ExchangeTrade exchangeTrade : exchangeTrades) {if (exchangeTrade != null) {//  交易完成后,去更新我们的数据库entrustOrderService.doMatch(exchangeTrade);}}}@Transactional@StreamListener("cancel_order_in")public void receiveCancelOrder(String orderId){entrustOrderService.cancleEntrustOrderToDb(orderId);}
}

微服务 撮合引擎 撮合微服务搭建 案例 4相关推荐

  1. php+撮合引擎,撮合引擎开发:数据结构设计

    交易委托账本 交易委托账本(OrderBook)是整个撮合引擎里最核心也是最复杂的数据结构,每个交易对都需要维护一份交易委托账本,账本里保存着指定交易对所有待撮合的委托单.每份账本都有两个队列,一个卖 ...

  2. 7个开源交易撮合引擎

    如果你希望按照自己的需求打造金融交易平台,那么应当选择合适的交易撮合 引擎进行二次开发而不是基于完整的交易平台实现进行修改.本文将介绍 10个采用不同语言开发的开源的撮合引擎,你可以根据自己的需要选择 ...

  3. 微服务架构:动态配置中心搭建

    版权声明:本文为博主原创文章,转载请注明出处,欢迎交流学习! 在微服务架构中,服务之间有着错综复杂的依赖关系,每个服务都有自己的依赖配置,在运行期间很多配置会根据访问流量等因素进行调整,传统的配置信息 ...

  4. SpringCloud系列二:Restful 基础架构(搭建项目环境、创建 Dept 微服务、客户端调用微服务)...

    声明:本文来源于MLDN培训视频的课堂笔记,写在这里只是为了方便查阅. 1.概念:Restful 基础架构 2.具体内容 对于 Rest 基础架构实现处理是 SpringCloud 核心所在,其基本操 ...

  5. SpringCloud 搭建项目环境、创建 Dept 微服务、客户端调用微服务

    对于 Rest 基础架构实现处理是 SpringCloud 核心所在,其基本操作形式在 SpringBoot 之中已经有了明确 的讲解,那么本次为 了清晰可见,创建一套新的微服务架构:部门微服务(De ...

  6. 合沟微服务怎么添加_微服务架构:动态配置中心搭建

    版权声明:本文为博主原创文章,转载请注明出处,欢迎交流学习! 在微服务架构中,服务之间有着错综复杂的依赖关系,每个服务都有自己的依赖配置,在运行期间很多配置会根据访问流量等因素进行调整,传统的配置信息 ...

  7. 最新微服务框架SpringCloud Alibaba介绍,搭建

    微服务和SpringCloud Alibaba详细介绍(一),手把手搭建微服务框架 PS:本博客是本人参照B站博主:JAVA阿伟如是说 的视频讲解手敲整理的笔记 跟着一起手动搭建的框架 供大家一起学习 ...

  8. 微服务学习之网关(Gateway)的搭建及使用

    微服务系列 1.Nacus 服务搭建及使用 2.Nacos 配置中心 3.Nacos 服务注册与发现之OpenFeign服务间调用 4.Spring Security & Oauth2 认证授 ...

  9. 微服务中网关的作用及搭建

    作用 (1)统一入口 ​ 为全部微服务提供唯一入口点,网关起到内部和外部隔离,保障了后台服务的安全性. (2)鉴权校验 识别每个请求的 权限,拒绝不符合要求的请求. (3)动态路由 动态的将请求 路由 ...

最新文章

  1. go gin gorm获取harbor项目,镜像,tag代码示例
  2. shell读取文件到变量、管道重定向、if和while嵌套使用、命令替换
  3. 神经网络(第五章补充)
  4. 位置子段最大子段和 hdu 1003 max sum ACM的开始
  5. 如何给一个响应式数据添加一个属性 this.$set
  6. 深度学习之RNN循环神经网络(理论+图解+Python代码部分)
  7. 镜像miracast投屏软件_miracast投屏下载
  8. 失败的过去式英文翻译_过去式用英语怎么说
  9. python自然语言学习之处理原始文本中3.1访问2554文本《罪与罚》出现问题解决
  10. HijackThis使用详解
  11. 神州信息与瀚华金控签署战略协议 共推数字普惠金融
  12. 视觉享受,兼顾人文观感和几何特征的字体「GitHub 热点速览 v.22.46」
  13. 计算机考试成绩有疑惑,计算机考研疑惑 真的好难受
  14. 调试基恩士KV-H20S定位模块记录
  15. BeautifulSoup中find和find_all的使用
  16. 服务器 'server_1' 上的 MSDTC 不可用。
  17. C语言骚操作之没有加法运算符
  18. 拖延的原因、误区、及建议
  19. C语言-密码2,输入一行电报文字,将字母变成其下一字母(如’a’变成’b’……’z’变成’a’其它字符不变)。
  20. 邻居好说话——冒泡排序

热门文章

  1. 利用麦咖啡(McAfee)打造超安全的Web站点目录
  2. 期刊分类—CSSCI、A类、B类、C类、核心期刊的区别
  3. iOS tableview左滑编辑,长按拖动排序
  4. Redis | 安装Redis和启动Redis服务
  5. 服务机器人关键技术分析
  6. win7设置计算机临时用户,为什么Win7用域账号登录以后总显示为临时配置账户? 穿墙书店...
  7. 【ML】混淆矩阵(Accuracy,Precision,Recall,F1)
  8. 黑马头条丨腾讯薪酬制度改革引争议;英特尔全国扩招女工程师;黑马100%就业真的吗......
  9. 网页版JS游戏五子棋单机版也可以改版成网络版
  10. C语言:access函数的使用