前言:

本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar

其中jedis连接池需要依赖commons-pool2包,json包用于对象实例和json字符串的相互转换

1、jedis的消息队列方法简述

1.1、发布消息方法

(其中,channel是对应消息通道,message是对应消息体)

jedis.publish(channel, message);

1.2、监听消息方法

(其中,jedisPubSub用于处理监听到的消息,channels是对应的通道)

jedis.subscribe(jedisPubSub, channels);

2、发布消息

/*** 从jedis连接池获取jedis操作实例* @return*/public static Jedis getJedis() {return RedisPoolManager.getJedis();}/*** 推入消息到redis消息通道* * @param String*            channel* @param String*            message*/public static void publish(String channel, String message) {Jedis jedis = null;try {jedis = getJedis();jedis.publish(channel, message);} finally {jedis.close();}}/*** 推入消息到redis消息通道* * @param byte[]*            channel* @param byte[]*            message*/public void publish(byte[] channel, byte[] message) {Jedis jedis = null;try {jedis = getJedis();jedis.publish(channel, message);} finally {jedis.close();}}

3、监听消息

3.1、监听消息主体方法

/*** 监听消息通道* @param jedisPubSub - 监听任务* @param channels - 要监听的消息通道*/public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {Jedis jedis = null;try {jedis = getJedis();jedis.subscribe(jedisPubSub, channels);} finally {jedis.close();}}/*** 监听消息通道* @param jedisPubSub - 监听任务* @param channels - 要监听的消息通道*/public static void subscribe(JedisPubSub jedisPubSub, String... channels) {Jedis jedis = null;try {jedis = getJedis();jedis.subscribe(jedisPubSub, channels);} finally {jedis.close();}}

3.2、处理监听到的消息任务

class Tasker implements Runnable {private String[] channel = null;//监听的消息通道private JedisPubSub jedisPubSub = null;//消息处理任务public Tasker(JedisPubSub jedisPubSub, String ...channel) {this.jedisPubSub = jedisPubSub;this.channel = channel;}@Overridepublic void run() {// 监听channel通道的消息RedisMQ.subscribe(jedisPubSub, channel);}}

3.3、处理监听到的消息主体类实现

package cn.eguid.livePushServer.redisManager;import java.util.Map;import org.json.JSONObject;import cc.eguid.livepush.PushManager;
import redis.clients.jedis.JedisPubSub;public class RedisMQHandler extends JedisPubSub{PushManager pushManager = null;public RedisMQHandler(PushManager pushManager) {super();this.pushManager = pushManager;}@Override// 接收到消息后进行分发执行public void onMessage(String channel, String message) {JSONObject jsonObj = new JSONObject(message);System.out.println(channel+","+message);if ("push".equals(channel)) {Map<String,Object> map=jsonObj.toMap();System.out.println("接收到一条推流消息,准备推流:"+map);
//          String appName=pushManager.push(map);//推流完成后还需要发布一个成功消息到返回队列} else if ("close".equals(channel)) {String appName=jsonObj.getString("appName");System.out.println("接收到一条关闭消息,准备关闭应用:"+appName);
//          pushManager.closePush(appName);}}
}

4、测试消息队列发布和监听

public static void main(String[] args) throws InterruptedException {PushManager pushManager= new PushManagerImpl();Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));t1.start();t2.start();LivePushEntity livePushInfo=new LivePushEntity();livePushInfo.setAppName("test1");JSONObject json=new JSONObject(livePushInfo);publish("push",json.toString());publish("close", json.toString());Thread.sleep(2000);publish("push", json.toString());publish("close",json.toString());Thread.sleep(2000);publish("push", json.toString());publish("close",json.toString());}

转载于:https://www.cnblogs.com/eguid/p/6821593.html

使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)相关推荐

  1. Listener method could not be invoked with the incoming message消息队列RabbitMQ项目启动报错及监听队列报错

    Listener method could not be invoked with the incoming message 报错如图: 说是不能调用监听器的方法,问题原因是Channel依赖导错 应 ...

  2. OneNET物联网平台06 消息队列MQ服务开启与配置

    消息队列MQ可作为规则引擎对接的扩展增值服务使用,配合物联网套件,可形成具备设备接入.设备管理.消息分发.应用承载能力的高性能服务组合 消息队列MQ具有如下特点: 消息缓存 MQ服务支持消息缓存,可以 ...

  3. 消息中间件系列(四):消息队列MQ的特点、选型、及应用场景详解

    前面集中谈了分布式缓存Redis系列: 高并发架构系列:分布式锁的由来.特点.及Redis分布式锁的实现详解 高并发架构系列:Redis并发竞争key的解决方案详解 高并发架构系列:Redis缓存和M ...

  4. 面试官:消息队列 MQ/ JMS/ Kafka 有什么区别?

    更多架构干货请关注公众号"架构之路".是不是平常听到说消息队列啊,JMS啊,MQ啊 .kafka啊巴啦啦的一堆术语,听不懂?关系混乱?今天就让我们来一起来看看他们都是什么吧. 1消 ...

  5. 梳理消息队列 MQ/JMS/Kafka

    是不是平常听到说消息队列啊,JMS啊,MQ啊 .kafka啊巴啦啦的一堆术语,听不懂?关系混乱?今天就让我们来一起来看看他们都是什么吧. 1消息队列介绍 首先举个收快递的栗子,传统的收快递,快递小哥把 ...

  6. SpringCloud 微服务 (十) 消息队列MQ 基础

    2019独角兽企业重金招聘Python工程师标准>>> 壹 之前学习了SpringCloud Bus结合MQ,没有多学习MQ,本次学习相关内容,先了解异步,同步就不说了 异步: 客户 ...

  7. 消息队列MQ技术介绍

    一. 消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合.异步消息.流量削锋等问题.实现高性能.高可用.可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境, ...

  8. 消息队列MQ/JMS/Kafka,你都了解多少?

    今日推荐isEmpty 和 isBlank 的用法区别,你都知道吗?SpringBoot项目优化和Jvm调优(楼主亲测,真实有效)国内大神成功给手机装上了 Win11,代码已开源!Fluent Myb ...

  9. java队列_RPC远程调用和消息队列MQ的区别

    RPC和MQ同样都是用于分布式系统的两个很重要的技术,都有服务提供者.消费者的概念,可在一定程度上对系统进行解耦.但两者之间还是有区别的,本篇简单介绍~ 一.RPC RPC(Remote Proced ...

最新文章

  1. 面向对象进阶2 组合
  2. gitee 拉取其他分支_如何使用 Gitee 快速搭建 ESP-IDF 开发环境(Windows 版)
  3. netstat [选项]
  4. Anroid推送服务
  5. C++中父类的虚函数必需要实现吗?
  6. POJ 3253 Fence Repair 贪心
  7. 多元函数四则运算的一阶微分公式的存在性与性质
  8. Protues构建最小系统
  9. 计算机网络第七版 部分详细答案
  10. Dell Inspiron 14 3437装win7系统没有网卡驱动解决办法
  11. c语言常用函数doc下载,c语言常用函数.doc
  12. 远程桌面管理助手有哪些?11款最好的远程桌面软件推荐。
  13. 内存延时cl_内存延迟和内存时序有什么关系?内存速率和时钟周期| Crucial(英睿达)...
  14. C#实现图片压缩及裁剪
  15. python解决中文显示问题Glyph 24179 (\N{CJK UNIFIED IDEOGRAPH-5E73}) missing from current font. func(*args)
  16. 2021-07-15-2021年全球10大最佳单板计算机开发板(SBC)(第1-3名)
  17. python mysqldb_python mysqldb 教程
  18. 3d效果技术java,java3D技术展示
  19. 玩 Spring框架
  20. 安全生产管理云执法平台方案

热门文章

  1. ubuntu kylin 14.04编译openjdk-7u40
  2. MySQL完全备份与恢复
  3. Java逆向基础之AspectJ的Eclipse插件AJDT
  4. 白洁血战Node.js并发编程 01 状态机
  5. C++ Primer笔记 容器和算法(2)
  6. cnetos6,centos7添加新网卡,系统不识别的解决办法
  7. 杭电1860--统计字符
  8. 升级到Windows 8.1
  9. Exchange Server 2013预览版服务器角色概况
  10. Portal-Basic Java Web 应用开发框架:应用篇(十三) —— REST Convention