RocketMQ 发送消息的基本案例
RocketMQ 消息发送的基本样例
“一发、一存、一消费” 是消息中间件的本质,本文简单的记录来RocketMQ消息的基本样例,包含消息的发送(同步消息/异步消息/单向消息)、消息消费(负载均衡模式/广播模式)、顺序消息、延时消息、批量消息以及事务消息。
首先在测试项目中引入依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.0</version>
</dependency>
一、基本样例
1、消息的发送
消息发送可以拆分为如下几个基本步骤:1. 创建消息生产者producer,并制定生产者组名2. 指定nameserver地址3. 启动producter4. 创建消息对象,指定topic、tag和消息体5. 发送消息6. 关闭生产者
a)发送同步消息
在重要的通知消息、SMS通知,SMS营销系统等广泛的场景中使用可靠的同步传输。
public class BaseProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者的组名DefaultMQProducer producer = new DefaultMQProducer("base-group");//2.指定NameServer地址producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//3.启动生产者producer.start();//每隔一秒发送1条消息,循环发送100条for (int i = 0; i < 100; i++) {String msgBody = "This is a msg to MQ" + i;System.out.println("发送消息:"+msgBody);//4.创建消息对象 指定主题、标签、消息体Message message = new Message("base-topic", "base-tag", msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));//5.发送顺序消息SendResult result = producer.send(message);//获取发送结果System.err.println("发送响应:MsgId:【" + result.getMsgId() + "】,发送状态:【" + result.getSendStatus()+"】");//线程休眠一秒再去发送下一条消息TimeUnit.SECONDS.sleep(1);}//6.关闭生产者producer.shutdown();}
}
启动执行main方法,控制台得到如下打印输出:
b)发送异步消息
异步传输通常用于对时间敏感的业务场景中,如果发送端不能容忍长时间的等待broker的响应的情况下可以选择发送异步消息。
与发送同步消息基本上一致,只有在第五步消息发送环节,通过提供的public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException 方法来发送异步消息。在发送时创建一个SendCallback接口的匿名内部实现类,实现消息发送成功的回调方法onSuccess(SendResult sendResult),以及消息发送失败出现异常时的onException(Throwable throwable)方法。
public class AsyncProducer1 {public static void main(String[] args) throws Exception{//1.创建消息生产者producer,并制定生产者的组名DefaultMQProducer producer = new DefaultMQProducer("async-group");//2.指定NameServer地址producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//3.启动生产者producer.start();//循环发送100条消息for (int i = 0; i < 100; i++) {String msgBody = "This is an async msg to MQ" + i;System.out.println("发送消息:"+msgBody);//4.创建消息对象 指定主题、标签、消息体Message message = new Message("async-topic", "async-tag", msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));//5.发送异步消息producer.send(message, new SendCallback() {/*** 发送成功的回调函数* @param sendResult*/@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送结果:"+sendResult);}/*** 发送异常的回调函数* @param throwable*/@Overridepublic void onException(Throwable throwable) {System.out.println("发送异常:"+throwable);}});}//6.关闭生产者producer.shutdown();}
}
启动执行main方法,控制台得到如下打印输出:
c)单向发送消息
单向消息一般用于不是特别关心发送结果的场景,例如发送日志。
发送单向消息,只是在基本单顺序消息发送案例的基础上使用public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException 方法对消息进行发送。
public class OnewayProducer1 {public static void main(String[] args) throws Exception{//1.创建消息生产者producer,并制定生产者的组名DefaultMQProducer producer = new DefaultMQProducer("one-way-group");//2.指定NameServer地址producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//3.启动生产者producer.start();//循环发送10条消息for (int i = 0; i < 10; i++) {String msgBody = "This is an one way msg to MQ" + i;System.out.println("发送消息:"+msgBody);//4.创建消息对象 指定主题、标签、消息体Message message = new Message("one-way-topic", "one-way-tag", msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));//5.发送单向消息producer.sendOneway(message);}//6.关闭生产者producer.shutdown();}
}
2、消费消息
消费消息可以分为如下几个步骤:1. 创建消费者consumer,制定消费者组名2. 指定nameserver地址3. 订阅主题topic和tag4. 设置回调函数,处理消息5. 启动消费者consumer
消费者DefaultMQPushConsumer提供了public void setMessageModel(MessageModel messageModel)方法来设置消费的模式。
MessageModel.BROADCASTING 广播模式
MessageModel.CLUSTERING 负载均衡模式(默认)
a)负载均衡模式(默认)
负载均衡模式指的是:一个消费者组内所有对消费者,共同承担消息的消费,每一个消费者消费的消息都是不同的。
public class BaseConsumer {public static void main(String[] args) throws Exception{//1.创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("base-group");//2.指定nameserver地址consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//3.订阅主题topic和tagconsumer.subscribe("base-topic", "*");//4.注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {System.err.println("消费消息: " + new String(messageExt.getBody()));}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功}});//5.启动消费者consumer.start();}
}
c)广播模式
广播模式是指组内的所有消费者都会消费全量的消息,在在消费者启动前,设置消费模式为MessageModel.BROADCASTING即可。
public class BaseConsumer {public static void main(String[] args) throws Exception{//1.创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("base-group");//2.指定nameserver地址consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//3.订阅主题topic和tagconsumer.subscribe("base-topic", "*");//设定消费模式 【负载均衡模式(默认)/广播模式MessageModel.BROADCASTING】consumer.setMessageModel(MessageModel.BROADCASTING);//4.注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {System.err.println("消费消息: " + new String(messageExt.getBody()));}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功}});//5.启动消费者consumer.start();}
}
二、顺序消息
消息中间件在数据结构上是一种先进先出的数据结构,消息的生产者将消息发送到broker中,存储在队列中,但实际上在一个broker中队列的数量往往大于一个,例如下图即为控制台中展示的broker中拥有四个队列。
可以得出,当有消息发送到broker中到时候,在默认的情况下消息会采取Round Robin轮询方式把消息发送到不同的queue(分区队列)。而消费者来消费消息时,是从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的,如何保证消息当顺序消费,是用户在使用时需要结合业务场景来考虑的问题之一。
果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上拉去消息,就保证了顺序。- 将某些需要保持顺序的局部消息放到broker的同一个队列中。- 用同一个线程保证其处理broker中的一个队列的消息。
订单模型类
public class OrderStep {private Long orderId;private String desc;public OrderStep(Long orderId, String desc) {this.orderId = orderId;this.desc = desc;}public OrderStep() {}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}public static List<OrderStep> buildOrders(){List<OrderStep> orderlist = new ArrayList<>();Long[] ids = {1001l,1002l,1003l,1004l};for(Long id : ids){OrderStep orderStep = new OrderStep(id,"创建");orderlist.add(orderStep);OrderStep orderStep = new OrderStep(id,"付款");orderlist.add(orderStep);OrderStep orderStep = new OrderStep(id,"发货");orderlist.add(orderStep);OrderStep orderStep = new OrderStep(id,"关闭");orderlist.add(orderStep);}return orderlist;}
}
顺序消息的生产者
消息生产者通过MessageQueueSelector 分区队列选择器,通过订单ID以及自定义的规则,对消息进行分区队列对选择。
public class OrderProducer {public static void main(String[] args) throws Exception{//创建消息生产者producer,并制定生产者的组名DefaultMQProducer producer = new DefaultMQProducer("order-group");//指定NameServer地址producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//启动生产者producer.start();//构建消息集合List<OrderStep> orderSteps = OrderStep.buildOrders();//发送消息int i = 0 ;for(OrderStep orderStep : orderSteps){String body = JSON.toJSONString(orderStep);Message msg = new Message("OrderTopic","order",""+(i++),body.getBytes());/*** 参数一:消息对象* 参数二:消息队列的选择器* 参数三:选择队列的业务表识(订单ID)*/SendResult sendResult = producer.send(msg, new MessageQueueSelector() {/**** @param list 队列集合* @param message 消息对象* @param o 业务标示的参数* @return*/@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {Long orderId = (long) o;long index = orderId % list.size(); //订单ID除分区队列总数取模 System.out.println("队列总数【"+list.size()+"】,当前订单ID【"+orderId+"】选择的队列下标【"+index+"】");return list.get((int) index);}}, orderStep.getOrderId());System.out.println("发送结果:" + sendResult);}producer.shutdown();}
}
顺序消息的消费者
顺序消息的消费者 通过MessageListenerOrderly ,让一个队列的消息用一个线程去处理。
public class OrderConsumer {public static void main(String[] args) throws Exception{//1.创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-group");//2.指定nameserver地址consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//3.订阅主题topic和tagconsumer.subscribe("OrderTopic", "*");//4.注册消息监听器 MessageListenerOrderly 一个队列的消息用一个线程去处理consumer.registerMessageListener(new MessageListenerOrderly() {/**** @param msgs 获取到的消息* @param consumeOrderlyContext* @return*/@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext consumeOrderlyContext) {for(MessageExt msg : msgs){String str = new String(msg.getBody());System.out.println("线程名称【"+Thread.currentThread().getName()+"】消费消息:" + str);}return ConsumeOrderlyStatus.SUCCESS;}});//5.启动消费者consumer.start();}
}
三、延时消息
当发送当消息不需要立即消费,需要设置一定当延迟时间时,可以悬着发送延迟消息,该类型当消息并不会立即被消费,会延迟一段设定好的时间后才会被消费者消费。
RocketMq并不支持任意时间的延迟,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”
通过Message类中提供的 public void setDelayTimeLevel(int level)方法来设置延迟的级别。
延迟消息的生产者:
public class DelayProducer {public static void main(String[] args) throws Exception{//创建消息生产者producer,并制定生产者的组名DefaultMQProducer producer = new DefaultMQProducer("order-group");//指定NameServer地址producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//启动生产者producer.start();//构建消息集合List<OrderStep> orderSteps = OrderStep.buildOrders();//发送消息int i = 0 ;for(OrderStep orderStep : orderSteps){String body = JSON.toJSONString(orderStep);Message msg = new Message("DelayTopic","d1",""+(i++),body.getBytes());msg.setDelayTimeLevel(2); //设置延迟时间级别/*** 参数一:消息对象* 参数二:消息队列的选择器* 参数三:选择队列的业务表识(订单ID)*/SendResult sendResult = producer.send(msg);System.out.println("发送结果:" + sendResult);}producer.shutdown();}
}
四、批量消息
当消息数量很多当时候,可以通过批量发送当形式来提高消息的发送效率。批量发送消息能够显著提高传递小消息的性能。
限制:- 这批消息应该拥有相同的topic,相同分waitStoremsgOK,而且不能是延迟消息。- 批消息的总大小不应该超过4MB
批量消息的生产者:
public class BatchProducer {public static void main(String[] args) throws Exception{//1.创建消息生产者producer,并制定生产者的组名DefaultMQProducer producer = new DefaultMQProducer("batch-group");//2.指定NameServer地址producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//3.启动生产者producer.start();//4.构建一个消息集合List<Message> msgs = new ArrayList<>();for(int i = 1 ; i<=10 ; i++){Message message = new Message("batch-topic", "batch-tag", ("Hello World" + i ).getBytes());msgs.add(message);}//5.发送批量消息SendResult send = producer.send(msgs);System.out.println("批量发送结果:" + send);//6.关闭生产者producer.shutdown();}
}
五、事务消息
RocketMQ提供事务消息的发送,主要是为了解决本地事务执行与消息发送的原子性问题。即解决Producer执行业务逻辑成功之后投递消息可能失败的场景。
事务消息的状态
1.提交状态 :允许消费者消费此消息
2.回归状态 :消息将被删除,不允许被消费
3.中间状态 :代表需要检查消息队列来确定状态
事务消息发送及提交
1.发送消息(half消息)
2.服务端响应写入结果
3.根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
4.根据本地事务状态执行commit或者rollback (commit操作生成消息索引,消息对消费者可见)
事务补偿
1.对没有commit/rolback对事务消息(pending状态对消息)从服务端发起一次回查
2.Producer收到回查消息,检查回查消息对应对的本地事务的状态
3.根据本地事务状态,重新commit汇总rollback,其中,补偿阶段用于解决消息commiy或者rollback发生超时或者失败的情况。
使用的限制
1.事务消息不支持延迟消息和批量消息
2.未来避免单个消息被检查太多次而导致半队列消息积累,可以设置默认单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactionCheckMax参数来修改此限制。如果已检查某条消息超过N次的话(N=transactionCheckMax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。
3.事务消息将在broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数。
4.事务性消息可能不止一次被检查或消费。
5.提交给用户的目标主题消费可能会失败,目前这依据日志的记录而定,它的高可用性机制来保证,如果希望确保事务消费不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6.事务消息的生产者ID不能与其他类型消费的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。
事务消息的生产者
public class Producer {public static void main(String[] args) throws Exception{//1.创建事务消息生产者producer,并制定生产者的组名TransactionMQProducer producer = new TransactionMQProducer("transaction-group");//2.指定NameServer地址producer.setNamesrvAddr("39.108.128.240:9876");//3.设置事务监听器producer.setTransactionListener(new MyTransactionListener());//4.启动生产者producer.start();//每隔一秒,循环发送5条消息for (int i = 0; i < 4; i++) {String body = "事务消息【"+i+"】";//创建消息对象 指定主题、标签、消息体 同步和异步是一致到Message message = new Message("transaction-topic", "T-"+i, body.getBytes());//同步发送消息SendResult sendResult = producer.sendMessageInTransaction(message,null);//打印消息发送的结果System.out.println("发送结果:"+sendResult);//线程休眠一秒再去发送下一条消息TimeUnit.SECONDS.sleep(1);}//关闭生产者 不建议在事务消息中关闭,可能回导致事务监听器不执行//producer.shutdown();}
}
自定义的事务监听器
public class MyTransactionListener implements TransactionListener {//在该方法中执行本地的一些事务@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {if(message.getTags().equals("T-1")){System.out.println("tag【"+message.getTags()+"】-- 事务提交");return LocalTransactionState.COMMIT_MESSAGE;}else if(message.getTags().equals("T-2")){System.out.println("tag【"+message.getTags()+"】-- 事务回滚");return LocalTransactionState.ROLLBACK_MESSAGE;}else {System.out.println("tag【"+message.getTags()+"】-- 事务结果未知");return LocalTransactionState.UNKNOW;}}//MQ在没有接收到事务结果时,发送请求来二次核对 ,消息事务状态的回查@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("MQ开始事务状态的回查");if(messageExt.getTags().equals("T-3")){System.out.println("tag【"+messageExt.getTags()+"】-- 事务提交");return LocalTransactionState.COMMIT_MESSAGE;}else{System.out.println("tag【"+messageExt.getTags()+"】-- 仍然结果未知");return LocalTransactionState.UNKNOW;}}
}
事务消息的消费者
public class Consumer {public static void main(String[] args) throws Exception{//1.创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-group");//2.指定nameserver地址consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//3.订阅主题topic和tagconsumer.subscribe("transaction-topic", "*");//4.设置回调函数 处理消息consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {try {for (MessageExt messageExt : list) {System.err.println("消费消息: " + new String(messageExt.getBody()));//输出消息内容}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功});//5.启动消费者consumer.start();//6.打印启动状态System.out.println("消费者启动成功");}
}
六、消费消息的过滤
针对消费者, 消费者来决定对哪些消息进行消费,对哪些消息不消费。
提供两种消息过滤的方式
1.依据tag来过滤
2.依据RocketMQ定义了一些基本语法来支持过滤 (只有使用push模式对消费者才能使用SQL92标准对sql语句)
数值比较 例如: > , >= ,< , <= , BETWEEN , = ;
字符比较 例如:= ,<> ,IN;
IS NULL 或者 IS NOT NULL ;
逻辑符号 AND , OR ,NOT;
常量支持类型:
数值 例如:123,3.1415926
字符 例如:‘abc’ 必须用单引号包裹起来
特殊字符 例如:NULL
布尔值 TRUE 或 FALSE
sql模式过滤的消息生产者
通过Message对象提供的 public void putUserProperty(String name, String value) 方法,添加自定义sql查询的标示与内容。
public class Producer {public static void main(String[] args) throws Exception{//创建消息生产者producer,并制定生产者的组名DefaultMQProducer producer = new DefaultMQProducer("sql-filter-group");//指定NameServer地址producer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//启动生产者producer.start();//每隔一秒,循环发送5条消息for (int i = 0; i < 5; i++) {String body = "通过tag过滤消息"+i;//创建消息对象 指定主题、标签、消息体 同步和异步是一致到Message message = new Message("sql-filter-topic", "sql-filter-tag", body.getBytes());//设置消息 用户自定义属性message.putUserProperty("i",String.valueOf(i));//同步发送消息SendResult sendResult = producer.send(message);//打印消息发送的结果System.out.println("发送结果:"+sendResult);//线程休眠一秒再去发送下一条消息TimeUnit.SECONDS.sleep(1);}}
}
sql模式过滤的消息消费者
public class Consumer {public static void main(String[] args) throws Exception{//1.创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sql-filter-group");//2.指定nameserver地址consumer.setNamesrvAddr("***.***.***.***:9876");//您MQ的地址,集群用英文逗号隔开//3.订阅主题topic和tag 依据消息sql过滤器来过滤消息consumer.subscribe("sql-filter-topic", MessageSelector.bySql("i>=0"));//4.设置回调函数 处理消息consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {try {for (MessageExt messageExt : list) {System.err.println("消费消息:【 " + new String(messageExt.getBody())+"】");}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功});//5.启动消费者consumer.start();//6.打印启动状态System.out.println("消费者启动成功");}
}
参考学习视频 黑马程序员《RocketMQ系统精讲,经受历年双十一狂欢节考验的分布式消息中间件》,图片来源于网络侵删~
RocketMQ 发送消息的基本案例相关推荐
- RocketMQ发送消息和消费消息
RocketMQ发送消息和消费消息 一.使用前配置 二.启动命令 三.pom.xml文件配置 四.编码 4.1 先定义一个消息保存的载体: 4.2 定义消息的发送者: 4.3 定义消息的消费者: 五 ...
- rocketmq发送消息失败
错误日志 org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call ti ...
- 测试发送消息和接受消息
测试RocketMQ 发送消息 # 1.设置环境变量 export NAMESRV_ADDR=localhost:9876 # 2.使用安装包的Demo发送消息 sh bin/tools.sh org ...
- RocketMQ同步消息、异步消息、单向消息详解
一.RocketMQ 支持 3 种消息发送方式 : 1.同步消息(sync message ) producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送 ...
- 【Java从0到架构师】RocketMQ 使用 - 发送消息、接收消息、特殊消息
RocketMQ 消息中间件 入门案例 NameServer 地址 发送消息 同步发送消息 异步发送消息 一次性发送消息 生产者组.消息封装 接收消息 消费方式:推式消费.拉式消费 消息方式:集群模式 ...
- RocketMq - Springboot2.x整合RocketMQ4.x - 发送消息(七)
说明: 本例为单节点案例演示 并总结出常见错误和解决方案 step1: 引入依赖包 <dependency><groupId>org.apache.rocketmq</g ...
- 深入探究 RocketMQ 事务机制的实现流程,为什么它能做到发送消息零丢失?
本文转载自公众号:石杉的架构笔记 本文来自狸猫技术窝专栏<从零开始带你成为消息中间件实战高手>,是作者原子弹大侠开放的试读 1.解决消息丢失的第一个问题:订单系统推送消息领丢失 既然我们已 ...
- kafka之Producer同步与异步消息发送及事务幂等性案例应用实战
本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客.版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习. 秦凯新的技术社区 ...
- rocketmq发送顺序消息(四)
rocketmq怎么发送消息可参考我的上一篇博客:rocketmq发送第一条消息.此处我们讲解如何发送rocketmq顺序消息 producer public class ProducerOrder ...
最新文章
- 【第11周复盘】小朋友们 100% 闯关成功!
- 启发式搜索求解TSP近似解
- Linux的企业-Redis数据库、缓存和哨兵Sentinal、Redis高可用
- log4j.logger java_java – Log4JLogger的根本原因是找不到还是不可用?
- boost::safe_numerics模块实现隐式转换更改数据值的测试程序
- 第三次学JAVA再学不好就吃翔(part97)--抛出异常
- 使用nodejs将某个简书用户的文章进行导出
- 【VBS】总结 Visual Basic 的分支结构和循环结构
- LINUX使用sed修改文件,如果包含变量,需要使用双引号
- 谈谈作为DBA我对MySQL数据库优化的理解
- 【grasshopper自定义电池开发】使用Visual Studio 2022借助官方扩展插件开发一个贪吃蛇电池
- python详解绘制风玫瑰图
- Acer宏基笔记本FN快捷键大全
- James Gosling畅言Java技术未来十年发展
- 概率论大作业C语言验证正态分布的数学期望和方差
- 2020第一届无线大数据竞赛——华为赛道:无线网络智能定位 第2名方案
- C 碎片八 结构体amp;枚举amp;联合
- 全志F1C600芯片处理器介绍
- NodeJS 实现多语言
- java中的集合详解