用Netty解析Redis网络协议

根据Redis官方文档的介绍,学习了一下Redis网络通信协议。然后偶然在GitHub上发现了个用Netty实现的Redis服务器,很有趣,于是就动手实现了一下!

1.RESP协议

Redis的客户端与服务端采用一种叫做 RESP(REdis Serialization Protocol)的网络通信协议交换数据。RESP的设计权衡了实现简单、解析快速、人类可读这三个因素。Redis客户端通过RESP序列化整数、字符串、数据等数据类型,发送字符串数组表示参数的命令到服务端。服务端根据不同的请求命令响应不同的数据类型。除了管道和订阅外,Redis客户端和服务端都是以这种简单的请求-响应模型通信的。

具体来看,RESP支持五种数据类型。以”*”消息头标识总长度,消息内部还可能有”$”标识字符串长度,每行以\r\n结束

  • 简单字符串(Simple String):以”+”开头,表示正确的状态信息,”+”后就是具体信息。许多Redis命令使用简单字符串作为成功的响应,例如”+OK\r\n”。但简单字符串因为不像Bulk String那样有长度信息,而只能靠\r\n确定是否结束,所以 Simple String不是二进制安全的,即字符串里不能包含\r\n。
  • 错误(Error):以”-“开头,表示错误的状态信息,”-“后就是具体信息。
  • 整数(Integer):以”:”开头,像SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD都返回整数。
  • 批量字符串(Bulk String):以”$”开头,表示下一行的字符串长度,具体字符串在下一行中,字符串最大能达到512MB。”$-1\r\n”叫做Null Bulk String,表示没有数据存在。
  • 数组(Array):以”*”开头,表示消息体总共有多少行(不包括当前行),”*”是具体行数。客户端用RESP数组表示命令发送到服务端,反过来服务端也可以用RESP数组返回数据的集合给客户端。数组可以是混合数据类型,例如一个整数加一个字符串”*2\r\n:1\r\n$6\r\nfoobar\r\n”。另外,嵌套数组也是可以的。

例如,观察下面命令对应的RESP,这一组set/get也正是我们要在Netty里实现的:

set name helloworld
->
*3\r\n
$3\r\n
set\r\n
$4\r\n
name\r\n
$10\r\n
helloworld\r\n
<-
:1\r\nget name
->
*2\r\n
$3\r\n
get\r\n
$4\r\n
name\r\n
<-
$10\r\n
helloworld\r\nset name abc111
->
*3\r\n
$3\r\n
set\r\n
$4\r\n
name\r\n
$6\r\n
abc111\r\n
<-
:0\r\nget age
->
*2\r\n
$3\r\n
get\r\n
$3\r\n
age\r\n
<-
:-1\r\n

2.用Netty解析协议

下面就用高性能的网络通信框架Netty实现一个简单的Redis服务器后端,解析set和get命令,并保存键值对。

2.1 Netty版本

Netty版本,5.0还处于alpha,使用Final版里最新的。但即便是4.0.25.Final竟然也跟4.0的前几个版本有些不同,网上一些例子中用的API根本就找不到了。Netty的API改得有点太“任性”了吧?:)

        <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.0.25.Final</version></dependency>

2.2 启动服务

Netty服务器启动代码,这套代码应该是Netty 4里的标准模板了,具体细节就不在本文赘述了。主要关注我们注册的几个Handler。Netty中Handler分为Inbound和Outbound,RedisCommandDecoder和RedisCommandHandler是Inbound,RedisCommandDecoder是Outbound:

  • RedisCommandDecoder:解析Redis协议,将字节数组转为Command对象。
  • RedisReplyEncoder:将响应写入到输出流中,返回给客户端。
  • RedisCommandHandler:执行Command中的命令。
public class Main {public static void main(String[] args) throws Exception {new Main().start(6379);}public void start(int port) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap().group(group).channel(NioServerSocketChannel.class).localAddress(port).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new RedisCommandDecoder()).addLast(new RedisReplyEncoder()).addLast(new RedisCommandHandler());}});// Bind and start to accept incoming connections.ChannelFuture f = b.bind(port).sync();// Wait until the server socket is closed.f.channel().closeFuture().sync();} finally {// Shutdown the EventLoopGroup, which releases all resources.group.shutdownGracefully();}}}

2.3 协议解析

RedisCommandDecoder开始时cmds是null,进入doDecodeNumOfArgs先解析出命令和参数的个数,并初始化cmds。之后就会进入doDecodeArgs逐一解析命令名和参数了。当最后完成时,会根据解析结果创建出RedisCommand对象,并加入到out列表里。这样下一个handler就能继续处理了。

public class RedisCommandDecoder extends ReplayingDecoder<Void> {/** Decoded command and arguments */private byte[][] cmds;/** Current argument */private int arg;/** Decode in block-io style, rather than nio. */@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (cmds == null) {if (in.readByte() == '*') {doDecodeNumOfArgs(in);}} else {doDecodeArgs(in);}if (isComplete()) {doSendCmdToHandler(out);doCleanUp();}}/** Decode number of arguments */private void doDecodeNumOfArgs(ByteBuf in) {// Ignore negative caseint numOfArgs = readInt(in);System.out.println("RedisCommandDecoder NumOfArgs: " + numOfArgs);cmds = new byte[numOfArgs][];checkpoint();}/** Decode arguments */private void doDecodeArgs(ByteBuf in) {for (int i = arg; i < cmds.length; i++) {if (in.readByte() == '$') {int lenOfBulkStr = readInt(in);System.out.println("RedisCommandDecoder LenOfBulkStr[" + i + "]: " + lenOfBulkStr);cmds[i] = new byte[lenOfBulkStr];in.readBytes(cmds[i]);// Skip CRLF(\r\n)in.skipBytes(2);arg++;checkpoint();} else {throw new IllegalStateException("Invalid argument");}}}/*** cmds != null means header decode complete* arg > 0 means arguments decode has begun* arg == cmds.length means complete!*/private boolean isComplete() {return (cmds != null)&& (arg > 0)&& (arg == cmds.length);}/** Send decoded command to next handler */private void doSendCmdToHandler(List<Object> out) {System.out.println("RedisCommandDecoder: Send command to next handler");if (cmds.length == 2) {out.add(new RedisCommand(new String(cmds[0]), cmds[1]));} else if (cmds.length == 3) {out.add(new RedisCommand(new String(cmds[0]), cmds[1], cmds[2]));} else {throw new IllegalStateException("Unknown command");}}/** Clean up state info */private void doCleanUp() {this.cmds = null;this.arg = 0;}private int readInt(ByteBuf in) {int integer = 0;char c;while ((c = (char) in.readByte()) != '\r') {integer = (integer * 10) + (c - '0');}if (in.readByte() != '\n') {throw new IllegalStateException("Invalid number");}return integer;}}

因为我们只是简单实现set和get命令,所以只可能有一个参数或两个参数:

public class RedisCommand {/** Command name */private final String name;/** Optional arguments */private byte[] arg1;private byte[] arg2;public RedisCommand(String name, byte[] arg1) {this.name = name;this.arg1 = arg1;}public RedisCommand(String name, byte[] arg1, byte[] arg2) {this.name = name;this.arg1 = arg1;this.arg2 = arg2;}public String getName() {return name;}public byte[] getArg1() {return arg1;}public byte[] getArg2() {return arg2;}@Overridepublic String toString() {return "Command{" +"name='" + name + '\'' +", arg1=" + Arrays.toString(arg1) +", arg2=" + Arrays.toString(arg2) +'}';}
}

2.4 命令执行

RedisCommandHandler拿到RedisCommand后,根据命令名执行命令。这里用一个HashMap模拟数据库了,set就往Map里放,get就从里面取。除了执行具体操作,还要根据执行结果返回不同的Reply对象:

  • 保存成功:返回:1\r\n。
  • 修改成功:返回:0\r\n。说明之前Map中已存在此Key。
  • 查询成功:返回Bulk String。具体见后面BulkReply。
  • Key不存在:返回:-1\r\n。
@ChannelHandler.Sharable
public class RedisCommandHandler extends SimpleChannelInboundHandler<RedisCommand> {private HashMap<String, byte[]> database = new HashMap<String, byte[]>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RedisCommand msg) throws Exception {System.out.println("RedisCommandHandler: " + msg);if (msg.getName().equalsIgnoreCase("set")) {if (database.put(new String(msg.getArg1()), msg.getArg2()) == null) {ctx.writeAndFlush(new IntegerReply(1));} else {ctx.writeAndFlush(new IntegerReply(0));}}else if (msg.getName().equalsIgnoreCase("get")) {byte[] value = database.get(new String(msg.getArg1()));if (value != null && value.length > 0) {ctx.writeAndFlush(new BulkReply(value));} else {ctx.writeAndFlush(BulkReply.NIL_REPLY);}}}}

2.5 发送响应

RedisReplyEncoder实现比较简单,拿到RedisReply消息后,直接写入到ByteBuf中就可以了。具体的写入方法都在各个RedisReply的具体实现中。

public class RedisReplyEncoder extends MessageToByteEncoder<RedisReply> {@Overrideprotected void encode(ChannelHandlerContext ctx, RedisReply msg, ByteBuf out) throws Exception {System.out.println("RedisReplyEncoder: " + msg);msg.write(out);}}
public interface RedisReply<T> {byte[] CRLF = new byte[] { '\r', '\n' };T data();void write(ByteBuf out) throws IOException;}public class IntegerReply implements RedisReply<Integer> {private static final char MARKER = ':';private final int data;public IntegerReply(int data) {this.data = data;}@Overridepublic Integer data() {return this.data;}@Overridepublic void write(ByteBuf out) throws IOException {out.writeByte(MARKER);out.writeBytes(String.valueOf(data).getBytes());out.writeBytes(CRLF);}@Overridepublic String toString() {return "IntegerReply{" +"data=" + data +'}';}}public class BulkReply implements RedisReply<byte[]> {public static final BulkReply NIL_REPLY = new BulkReply();private static final char MARKER = '$';private final byte[] data;private final int len;public BulkReply() {this.data = null;this.len = -1;}public BulkReply(byte[] data) {this.data = data;this.len = data.length;}@Overridepublic byte[] data() {return this.data;}@Overridepublic void write(ByteBuf out) throws IOException {// 1.Write headerout.writeByte(MARKER);out.writeBytes(String.valueOf(len).getBytes());out.writeBytes(CRLF);// 2.Write dataif (len > 0) {out.writeBytes(data);out.writeBytes(CRLF);}}@Overridepublic String toString() {return "BulkReply{" +"bytes=" + Arrays.toString(data) +'}';}
}

2.6 运行测试

服务端跑起来后,用官方的redis-cli就能连上我们的服务,执行一些命令测试一下。看到自己实现的Redis“伪服务端”能够“骗过”redis-cli,还是很有成就感的!

127.0.0.1:6379> set name helloworld
(integer) 1
127.0.0.1:6379> get name
"helloworld"
127.0.0.1:6379> set name abc123
(integer) 0
127.0.0.1:6379> get name
"abc123"
127.0.0.1:6379> get age
(nil)

3.Netty 4中的那些“坑”

因为是初次使用Netty 4,好多网上的资料都是Netty 3或者Netty 4早期版本的,API都不一样了,所以碰到了不少问题,官方文档里也没找到答案,一点点调试、猜测、看源码才摸出点儿“门道”:

  • Handler的基础类:Netty 4里使用SimpleChannelInboundHandler就可以了,之前的API已经不适用了。
  • Inbound和Outbound处理器间的数据交换:Context对象是数据交换的接口,不同的是:Inbound之间是靠fireChannelRead()进行数据交换,但从Inbound到Outbound就要靠writeAndFlush()触发了。
  • Inbound和Outbound的顺序:fireChannelRead()会向后找下一个Inbound处理器,但writeAndFlush()会向前找前一个Outbound处理器。所以在ChannelInitializer中,Outbound要放在SimpleChannelInboundHandler前面才能进行数据交换。
  • @Sharable注解:如果Handler是无状态的话,可以标这个注解。

转载于:https://www.cnblogs.com/xiaomaohai/p/6157613.html

用Netty解析Redis网络协议相关推荐

  1. Java 面试知识点解析(五)——网络协议篇

    前言: 在遨游了一番 Java Web 的世界之后,发现了自己的一些缺失,所以就着一篇深度好文:知名互联网公司校招 Java 开发岗面试知识点解析 ,来好好的对 Java 知识点进行复习和学习一番,大 ...

  2. 网络协议和Netty——第一章 网络协议笔记

    一.计算机网络体系结构 1.OSI七层模型 开放系统互连参考模型 (Open System Interconnect 简称OSI)是国际标准化组织(ISO)和国际电报电话咨询委员会(CCITT)联合制 ...

  3. 物联网架构成长之路(35)-利用Netty解析物联网自定义协议

    一.前言 前面博客大部分介绍了基于EMQ中间件,通信协议使用的是MQTT,而传输的数据为纯文本数据,采用JSON格式.这种方式,大部分一看就知道是熟悉Web开发.软件开发的人喜欢用的方式.由于我也是做 ...

  4. 现场解析服务化 即时通讯方案丨网络协议,应用层协议的选择

    90分钟解析服务化 即时通讯方案,(不要错过) 1. 网络协议选择 udp/tcp 2. 应用层协议选择 protobuf/xmpp/mqtt 3. 数据库表的设计 视频讲解如下,点击观看: [Lin ...

  5. 腐蚀rust服务器命令_【使用 Rust 写 Parser】2. 解析Redis协议

    系列所有文章 https://zhuanlan.zhihu.com/p/115017849​zhuanlan.zhihu.com https://zhuanlan.zhihu.com/p/139387 ...

  6. Java 面试知识点解析——网络协议篇

    Java 面试知识点解析--网络协议篇 前言: 在遨游了一番 Java Web 的世界之后,发现了自己的一些缺失,所以就着一篇深度好文:知名互联网公司校招 Java 开发岗面试知识点解析 ,来好好的对 ...

  7. 网络协议:透彻解析HTTPS协议

    本篇内容包括:HTTPS 协议概述,SSL 与 TLS(SSL 与 TLS 概述.SSL证书 和 SSL 与 TLS 二者之间关系),以及 HTTPS 下浏览器访问一个网站的全过程的内容! 文章目录 ...

  8. 新版思科CCNA认证1.0 零基础入门技术VTP协议解析-ielab网络实验室

     新版思科CCNA认证1.0 零基础入门技术VTP协议解析-ielab网络实验室 VTP(VLAN Trunking Protocol):VLAN中继协议,是Cisco专用协议.也被称为虚拟局域网干道 ...

  9. 网络协议OSI、TCP/IP协议、Socket套接字和第三方AsyncSock的使用等解析

    一.网络协议定义 1.OSI参考模型:全称(Open System Interconnection), 开放式系统互联参考模型.是一个逻辑上的定义,一个规范,它把网络协议从逻辑上分为七层,只要目的是为 ...

最新文章

  1. 正则表达式学习实例1
  2. LL1分析构造法_数学建模算法--最优赋权法(含代码)
  3. android wear 2.0 moto360 二代,最好看的 Android Wear, 二代 Moto 360 可能就是这样了
  4. go mod引用git仓库中的包:拉取存放在gitee中的package
  5. 光感是什么_Olay5款热门精华:淡斑小白瓶VS光感小白瓶如何选?超A瓶不太A
  6. ITK:创建一个后向差分运算符
  7. day16前端(Dom+Jquery)
  8. linux下安装 配置 redis数据库
  9. Nginx的官方简介
  10. 上传图片-服务端-创建文件系统服务工程
  11. iOS数据库操作(使用FMDB)
  12. 16.定位模板,布局和样式
  13. switchhosts使用
  14. 银河帝国----基地与地球
  15. 黑镜狗再现!波士顿动力「大黄狗」上岗SpaceX,勘察火箭爆炸现场
  16. 计算机方向论文选题,初中计算机方向论文选题 初中计算机论文题目如何取
  17. Android开发工程师文集-layout_weight讲解
  18. shell编程实例练习
  19. html+css实战174-SEO
  20. 【深入kotlin】 - 与Java互操作:java调用kotlin

热门文章

  1. 北京国家开放大学计算机学院,李继先
  2. php动态增加div,JavaScript动态创建div等元素实例
  3. java中的四个指令_JAVA命令学习系列(四) ---- jstat
  4. 修改yarn的默认安装和缓存位置
  5. jQuery Ajax请求成功后,为什么一直在error函数里
  6. 【考研】考研5大分数线——国家线、院校线、自划线、单科线、录取线的区别
  7. 【编辑器】VSCode的Web前端(html,css,JavaScript)开发环境打造
  8. oracle 权限控制表,Oracle 用户权限管理与常用权限数据字典列表
  9. 为什么普通红包自己不能领_腾讯为推广新游王牌战士而豪撒千金?快去看看你能不能领红包...
  10. Python入门--函数的参数总结