mysql消息订阅与发布_消息发布与订阅
代码示例
消息发布者 (即publish client)
package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
public class KVStorePubClient {
private Jedis jedis;
public KVStorePubClient(String host,int port, String password){
jedis = new Jedis(host,port);
//KVStore的实例密码
String authString = jedis.auth(password);
if (!authString.equals("OK"))
{
System.err.println("AUTH Failed: " + authString);
return;
}
}
public void pub(String channel,String message){
System.out.println(" >>> 发布(PUBLISH) > Channel:"+channel+" > 发送出的Message:"+message);
jedis.publish(channel, message);
}
public void close(String channel){
System.out.println(" >>> 发布(PUBLISH)结束 > Channel:"+channel+" > Message:quit");
//消息发布者结束发送,即发送一个“quit”消息;
jedis.publish(channel, "quit");
}
}
消息订阅者 (即subscribe client)
package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class KVStoreSubClient extends Thread{
private Jedis jedis;
private String channel;
private JedisPubSub listener;
public KVStoreSubClient(String host,int port, String password){
jedis = new Jedis(host,port);
//ApsaraDB for Redis的实例密码
String authString = jedis.auth(password);//password
if (!authString.equals("OK"))
{
System.err.println("AUTH Failed: " + authString);
return;
}
}
public void setChannelAndListener(JedisPubSub listener,String channel){
this.listener=listener;
this.channel=channel;
}
private void subscribe(){
if(listener==null || channel==null){
System.err.println("Error:SubClient> listener or channel is null");
}
System.out.println(" >>> 订阅(SUBSCRIBE) > Channel:"+channel);
System.out.println();
//接收者在侦听订阅的消息时,将会阻塞进程,直至接收到quit消息(被动方式),或主动取消订阅
jedis.subscribe(listener, channel);
}
public void unsubscribe(String channel){
System.out.println(" >>> 取消订阅(UNSUBSCRIBE) > Channel:"+channel);
System.out.println();
listener.unsubscribe(channel);
}
@Override
public void run() {
try{
System.out.println();
System.out.println("----------订阅消息SUBSCRIBE 开始-------");
subscribe();
System.out.println("----------订阅消息SUBSCRIBE 结束-------");
System.out.println();
}catch(Exception e){
e.printStackTrace();
}
}
}
消息监听者
package message.kvstore.aliyun.com;
import redis.clients.jedis.JedisPubSub;
public class KVStoreMessageListener extends JedisPubSub{
@Override
public void onMessage(String channel, String message) {
System.out.println(" <<< 订阅(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );
System.out.println();
//当接收到的message为quit时,取消订阅(被动方式)
if(message.equalsIgnoreCase("quit")){
this.unsubscribe(channel);
}
}
@Override
public void onPMessage(String pattern, String channel, String message) {
// TODO Auto-generated method stub
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
}
示例主程序
package message.kvstore.aliyun.com;
import java.util.UUID;
import redis.clients.jedis.JedisPubSub;
public class KVStorePubSubTest {
//ApsaraDB for Redis的连接信息,从控制台可以获得
static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password="password";//password
public static void main(String[] args) throws Exception{
KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
final String channel = "KVStore频道-A";
//消息发送者开始发消息,此时还无人订阅,所以此消息不会被接收
pubClient.pub(channel, "Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)");
//消息接收者
KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
JedisPubSub listener = new KVStoreMessageListener();
subClient.setChannelAndListener(listener, channel);
//消息接收者开始订阅
subClient.start();
//消息发送者继续发消息
for (int i = 0; i < 5; i++) {
String message=UUID.randomUUID().toString();
pubClient.pub(channel, message);
Thread.sleep(1000);
}
//消息接收者主动取消订阅
subClient.unsubscribe(channel);
Thread.sleep(1000);
pubClient.pub(channel, "Aliyun消息2:(此时订阅取消,所以此消息不会被接收)");
//消息发布者结束发送,即发送一个“quit”消息;
//此时如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”时,将执行“unsubscribe”操作。
pubClient.close(channel);
}
}
mysql消息订阅与发布_消息发布与订阅相关推荐
- python杂志订阅系统详细设计_从发布-订阅模式谈谈 Flask 的 Signals
发布-订阅模式 发布-订阅模式,顾名思义,就像大家订报纸一样,出版社发布不同类型的报纸杂志不同的读者根据不同的需求预定符合自己口味的的报纸杂志,付费之后由邮局安排人员统一派送. 上面一段话,提到了发布 ...
- 微信开发中消息回复的代码_消息中的消息
微信开发中消息回复的代码 Ste·ga·no·graph·y / stegəˈnägrəfi / (noun): the practice of concealing messages or info ...
- java 灰度发布_灰度发布的原理及实现
灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式.在其上可以进行 A/B testing,即让一部分用户继续用产品特性 A,一部分用户开始用产品特性 B,如果用户对 B 没有什么 ...
- python消息队列框架持久化_消息队列如果持久化到数据库的话,相对于直接操作数据库有啥优势?...
MQ的作用很多,典型作用: 1.削峰填谷:如果短时间内要处理的业务量大于数据库的服务能力,则可能会卡死数据库:使用MQ可以慢慢处理. 2.异步化:如果处理的工作非常耗时,则RPC的请求一直halt,对 ...
- 如何把采集到的数据存入mysql_数据采集教程_数据发布_如何发布到数据库MySQL_后羿采集器...
如果大家在发布到数据库时遇到一些问题,请参考这个教程进行问题排查:发布到数据库常见问题 作为一款真免费的数据采集软件,我们免费提供多种导出方式,如果小伙伴们需要将采集到的数据发布到数据库MySQL,可 ...
- python写的网站如何发布_如何发布python程序
如何发布一个Python程序: 1.安装一个pyInstaller 在pycharm里点 file-–>setting-–>Project workspace-->Interpret ...
- redis 发布订阅实际案例_【赵强老师】Redis的消息发布与订阅
欢迎关注赵强老师微信公众号:myitshare Redis 作为一个publish/subscribe server,起到了消息路由的功能.订阅者可以通过subscribe和psubscribe命令向 ...
- kafka内存不断增加_分布式发布订阅消息系统Kafka 为什么快
Kafka 为什么能那么快 | Kafka高效读写数据的原因 无论 kafka 作为 MQ 也好,作为存储层也罢,无非就是两个功能(好简单的样子),一是 Producer 生产的数据存到 broker ...
- 为什么rocketmq的queue分为读写?_分布式发布订阅消息系统Kafka 为什么快
Kafka 为什么能那么快 | Kafka高效读写数据的原因 无论 kafka 作为 MQ 也好,作为存储层也罢,无非就是两个功能(好简单的样子),一是 Producer 生产的数据存到 broker ...
最新文章
- JVM 参数及各部分含义(转)
- java第三方接口对接_调用多个第三方接口哪一种方案更好?
- AD 脚本kixtart运用之六(outlook邮件批量生成签名)
- Hibernate 中的DetachedCriteria。
- WIFI配网方式(AP模式、Smartconfig等模式)
- 修复版ZFAKA发卡系统源码 自适应PC+手机端
- SWF反编神器Action Script Viewer终身免费升级!
- WebApi生成接口文档
- [离线]ps4共享屏幕到笔记本
- 对称矩阵的特征值与特征向量
- 函数调用中的堆栈平衡
- WhatsApp网页版扫码分析
- 电子负载使用恒电流和恒电阻负载模式的方法
- 圣诞节,教你用Python给微信头像添加一个圣诞帽
- 如何提升营销工作ROI?跨渠道营销了解一下
- 感谢米老师,感谢提高班,做个骄傲的自己
- 小米全系列手机 刷机总贴
- Linux ls 命令学习和简单使用
- 以太坊实践经验之《eth.blockNumber结果为0》
- VR插件:VR Interaction Framework 1.7(VRIF)(一)