一、Jedis简介

1、Jedis对应Redis的四种工作模式

对应关系如下:

Jedis主要模块 Redis工作模式
Jedis Redis Standalone(单节点模式)
JedisCluster Redis Cluster(集群模式)
JedisSentinel Redis Sentinel(哨兵模式)
ShardedJedis Redis Sharding(分片模式)

2、Jedis三种请求模式

Jedis实例有3种请求模式:Client、Pipeline和Transaction

Jedis实例通过Socket建立客户端与服务端的长连接,往outputStream发送命令,从inputStream读取回复

1)、Client模式

客户端发一个命令,阻塞等待服务端执行,然后读取返回结果

2)、Pipeline模式

一次性发送多个命令,最后一次取回所有的返回结果

这种模式通过减少网络的往返时间和IO的读写次数,大幅度提高通信性能

原生批命令(mset、mget)与Pipeline对比

  • 原生批量命令是原子性,Pipeline是非原子性的
  • 原生批量命令是一个命令对应多个key,Pipeline支持多个命令
  • 原生批量命令是Redis服务端支持实现的,而Pipeline需要服务端与客户端的共同实现

3)、Transaction模式

Transaction模式即开启Redis的事务管理

Redis事务可以一次执行多个命令, 并且带有以下三个重要的保证:

  • 批量操作在发送EXEC命令前被放入队列缓存
  • 收到EXEC命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行
  • 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中

一个事务从开始到执行会经历以下三个阶段:

  • 开始事务
  • 命令入队
  • 执行事务

Redis事务相关命令:

  • multi:标记一个事务块的开始
  • exec:执行所有事务块的命令(一旦执行exec后,之前加的监控锁都会被取消掉)
  • discard:取消事务,放弃事务块中的所有命令
  • watch key1 key2 ...:监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断(类似乐观锁)
  • unwatch:取消watch对所有key的监控

二、Jedis模块源码解析

1、Jedis类结构

上图中,标为橘色的类为核心类

  1. Jedis以输入的命令参数是否为二进制,将处理请求的具体实现分为两个类中,例如Jedis和BinaryJedis、Client和BinaryClient
  2. 与Redis服务器的连接信息封装在Client的基类Connection中
  3. BinaryJedis类中包含了Client、Pipeline和Transaction变量,对应3种请求模式

2、Jedis的初始化流程

        Jedis jedis = new Jedis("localhost", 6379, 10000);Transaction transaction = jedis.multi();Pipeline pipeline = jedis.pipelined();

Jedis通过传入Redis服务器地址(host、port)开始初始化,然后在BinaryJedis里实例化Client。Client通过Socket维持客户端与Redis服务器的连接
Transaction和Pipeline继承同一个基类MultiKeyPipelineBase。Transaction在实例化的时候,就自动发送MULTI命令,开启事务模式;Pipeline按情况手动开启。它们均依靠Client发送命令

public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {protected final Client client;protected Transaction transaction = null;protected Pipeline pipeline = null;public BinaryJedis(final String host, final int port) {client = new Client(host, port);}public BinaryJedis(final HostAndPort hostPort, final JedisClientConfig config) {client = new Client(hostPort, config);initializeFromClientConfig(config);}public Transaction multi() {client.multi();client.getOne(); // expected OKtransaction = new Transaction(client);return transaction;}public Pipeline pipelined() {pipeline = new Pipeline();pipeline.setClient(client);return pipeline;}

3、Jedis调用流程

1)、Client模式

get(key)为例,Jedis代码如下:

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands,ModuleCommands {@Overridepublic String get(final String key) {// 校验是否是transaction模式或者pipeline模式checkIsInMultiOrPipeline();// 1)调用client发送命令client.get(key);// 2)从inputStream里读取回复return client.getBulkReply();}

代码1)处会调用Connection的sendCommand()方法,再调用Protocol的sendCommand()方法按照Redis的同一请求协议组织Redis命令,写入outputStream

public class Connection implements Closeable {public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {try {// 连接socket,如果已连接,跳过connect();// 按照redis的同一请求协议组织redis命令,写入outputStreamProtocol.sendCommand(outputStream, cmd, args);} catch (JedisConnectionException ex) {/** When client send request which formed by invalid protocol, Redis send back error message* before close connection. We try to read it to provide reason of failure.*/try {String errorMessage = Protocol.readErrorLineIfPossible(inputStream);if (errorMessage != null && errorMessage.length() > 0) {ex = new JedisConnectionException(errorMessage, ex.getCause());}} catch (Exception e) {/** Catch any IOException or JedisConnectionException occurred from InputStream#read and just* ignore. This approach is safe because reading error message is optional and connection* will eventually be closed.*/}// Any other exceptions related to connection?broken = true;throw ex;}}

代码2)处调用Connection的getBulkReply()方法,再调用Protocol的read()方法从inputStream中读取服务器的回复,此处会阻塞等待

public class Connection implements Closeable {public String getBulkReply() {final byte[] result = getBinaryBulkReply();if (null != result) {return SafeEncoder.encode(result);} else {return null;}}public byte[] getBinaryBulkReply() {flush();return (byte[]) readProtocolWithCheckingBroken();}protected Object readProtocolWithCheckingBroken() {if (broken) {throw new JedisConnectionException("Attempting to read from a broken connection");}try {// 从inputStream中读取服务器的回复,此处阻塞等待return Protocol.read(inputStream);} catch (JedisConnectionException exc) {broken = true;throw exc;}}

Protocol是一个通讯工具类,将Redis的各类执行关键字存储为静态变量,可以直观调用命令,例如Protocol.Command.GET。同时,将命令包装成符合Redis的统一请求协议,回复消息的处理也是在这个类进行,先通过通讯协提取出当次请求的回复消息,将Object类型的消息,格式化为String、List等具体类型,如果回复消息有Error则以异常的形式抛出

Protocol核心方法如下

public final class Protocol {// 按照redis的同一请求协议组织redis命令,写入outputStreampublic static void sendCommand(final RedisOutputStream os, final ProtocolCommand command,final byte[]... args)  // 从inputStream中读取服务器的回复,此处阻塞等待,调用process()方法处理消息public static Object read(final RedisInputStream is)// 通过检查服务器发回数据的第一个字节,确定这个回复是什么类型,分别交给下面5个函数处理  private static Object process(final RedisInputStream is) // 处理状态回复  private static byte[] processStatusCodeReply(final RedisInputStream is)// 处理批量回复  private static byte[] processBulkReply(final RedisInputStream is)// 处理多条批量回复  private static List<Object> processMultiBulkReply(final RedisInputStream is)// 处理整数回复private static Long processInteger(final RedisInputStream is)  // 处理错误回复  private static void processError(final RedisInputStream is)

2)、Pipeline模式和Transaction模式

  1. Pipeline和Transaction都继承自MultiKeyPipelineBase
  2. MultiKeyPipelineBase和PipelineBase的区别在于处理的命令不同,内部均调用Client发送命令
  3. Pipeline有一个内部类对象MultiResponseBuilder,当调用Pipeline的sync()之前,存储所有返回结果

Pipeline的使用方法:

        Pipeline pipeline = jedis.pipelined();Response<String> key1 = pipeline.get("key1");Response<String> key2 = pipeline.get("key2");Response<String> key3 = pipeline.get("key3");pipeline.sync();System.out.println("value1:" + key1.get() + ",value2:" + key2.get() + ",value3:" + key3.get());

Transaction的使用方法:

        Transaction transaction = jedis.multi();Response<String> key1 = transaction.get("key1");Response<String> key2 = transaction.get("key2");Response<String> key3 = transaction.get("key3");transaction.exec();System.out.println("value1:" + key1.get() + ",value2:" + key2.get() + ",value3:" + key3.get());

get(key)为例,Pipeline代码如下:

public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {@Overridepublic Response<String> get(final String key) {// 调用client发送命令getClient(key).get(key);// 新建response,放入消息队列queue,此时response没有数据return getResponse(BuilderFactory.STRING);}
public class Queable {private Queue<Response<?>> pipelinedResponses = new LinkedList<>();protected <T> Response<T> getResponse(Builder<T> builder) {Response<T> lr = new Response<>(builder);pipelinedResponses.add(lr);return lr;}

Pipeline的sync()方法代码如下:

public class Pipeline extends MultiKeyPipelineBase implements Closeable {public void sync() {// 判断消息队列是否为空,是否发出请求if (getPipelinedResponseLength() > 0) {// 1)从inputStream中获取回复消息,消息塞入消息队列的response中List<Object> unformatted = client.getMany(getPipelinedResponseLength());for (Object o : unformatted) {generateResponse(o);}}}

代码1)会调用Connection的getMany()方法,代码如下:

public class Connection implements Closeable {public List<Object> getMany(final int count) {flush();final List<Object> responses = new ArrayList<>(count);for (int i = 0; i < count; i++) {try {responses.add(readProtocolWithCheckingBroken());} catch (JedisDataException e) {responses.add(e);}}return responses;}protected Object readProtocolWithCheckingBroken() {if (broken) {throw new JedisConnectionException("Attempting to read from a broken connection");}try {// 从inputStream中读取服务器的回复,此处阻塞等待return Protocol.read(inputStream);} catch (JedisConnectionException exc) {broken = true;throw exc;}}

Pipeline的sync()方法中调用Queable的方法如下:

public class Queable {private Queue<Response<?>> pipelinedResponses = new LinkedList<>();protected Response<?> generateResponse(Object data) {Response<?> response = pipelinedResponses.poll();if (response != null) {response.set(data);}return response;}protected int getPipelinedResponseLength() {return pipelinedResponses.size();}

Transaction的exec()方法和Pipeline的sync()很相似,代码如下:

public class Transaction extends MultiKeyPipelineBase implements Closeable {public List<Object> exec() {// 清空inputstream里面的所有数据,忽略QUEUED or ERROR回复client.getMany(getPipelinedResponseLength());// 发送EXEC指令,让服务端执行所有命令client.exec();// 事务结束inTransaction = false;// 从inputStream中读取所有回复List<Object> unformatted = client.getObjectMultiBulkReply();if (unformatted == null) {return null;}// 处理响应结果List<Object> formatted = new ArrayList<>();for (Object o : unformatted) {try {formatted.add(generateResponse(o).get());} catch (JedisDataException e) {formatted.add(e);}}return formatted;}

参考

Jedis源码分析(一)-Jedis介绍

Jedis源码分析(二)-Jedis的内部实现(Client,Pipeline,Transaction)

Jedis源码解析(一):Jedis简介、Jedis模块源码解析相关推荐

  1. 易语言linux数据库模块,易语言ADO数据库对象模块源码

    下面我们对易语言ADO数据库对象模块源码文件阐述相关使用资料和易语言ADO数据库对象模块源码文件的更新信息. 易语言ADO数据库对象模块源码 易语言ADO数据库对象模块源码 系统结构:list,取错误 ...

  2. 易语言如何做post服务器,易语言服务器提交post模块源码

    下面我们对易语言服务器提交post模块源码[db:版本]文件阐述相关使用资料和易语言服务器提交post模块源码[db:版本]文件的更新信息. 易语言服务器提交post模块源码 易语言服务器提交post ...

  3. 紫微排盘源码php,易语言紫微斗数排盘模块源码

    下面我们对易语言紫微斗数排盘模块源码文件阐述相关使用资料和易语言紫微斗数排盘模块源码文件的更新信息. 易语言紫微斗数排盘模块源码 易语言紫微斗数排盘模块源码 系统结构:紫微斗数排盘,GetStarLe ...

  4. 营销模块数据库表解析:限时购功能

    本文主要对限时购(秒杀)功能相关表进行解析,采用数据库表与功能对照的形式. 相关表结构 限时购表 用于存储限时购活动的信息,包括开始时间.结束时间以及上下线状态. create table sms_f ...

  5. sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)

    本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...

  6. Java_io体系之BufferedWriter、BufferedReader简介、走进源码及示例——16

    Java_io体系之BufferedWriter.BufferedReader简介.走进源码及示例--16 一:BufferedWriter 1.类功能简介: BufferedWriter.缓存字符输 ...

  7. Java_io体系之RandomAccessFile简介、走进源码及示例——20

    Java_io体系之RandomAccessFile简介.走进源码及示例--20 RandomAccessFile 1.       类功能简介: 文件随机访问流.关心几个特点: 1.他实现的接口不再 ...

  8. LambdaMART简介——基于Ranklib源码(一 lambda计算)

     LambdaMART简介--基于Ranklib源码(一 lambda计算) 时间:2014-08-09 21:01:49      阅读:168      评论:0      收藏:0      ...

  9. LambdaMART简介——基于Ranklib源码(二 Regression Tree训练)

     LambdaMART简介--基于Ranklib源码(二 Regression Tree训练) 上一节中介绍了 λ λ 的计算,lambdaMART就以计算的每个doc的 λ λ 值作为label ...

最新文章

  1. MongoDB极简教程
  2. http://m.blog.csdn.net/article/details?id=2630620
  3. editplus 批量删除 重复行
  4. yolov5损失函数笔记
  5. 机器学习——支持向量机SVM之多分类问题
  6. Apache Ignite本机持久性,简要概述
  7. 玩! 框架:为什么我会爱上它
  8. 防火墙问题 Linux系统 /etc/sysconfig/路径下无iptables文件
  9. 带你全面了解真正的CleanMyMac,CleanMyMac使用说明
  10. XML文档的使用方法
  11. 程序员们,你知道面试官是如何考察你的软素质吗?
  12. 软件工程-----个人总结
  13. messagedigest 图片加密_MessageDigest对密码进行加密
  14. 复旦大学邱锡鹏教授带你梳理深度学习知识脉络(直播彩蛋)
  15. 【MySQL学习笔记(十六)】之redo日志超详细讲解
  16. mysql informix_INFORMIX数据库函数
  17. ie窗口如何最大化设置
  18. 入门激光雷达点云的3D目标检测
  19. EPOCH, BATCH, INTERATION
  20. js实现页面定时跳转

热门文章

  1. php 两层便利的break,php break跳出多重循环实例
  2. 蜂巢 - Thinking in Agile - 我们需要怎样的软件过程(2)
  3. win10 mrt 找不到
  4. c语言怎么进行字符串比较,c语言怎么进行字符串比较
  5. 【机器视觉】教你选择工业机器人视觉系统!
  6. 京东华为P20手机评论数据抓取
  7. QT信号槽第五个参数
  8. spring boot手工DIY网站毕业设计源码310226
  9. 考研英语阅读理解做题技巧(2):主旨题
  10. C文件操作——基础知识