Redis——使用Jedis操作stream
低版本的Jedis不支持stream操作;
maven依赖:
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.3.0</version></dependency>
另外还需要添加 slf4j 日志依赖;
Demo1:
/*** 2020年11月5日下午4:55:43*/
package testJedisStream;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;/*** @author XWF**/
public class TestJedisStream {private static JedisPoolConfig jpc = new JedisPoolConfig();private static JedisPool jedisPool;static {jpc.setMaxTotal(200);jpc.setMaxIdle(10);jpc.setMaxWaitMillis(1000);jpc.setTestOnBorrow(true);jpc.setTestOnReturn(true);jedisPool = new JedisPool(jpc, "192.168.1.187", 6379, 1000, "654321", 1);}/*** @param args*/public static void main(String[] args) {System.out.println("StreamEntryID.NEW_ENTRY=" + StreamEntryID.NEW_ENTRY);System.out.println("StreamEntryID.LAST_ENTRY=" + StreamEntryID.LAST_ENTRY);System.out.println("StreamEntryID.UNRECEIVED_ENTRY=" + StreamEntryID.UNRECEIVED_ENTRY);System.out.println();xadd();xlen();System.out.println();xread();xrange();xrevrange();System.out.println();xdel();xlen();System.out.println();xadd();xadd();xlen();xtrim();xlen();try(Jedis jd = jedisPool.getResource()) {jd.del("k");}catch (Exception e) {e.printStackTrace();}}public static void xadd() {try(Jedis jd = jedisPool.getResource()) {Map<String, String> hash = new HashMap<>();hash.put("name", "Tom");hash.put("age", "13");//key, id, hashStreamEntryID id = jd.xadd("k", StreamEntryID.NEW_ENTRY, hash);
// StreamEntryID id = jd.xadd("k", new StreamEntryID("1-1"), hash);System.out.println("xadd1 id:" + id.toString());Map<String, String> hash2 = new HashMap<>();hash2.put("name", "Jerry");hash2.put("age", "12");//key, id, hash, len, ~(false)StreamEntryID id2 = jd.xadd("k", StreamEntryID.NEW_ENTRY, hash2, 5, false);System.out.println("xadd2 id:" + id2.toString());}catch(Exception e) {e.printStackTrace();}}public static void xlen() {try(Jedis jd = jedisPool.getResource()) {long len = jd.xlen("k");System.out.println("len=" + len);}catch(Exception e) {e.printStackTrace();}}public static void xread() {try(Jedis jd = jedisPool.getResource()) {//count, block, key-id...List<Entry<String, List<StreamEntry>>> list = jd.xread(1, 1000, new MyJedisEntry("k", "0"), new MyJedisEntry("k", "0"));System.out.println(list);}catch(Exception e) {e.printStackTrace();}}public static void xrange() {try(Jedis jd = jedisPool.getResource()) {//key, start, end, count//null表示无穷小或者无穷大List<StreamEntry> list = jd.xrange("k", null, null, 100);System.out.println("xrange:" + list);}catch(Exception e) {e.printStackTrace();}}public static void xrevrange() {try(Jedis jd = jedisPool.getResource()) {//key, end, start, countList<StreamEntry> list = jd.xrevrange("k", null, null, 100);System.out.println("xrevrange:" + list);}catch(Exception e) {e.printStackTrace();}}public static void xdel() {try(Jedis jd = jedisPool.getResource()) {List<Entry<String, List<StreamEntry>>> getid = jd.xread(1, 0, new MyJedisEntry("k", "0"));Entry<String, List<StreamEntry>> k_entrylist = getid.get(0);List<StreamEntry> entrylist = k_entrylist.getValue();String id = entrylist.get(0).getID().toString();//key, id...long result = jd.xdel("k", new StreamEntryID(id));System.out.println("xdel " + id + " return:" + result);}catch(Exception e) {e.printStackTrace();}}public static void xtrim() {try(Jedis jd = jedisPool.getResource()) {//key, len, ~(false)long result = jd.xtrim("k", 2, false);System.out.println("xtrim 2 return=" + 
result);}catch(Exception e) {e.printStackTrace();}}}
/**
 * Minimal {@link Entry} implementation pairing a stream key with the
 * {@link StreamEntryID} to read from, as required by the Jedis
 * {@code xread}/{@code xreadGroup} varargs parameters.
 */
class MyJedisEntry implements Entry<String, StreamEntryID> {

    private final String k;
    private StreamEntryID id;

    /**
     * @param key stream key
     * @param id  entry ID in string form; the shorthand {@code "0"} is mapped to the
     *            no-arg {@code StreamEntryID} (presumably 0-0, the start of the stream)
     */
    public MyJedisEntry(String key, String id) {
        this.k = key;
        if ("0".equals(id)) {
            this.id = new StreamEntryID();
        } else {
            this.id = new StreamEntryID(id);
        }
    }

    /**
     * @param key stream key
     * @param ID  entry ID to read from
     */
    public MyJedisEntry(String key, StreamEntryID ID) {
        this.k = key;
        this.id = ID;
    }

    @Override
    public String getKey() {
        return k;
    }

    @Override
    public StreamEntryID getValue() {
        return id;
    }

    /**
     * Replaces the stored ID.
     *
     * @param value the new ID
     * @return the ID held before this call, as the {@link Entry#setValue} contract requires
     */
    @Override
    public StreamEntryID setValue(StreamEntryID value) {
        StreamEntryID previous = this.id;
        this.id = value;
        return previous;
    }
}
结果1:
Demo2:
/*** 2020年11月6日上午9:47:18*/
package testJedisStream;import java.util.List;
import java.util.Map.Entry;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.StreamConsumersInfo;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamGroupInfo;
import redis.clients.jedis.StreamInfo;
import redis.clients.jedis.StreamPendingEntry;/*** @author XWF**/
public class TestJedisStream2 {private static JedisPoolConfig jpc = new JedisPoolConfig();private static JedisPool jedisPool;static {jpc.setMaxTotal(200);jpc.setMaxIdle(10);jpc.setMaxWaitMillis(1000);jpc.setTestOnBorrow(true);jpc.setTestOnReturn(true);jedisPool = new JedisPool(jpc, "192.168.1.187", 6379, 1000, "654321", 1);}/*** @param args*/public static void main(String[] args) {xgroup();System.out.println();xreadgroup();xpending();System.out.println();xack();xpending();System.out.println();xclaim();xpending();System.out.println();xinfo();try(Jedis jd = jedisPool.getResource()) {jd.del("k");}catch (Exception e) {e.printStackTrace();}}public static void xgroup() {try(Jedis jd = jedisPool.getResource()) {//key, groupName, id, makeStreamString g1result = jd.xgroupCreate("k", "g1", new StreamEntryID(), true);System.out.println("xgroupCreate g1 result:" + g1result);String g2result = jd.xgroupCreate("k", "g2", new StreamEntryID(), false);System.out.println("xgroupCreate g2 result:" + g2result);String setIdResult = jd.xgroupSetID("k", "g2", new StreamEntryID("123-123"));System.out.println("xgroupSetID result:" + setIdResult);Long delConsumerResult = jd.xgroupDelConsumer("k", "g2", "Tom");System.out.println("xgroupDelConsumer result:" + delConsumerResult);long destroyResult = jd.xgroupDestroy("k", "g2");System.out.println("xgroupDestroy result:" + destroyResult);}catch(Exception e) {e.printStackTrace();}}public static void xreadgroup() {try(Jedis jd = jedisPool.getResource()) {TestJedisStream.xadd();MyJedisEntry entry = new MyJedisEntry("k", StreamEntryID.UNRECEIVED_ENTRY);//groupName, consumer, count, block, noAck(false加入pending列表), streamsList<Entry<String, List<StreamEntry>>> xreadGroupResult = jd.xreadGroup("g1", "Jerry", 2, 0, false, entry);System.out.println("xreadGroup result:" + xreadGroupResult);}catch(Exception e) {e.printStackTrace();}}public static void xpending() {try(Jedis jd = jedisPool.getResource()) {//key, groupName, start, end, count, 
consumerList<StreamPendingEntry> pendings = jd.xpending("k", "g1", null, null, 10, null);System.out.println("xpending result:" + pendings);//StreamPendingEntry
// pendings.get(0).getID().toString();
// pendings.get(0).getConsumerName();
// pendings.get(0).getIdleTime();
// pendings.get(0).getDeliveredTimes();}catch(Exception e) {e.printStackTrace();}}public static void xack() {try(Jedis jd = jedisPool.getResource()) {List<StreamPendingEntry> pendings = jd.xpending("k", "g1", null, null, 10, null);StreamEntryID id = pendings.get(0).getID();//key, groupName, ids...long xackResult = jd.xack("k", "g1", id);System.out.println("xack " + id + " result:" + xackResult);}catch(Exception e) {e.printStackTrace();}}public static void xclaim() {try(Jedis jd = jedisPool.getResource()) {List<StreamPendingEntry> pendings = jd.xpending("k", "g1", null, null, 10, null);StreamEntryID id = pendings.get(0).getID();//key, group, consumer, minIdleTime, newIdleTime, retryCount, force, ids...List<StreamEntry> xclaimResult = jd.xclaim("k", "g1", "Jack", 2, 1000000, 20, false, id);System.out.println("xclaim " + id + " result:" + xclaimResult);//StreamEntry
// xclaimResult.get(0).getID();//StreamEntryID
// xclaimResult.get(0).getFields();//Map<String, String>}catch(Exception e) {e.printStackTrace();}}public static void xinfo() {try(Jedis jd = jedisPool.getResource()) {List<StreamConsumersInfo> xinfoConsumersResult = jd.xinfoConsumers("k", "g1");System.out.println("xinfoConsumers result:" + xinfoConsumersResult);for( StreamConsumersInfo consumersinfo : xinfoConsumersResult) {System.out.println("-ConsumerInfo:" + consumersinfo.getConsumerInfo());System.out.println("--Name:" + consumersinfo.getName());System.out.println("--Pending:" + consumersinfo.getPending());System.out.println("--Idle:" + consumersinfo.getIdle());}List<StreamGroupInfo> xinfoGroupResult = jd.xinfoGroup("k");System.out.println("xinfoGroup result:" + xinfoGroupResult);for(StreamGroupInfo groupinfo : xinfoGroupResult) {System.out.println("-GroupInfo:" + groupinfo.getGroupInfo());System.out.println("--Name:" + groupinfo.getName());System.out.println("--Consumers:" + groupinfo.getConsumers());System.out.println("--Pending:" + groupinfo.getPending());System.out.println("--LastDeliveredId:" + groupinfo.getLastDeliveredId());}StreamInfo xinfoStreamResult = jd.xinfoStream("k");System.out.println("xinfoStream result:" + xinfoStreamResult);System.out.println("-StreamInfo:" + xinfoStreamResult.getStreamInfo());System.out.println("--Length:" + xinfoStreamResult.getLength());System.out.println("--RadixTreeKeys:" + xinfoStreamResult.getRadixTreeKeys());System.out.println("--RadixTreeNodes():" + xinfoStreamResult.getRadixTreeNodes());System.out.println("--Groups:" + xinfoStreamResult.getGroups());System.out.println("--LastGeneratedId:" + xinfoStreamResult.getLastGeneratedId());System.out.println("--FirstEntry:" + xinfoStreamResult.getFirstEntry());System.out.println("--LastEntry:" + xinfoStreamResult.getLastEntry());}catch(Exception e) {e.printStackTrace();}}
}
结果2:
Redis——使用Jedis操作stream相关推荐
- jedis操作set_Java中使用Jedis操作Redis的示例代码
使用java操作Redis需要jedis-2.1.0.jar,下载地址:jedis-2.1.0.jar 如果需要使用Redis连接池的话,还需commons-pool-1.5.4.jar,下载地址:c ...
- Jedis操作Redis数据库
添加Maven依赖: 1 <dependencies> 2 <!-- 单元测试 --> 3 <dependency> 4 <groupId>junit& ...
- 一次redis连接配置修改引发的redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream.异常
一次redis连接配置修改引发的redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream.异常 ...
- jedis操作redis(一)
redis是一个常用的内存Nosql数据库,为什么要用nosql,为什么要用redis不用memcache这些很多博客以及讲得很清楚了. 下面介绍redis的5大基本数据类型的常用操作: STRING ...
- Java中使用Jedis操作Redis
2019独角兽企业重金招聘Python工程师标准>>> 需要jar包: jedis-2.1.0.jar commons-pool-1.6.jar 单元测试: package com. ...
- Redis进阶-Jedis以及Spring Boot操作 Redis 5.x Cluster
文章目录 Pre Jedis操作Redis Cluster 添加依赖 Code Spring Boot 操作Redis Cluster 引入 依赖 application.yml Code Pre R ...
- java jedis使用_Java中使用Jedis操作Redis
Java中使用Jedis操作Redis 使用Java操作Redis需要jedis-2.1.0.jar,下载地址:http://files.cnblogs.com/liuling/jedis-2.1.0 ...
- 【Redis】7.使用jedis操作redis数据库
jedis jedis是java程序操纵Redis的工具. Jedis是Redis官方推荐的Java链接工具 使用前导入,下面的测试建议也导入测试的包 <!-- 导入jedis的包--> ...
- java代码简单操作Redis数据Jedis jar
java操作Redis数据API->Jedis Jedis引入 作为java码农,如何在代码中操作Redis呢? Jedis的介绍 Redis不仅可以使用命令来操作,现在基本上主流的语言都有AP ...
最新文章
- UA MATH523A 实分析2 测度论基础2 集族与单调类
- C++实现Dijkstra(迪杰斯特拉)算法(附完整源码)
- dotnet core调试docker下生成的dump文件
- DuangDuangDuang!码云项目的 Readme.md 特殊技能
- esb接口测试_接口测试用例.docx
- Redis集群部署(半自动)
- Android AP模式下获取SSID/PASSWORD
- 封装一个cookie
- Jquery-无法有效获取当前窗口高度
- PAIP.AHK调试以及同于脚本的调试法
- es服务器的cpu压力过大的调试
- TCP和UDP对比的优势和劣势
- “云+移动互联时代,2014移动平台高峰论坛”隆重举行
- 18 个 JavaScript 入门技巧
- 汇编语言:写一个简单的音乐程序
- 网站建设和网站运营,网站如何进行宣传推广
- 阅读 | 《娱乐至死》笔记 | Part2
- 高通量测序与杂交优势
- matlab示波器导出csv数据,示波器CSV波形数据导入Matlab进行FFT分析
- 【CSS】CSS 文本样式 ② ( font 字体设置 | CSS 2.0手册使用 | font-weight 字体粗细设置 | font-style 字体斜体设置 | font 字体样式综合写法 )