封装一个消息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Message implements Serializable{
private static final long serialVersionUID = 1L;
private String titile;
private String info;
public Message(String titile,String info){
this.titile=titile;
this.info=info;
}
public String getTitile() {
return titile;
}
public void setTitile(String titile) {
this.titile = titile;
}
public String getInfo() {
return info;
}
public void setInfo(String info) {
this.info = info;
}
}

  

为这个消息对象提供序列化方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MessageUtil {
//convert To String
public static String convertToString(Object obj,String charset) throws IOException{
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
String str = bo.toString(charset);
bo.close();
oo.close();
return str;
}
//convert To Message
public static Object convertToMessage(byte[] bytes) throws Exception{
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream sIn = new ObjectInputStream(in);
return sIn.readObject();
}
}

  

从Jedis连接池中获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class RedisUtil {
/**
* Jedis connection pool
* @Title: config
*/
public static JedisPool getJedisPool(){
ResourceBundle bundle=ResourceBundle.getBundle("redis");
String host=bundle.getString("host");
int port=Integer.valueOf(bundle.getString("port"));
int timeout=Integer.valueOf(bundle.getString("timeout"));
//  String password=bundle.getString("password");
JedisPoolConfig config=new JedisPoolConfig();
config.setMaxActive(Integer.valueOf(bundle.getString("maxActive")));
config.setMaxWait(Integer.valueOf(bundle.getString("maxWait")));
config.setTestOnBorrow(Boolean.valueOf(bundle.getString("testOnBorrow")));
config.setTestOnReturn(Boolean.valueOf(bundle.getString("testOnReturn")));
JedisPool pool=new JedisPool(config, host, port, timeout);
return pool;
}
}

  

创建Provider类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Producer {
private Jedis jedis;
private JedisPool pool;
public Producer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
public void provide(String channel,Message message) throws IOException{
String str1=MessageUtil.convertToString(channel,"UTF-8");
String str2=MessageUtil.convertToString(message,"UTF-8");
jedis.publish(str1, str2);
}
//close the connection
public void close() throws IOException {
//将Jedis对象归还给连接池,关闭连接
pool.returnResource(jedis);
}
}

  

创建Consumer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Consumer {
private Jedis jedis;
private JedisPool pool;
public Consumer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
public void consum(String channel) throws IOException{
JedisPubSub jedisPubSub = new JedisPubSub() {
// 取得订阅的消息后的处理
public void onMessage(String channel, String message) {
System.out.println("Channel:"+channel);
System.out.println("Message:"+message.toString());
}
// 初始化订阅时候的处理
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("onSubscribe:"+channel);
}
// 取消订阅时候的处理
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println("onUnsubscribe:"+channel);
}
// 初始化按表达式的方式订阅时候的处理
public void onPSubscribe(String pattern, int subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
// 取消按表达式的方式订阅时候的处理
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
// 取得按表达式的方式订阅的消息后的处理
public void onPMessage(String pattern, String channel, String message) {
System.out.println(pattern + "=" + channel + "=" + message);
}
};
jedis.subscribe(jedisPubSub, channel);
}
//close the connection
public void close() throws IOException {
//将Jedis对象归还给连接池
pool.returnResource(jedis);
}
}

  

测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args){
Message msg=new Message("hello!""this is the first message!");
Producer producer=new Producer();
Consumer consumer=new Consumer();
try {
producer.provide("chn1",msg);
consumer.consum("chn1");
catch (IOException e) {
e.printStackTrace();
}
}

  

本文转自邴越博客园博客,原文链接:http://www.cnblogs.com/binyue/p/4763352.html,如需转载请自行联系原作者

Redis笔记(七)Java实现Redis消息队列相关推荐

  1. Redis 笔记之 Java 操作 Redis(Jedis)

    Java 操作 Redis 环境准备 引入依赖 创建 jedis 对象 操作 Key 相关 API 操作 String 相关 API 操作 List 相关 API 操作 Set 的相关 API 操作 ...

  2. springboot使用redis实现消息队列功能,redis使用list和stream实现消息队列功能,redis实现消息队列的风险点分析

    文章目录 写在前面 基于list的消息队列解决方案 使用list基本实现消息队列 阻塞式消费,避免性能损失 替换while(true) 实现消息幂等 保证消息可靠性 基于stream的消息队列解决方案 ...

  3. redis的发布/订阅和mq消息队列的区别,该如何选择?

    好久没写笔记了,今天记录下使用消息队列的心得. 本文以reids和rocketmq对比 很多人一直有个疑问(包括我之前也是):redis支持已经消息队列(发布/订阅)了,为什么还需要mq呢? 项目已经 ...

  4. 使用redis的发布订阅模式实现消息队列

    配置文件 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://w ...

  5. Redis笔记(六)Redis的消息通知

    Redis的消息通知可以使用List类型的LPUSH和RPOP(左进右出), 当然更方便的是直接使用Redis的Pub/Sub(发布/订阅)模式. 使用List实现队列 使用列表类型的LPUSH和RP ...

  6. Redis笔记(Linux部署redis过程)

    Redis部署openEuler 一.操作过程 ①以ROOT用户身份登录服务器 ②配置编译环境 使用wget下载相应的redis 云服务已经帮我们安装了所需要的环境 ③下载源码 wget http:/ ...

  7. activimq java集成_Java消息队列-Spring整合ActiveMq

    1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Jav ...

  8. 【Redis笔记】一起学习Redis | 如何利用Redis实现一个分布式锁?

    一起学习Redis | 如何利用Redis实现一个分布式锁? 前提知识 什么是分布式锁? 为什么需要分布式锁? 分布式锁的5要素和三种实现方式 实现分布式锁 思考思考 基础方案 改进方案 保证setn ...

  9. 狂神redis笔记_狂神说redis笔记(一)

    一.Nosql概述 1.单机Mysql时代 90年代,一个网站的访问量一般不会太大,单个数据库完全够用.随着用户增多,网站出现以下问题: 数据量增加到一定程度,单机数据库就放不下了 数据的索引(B+ ...

  10. Java架构之消息队列 (一):消息队列的概述

    消息队列系列分享大纲: 一.消息队列的概述 二.消息队列之RabbitMQ的使用 三.消息队列之Kafka的使用 四.消息队列之RabbitMQ的原理详解 五.消息队列之Kafka的原理详解 六.消息 ...

最新文章

  1. HBase应用快速学习
  2. Windows server 2003 体系结构
  3. 快速理解Spark Dataset
  4. Codeforces Round #709 (Div. 1, based on Technocup 2021 Final Round) A. Basic Diplomacy
  5. linux虚拟主机_云服务器与虚拟主机的区别
  6. python读什么文件最快的软件_这些方法,能够让你的 Python 程序快如闪电
  7. 如何像高级开发人员一样思考?
  8. 从IBM SCE+落地中国看IDC的转型
  9. centos 用户和组的相关命令
  10. 电教室软件 linux,在深度操作系统上安装多媒体电子教室Veyon
  11. 安川g7变频器说明书_变频器功率调整
  12. 数字调制解调—扩频通信和伪码同步
  13. 进销存excel_Excel教程:教大家做简单的进销存
  14. Win10资源管理器CPU持续占用20%解决方法
  15. Unity中关于IphoneX的屏幕适配
  16. 猿辅导9-12编程题3道
  17. leetcode刷题第21天——1763,117,572
  18. 杂牌机搞机之旅最终章————刷入Xposed框架
  19. Java爬虫获取豆瓣的短评数据
  20. 今15年创业,享受改变的过程

热门文章

  1. 转:Ruby 的性能 与如何选用正确的framework来做web
  2. process is bad
  3. ASP.NET多附件上传和附件编辑的实现
  4. 一年太久,研究员决定不等补丁直接披露 Safari 0day 详情
  5. sonic——可替代Elasticsearch的简单搜索引擎
  6. 《数学计算出题系统》功能规格说明书
  7. EclipseIDE常用快捷键
  8. 十五张思维导图带你快速学习PHP语言基础
  9. InflateException:Bin file line #19:Error inflating class MyTextView
  10. jQuery插件开发的准备