Jedis源码解析(一):Jedis简介、Jedis模块源码解析
一、Jedis简介
1、Jedis对应Redis的四种工作模式
对应关系如下:
Jedis主要模块 | Redis工作模式 |
---|---|
Jedis | Redis Standalone(单节点模式) |
JedisCluster | Redis Cluster(集群模式) |
JedisSentinel | Redis Sentinel(哨兵模式) |
ShardedJedis | Redis Sharding(分片模式) |
2、Jedis三种请求模式
Jedis实例有3种请求模式:Client、Pipeline和Transaction
Jedis实例通过Socket建立客户端与服务端的长连接,往outputStream发送命令,从inputStream读取回复
1)、Client模式
客户端发一个命令,阻塞等待服务端执行,然后读取返回结果
2)、Pipeline模式
一次性发送多个命令,最后一次取回所有的返回结果
这种模式通过减少网络的往返时间和IO的读写次数,大幅度提高通信性能
原生批命令(mset、mget)与Pipeline对比
- 原生批量命令是原子性,Pipeline是非原子性的
- 原生批量命令是一个命令对应多个key,Pipeline支持多个命令
- 原生批量命令是Redis服务端支持实现的,而Pipeline需要服务端与客户端的共同实现
3)、Transaction模式
Transaction模式即开启Redis的事务管理
Redis事务可以一次执行多个命令, 并且带有以下三个重要的保证:
- 批量操作在发送EXEC命令前被放入队列缓存
- 收到EXEC命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行
- 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中
一个事务从开始到执行会经历以下三个阶段:
- 开始事务
- 命令入队
- 执行事务
Redis事务相关命令:
multi
:标记一个事务块的开始exec
:执行所有事务块的命令(一旦执行exec后,之前加的监控锁都会被取消掉)discard
:取消事务,放弃事务块中的所有命令watch key1 key2 ...
:监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断(类似乐观锁)unwatch
:取消watch对所有key的监控
二、Jedis模块源码解析
1、Jedis类结构
上图中,标为橘色的类为核心类
- Jedis以输入的命令参数是否为二进制,将处理请求的具体实现分为两个类中,例如Jedis和BinaryJedis、Client和BinaryClient
- 与Redis服务器的连接信息封装在Client的基类Connection中
- BinaryJedis类中包含了Client、Pipeline和Transaction变量,对应3种请求模式
2、Jedis的初始化流程
Jedis jedis = new Jedis("localhost", 6379, 10000);Transaction transaction = jedis.multi();Pipeline pipeline = jedis.pipelined();
Jedis通过传入Redis服务器地址(host、port)开始初始化,然后在BinaryJedis里实例化Client。Client通过Socket维持客户端与Redis服务器的连接
Transaction和Pipeline继承同一个基类MultiKeyPipelineBase。Transaction在实例化的时候,就自动发送MULTI命令,开启事务模式;Pipeline按情况手动开启。它们均依靠Client发送命令
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {protected final Client client;protected Transaction transaction = null;protected Pipeline pipeline = null;public BinaryJedis(final String host, final int port) {client = new Client(host, port);}public BinaryJedis(final HostAndPort hostPort, final JedisClientConfig config) {client = new Client(hostPort, config);initializeFromClientConfig(config);}public Transaction multi() {client.multi();client.getOne(); // expected OKtransaction = new Transaction(client);return transaction;}public Pipeline pipelined() {pipeline = new Pipeline();pipeline.setClient(client);return pipeline;}
3、Jedis调用流程
1)、Client模式
以get(key)
为例,Jedis代码如下:
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands,ModuleCommands {@Overridepublic String get(final String key) {// 校验是否是transaction模式或者pipeline模式checkIsInMultiOrPipeline();// 1)调用client发送命令client.get(key);// 2)从inputStream里读取回复return client.getBulkReply();}
代码1)处会调用Connection的sendCommand()
方法,再调用Protocol的sendCommand()
方法按照Redis的同一请求协议组织Redis命令,写入outputStream
public class Connection implements Closeable {public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {try {// 连接socket,如果已连接,跳过connect();// 按照redis的同一请求协议组织redis命令,写入outputStreamProtocol.sendCommand(outputStream, cmd, args);} catch (JedisConnectionException ex) {/** When client send request which formed by invalid protocol, Redis send back error message* before close connection. We try to read it to provide reason of failure.*/try {String errorMessage = Protocol.readErrorLineIfPossible(inputStream);if (errorMessage != null && errorMessage.length() > 0) {ex = new JedisConnectionException(errorMessage, ex.getCause());}} catch (Exception e) {/** Catch any IOException or JedisConnectionException occurred from InputStream#read and just* ignore. This approach is safe because reading error message is optional and connection* will eventually be closed.*/}// Any other exceptions related to connection?broken = true;throw ex;}}
代码2)处调用Connection的getBulkReply()
方法,再调用Protocol的read()
方法从inputStream中读取服务器的回复,此处会阻塞等待
public class Connection implements Closeable {public String getBulkReply() {final byte[] result = getBinaryBulkReply();if (null != result) {return SafeEncoder.encode(result);} else {return null;}}public byte[] getBinaryBulkReply() {flush();return (byte[]) readProtocolWithCheckingBroken();}protected Object readProtocolWithCheckingBroken() {if (broken) {throw new JedisConnectionException("Attempting to read from a broken connection");}try {// 从inputStream中读取服务器的回复,此处阻塞等待return Protocol.read(inputStream);} catch (JedisConnectionException exc) {broken = true;throw exc;}}
Protocol是一个通讯工具类,将Redis的各类执行关键字存储为静态变量,可以直观调用命令,例如Protocol.Command.GET
。同时,将命令包装成符合Redis的统一请求协议,回复消息的处理也是在这个类进行,先通过通讯协提取出当次请求的回复消息,将Object类型的消息,格式化为String、List等具体类型,如果回复消息有Error则以异常的形式抛出
Protocol核心方法如下:
public final class Protocol {// 按照redis的同一请求协议组织redis命令,写入outputStreampublic static void sendCommand(final RedisOutputStream os, final ProtocolCommand command,final byte[]... args) // 从inputStream中读取服务器的回复,此处阻塞等待,调用process()方法处理消息public static Object read(final RedisInputStream is)// 通过检查服务器发回数据的第一个字节,确定这个回复是什么类型,分别交给下面5个函数处理 private static Object process(final RedisInputStream is) // 处理状态回复 private static byte[] processStatusCodeReply(final RedisInputStream is)// 处理批量回复 private static byte[] processBulkReply(final RedisInputStream is)// 处理多条批量回复 private static List<Object> processMultiBulkReply(final RedisInputStream is)// 处理整数回复private static Long processInteger(final RedisInputStream is) // 处理错误回复 private static void processError(final RedisInputStream is)
2)、Pipeline模式和Transaction模式
- Pipeline和Transaction都继承自MultiKeyPipelineBase
- MultiKeyPipelineBase和PipelineBase的区别在于处理的命令不同,内部均调用Client发送命令
- Pipeline有一个内部类对象MultiResponseBuilder,当调用Pipeline的
sync()
之前,存储所有返回结果
Pipeline的使用方法:
Pipeline pipeline = jedis.pipelined();Response<String> key1 = pipeline.get("key1");Response<String> key2 = pipeline.get("key2");Response<String> key3 = pipeline.get("key3");pipeline.sync();System.out.println("value1:" + key1.get() + ",value2:" + key2.get() + ",value3:" + key3.get());
Transaction的使用方法:
Transaction transaction = jedis.multi();Response<String> key1 = transaction.get("key1");Response<String> key2 = transaction.get("key2");Response<String> key3 = transaction.get("key3");transaction.exec();System.out.println("value1:" + key1.get() + ",value2:" + key2.get() + ",value3:" + key3.get());
以get(key)
为例,Pipeline代码如下:
public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {@Overridepublic Response<String> get(final String key) {// 调用client发送命令getClient(key).get(key);// 新建response,放入消息队列queue,此时response没有数据return getResponse(BuilderFactory.STRING);}
public class Queable {private Queue<Response<?>> pipelinedResponses = new LinkedList<>();protected <T> Response<T> getResponse(Builder<T> builder) {Response<T> lr = new Response<>(builder);pipelinedResponses.add(lr);return lr;}
Pipeline的sync()
方法代码如下:
public class Pipeline extends MultiKeyPipelineBase implements Closeable {public void sync() {// 判断消息队列是否为空,是否发出请求if (getPipelinedResponseLength() > 0) {// 1)从inputStream中获取回复消息,消息塞入消息队列的response中List<Object> unformatted = client.getMany(getPipelinedResponseLength());for (Object o : unformatted) {generateResponse(o);}}}
代码1)会调用Connection的getMany()
方法,代码如下:
public class Connection implements Closeable {public List<Object> getMany(final int count) {flush();final List<Object> responses = new ArrayList<>(count);for (int i = 0; i < count; i++) {try {responses.add(readProtocolWithCheckingBroken());} catch (JedisDataException e) {responses.add(e);}}return responses;}protected Object readProtocolWithCheckingBroken() {if (broken) {throw new JedisConnectionException("Attempting to read from a broken connection");}try {// 从inputStream中读取服务器的回复,此处阻塞等待return Protocol.read(inputStream);} catch (JedisConnectionException exc) {broken = true;throw exc;}}
Pipeline的sync()
方法中调用Queable的方法如下:
public class Queable {private Queue<Response<?>> pipelinedResponses = new LinkedList<>();protected Response<?> generateResponse(Object data) {Response<?> response = pipelinedResponses.poll();if (response != null) {response.set(data);}return response;}protected int getPipelinedResponseLength() {return pipelinedResponses.size();}
Transaction的exec()
方法和Pipeline的sync()
很相似,代码如下:
public class Transaction extends MultiKeyPipelineBase implements Closeable {public List<Object> exec() {// 清空inputstream里面的所有数据,忽略QUEUED or ERROR回复client.getMany(getPipelinedResponseLength());// 发送EXEC指令,让服务端执行所有命令client.exec();// 事务结束inTransaction = false;// 从inputStream中读取所有回复List<Object> unformatted = client.getObjectMultiBulkReply();if (unformatted == null) {return null;}// 处理响应结果List<Object> formatted = new ArrayList<>();for (Object o : unformatted) {try {formatted.add(generateResponse(o).get());} catch (JedisDataException e) {formatted.add(e);}}return formatted;}
参考:
Jedis源码分析(一)-Jedis介绍
Jedis源码分析(二)-Jedis的内部实现(Client,Pipeline,Transaction)
Jedis源码解析(一):Jedis简介、Jedis模块源码解析相关推荐
- 易语言linux数据库模块,易语言ADO数据库对象模块源码
下面我们对易语言ADO数据库对象模块源码文件阐述相关使用资料和易语言ADO数据库对象模块源码文件的更新信息. 易语言ADO数据库对象模块源码 易语言ADO数据库对象模块源码 系统结构:list,取错误 ...
- 易语言如何做post服务器,易语言服务器提交post模块源码
下面我们对易语言服务器提交post模块源码[db:版本]文件阐述相关使用资料和易语言服务器提交post模块源码[db:版本]文件的更新信息. 易语言服务器提交post模块源码 易语言服务器提交post ...
- 紫微排盘源码php,易语言紫微斗数排盘模块源码
下面我们对易语言紫微斗数排盘模块源码文件阐述相关使用资料和易语言紫微斗数排盘模块源码文件的更新信息. 易语言紫微斗数排盘模块源码 易语言紫微斗数排盘模块源码 系统结构:紫微斗数排盘,GetStarLe ...
- 营销模块数据库表解析:限时购功能
本文主要对限时购(秒杀)功能相关表进行解析,采用数据库表与功能对照的形式. 相关表结构 限时购表 用于存储限时购活动的信息,包括开始时间.结束时间以及上下线状态. create table sms_f ...
- sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)
本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...
- Java_io体系之BufferedWriter、BufferedReader简介、走进源码及示例——16
Java_io体系之BufferedWriter.BufferedReader简介.走进源码及示例--16 一:BufferedWriter 1.类功能简介: BufferedWriter.缓存字符输 ...
- Java_io体系之RandomAccessFile简介、走进源码及示例——20
Java_io体系之RandomAccessFile简介.走进源码及示例--20 RandomAccessFile 1. 类功能简介: 文件随机访问流.关心几个特点: 1.他实现的接口不再 ...
- LambdaMART简介——基于Ranklib源码(一 lambda计算)
LambdaMART简介--基于Ranklib源码(一 lambda计算) 时间:2014-08-09 21:01:49 阅读:168 评论:0 收藏:0 ...
- LambdaMART简介——基于Ranklib源码(二 Regression Tree训练)
LambdaMART简介--基于Ranklib源码(二 Regression Tree训练) 上一节中介绍了 λ λ 的计算,lambdaMART就以计算的每个doc的 λ λ 值作为label ...
最新文章
- MongoDB极简教程
- http://m.blog.csdn.net/article/details?id=2630620
- editplus 批量删除 重复行
- yolov5损失函数笔记
- 机器学习——支持向量机SVM之多分类问题
- Apache Ignite本机持久性,简要概述
- 玩! 框架:为什么我会爱上它
- 防火墙问题 Linux系统 /etc/sysconfig/路径下无iptables文件
- 带你全面了解真正的CleanMyMac,CleanMyMac使用说明
- XML文档的使用方法
- 程序员们,你知道面试官是如何考察你的软素质吗?
- 软件工程-----个人总结
- messagedigest 图片加密_MessageDigest对密码进行加密
- 复旦大学邱锡鹏教授带你梳理深度学习知识脉络(直播彩蛋)
- 【MySQL学习笔记(十六)】之redo日志超详细讲解
- mysql informix_INFORMIX数据库函数
- ie窗口如何最大化设置
- 入门激光雷达点云的3D目标检测
- EPOCH, BATCH, INTERATION
- js实现页面定时跳转