前言

MQTT定义了物联网传输协议,其标准倾向于原始TCP实现。构建于TCP的上层协议堆栈,诸如HTTP等,在空间上多了一些处理路径,稍微耗费了CPU和内存,虽看似微乎其微,但对很多处理能力不足的嵌入式设备而言,选择原始的TCP却是最好的选择。

但单纯TCP不是所有物件联网的最佳选择,提供构建与TCP基础之上的传统的HTTP通信支持,尤其是浏览器、性能富裕的桌面涉及领域,还是企业最 可信赖、最可控的传输方式之一。支持多种多样的连接通道,让目前所有一切皆可联网,除了原始TCP Socket,还要支持构建于其之上的HTTP、HTML5 Websocket,就很有必要。

mqtt.io,Pub/Sub中间件,也可以称之为推送服务器,涵盖所有主流桌面系统、浏览器平台,并且倾斜 于移动互联网,以及物联网的广阔适应天地。使用一句英文概括可能更为合适:"Make everything connect”,让所有物件都可连接。其业务目标,可用下图概括:

mqtt.io致力于做下一代支持所有主流桌面平台、所有主流浏览器、所有可联网物件都可以联网的PUB/SUB消息推送系统。

构建此系统,在于降低传统企业各自分散的推送系统,统一运营,统一管理,节省人员、运维开支。

注意事项

  1. mqtt.io是一个项目名称,没有官网,http://www.mqtt.io,和这个项目没有一毛钱关系。
  2. 项目地址:https://github.com/yongboy/mqtt.io,
  3. 项目名称启发于 http://socket.io http://netty.io 等知名framework。
  4. 目前只实现QoS 0基本特性,实现概览,后期会根据反馈,做出一些调整

依赖

  1. netty 4,目前JAVA IO界明星
  2. mqtt-library 二进制和MQTT对象的转换,这种苦活累活都是它来做,真心让人喜欢。

数据流转

解码器

用于转换二进制流到JAVA对象的过程:

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
package io.mqtt.handler.coder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.io.ByteArrayInputStream;
import java.util.List;
import org.meqantt.message.Message;
import org.meqantt.message.MessageInputStream;
public class MqttMessageNewDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf buf,
List<Object> out) throws Exception {
if (buf.readableBytes() < 2) {
return;
}
buf.markReaderIndex();
buf.readByte(); // read away header
int msgLength = 0;
int multiplier = 1;
int digit;
int lengthSize = 0;
do {
lengthSize++;
digit = buf.readByte();
msgLength += (digit & 0x7f) * multiplier;
multiplier *= 128;
if ((digit & 0x80) > 0 && !buf.isReadable()) {
buf.resetReaderIndex();
return;
}
} while ((digit & 0x80) > 0);
if (buf.readableBytes() < msgLength) {
buf.resetReaderIndex();
return;
}
byte[] data = new byte[1 + lengthSize + msgLength];
buf.resetReaderIndex();
buf.readBytes(data);
MessageInputStream mis = new MessageInputStream(
new ByteArrayInputStream(data));
Message msg = mis.readMessage();
mis.close();
out.add(msg);
}
}
view rawMqttMessageNewDecoder.java hosted with ❤ by GitHub

编码器

对所有要写入网卡缓冲区的JAVA对象转换成二进制:

12345678910111213141516171819202122232425
package io.mqtt.handler.coder;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import org.meqantt.message.Message;
@Sharable
public class MqttMessageNewEncoder extends MessageToMessageEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg,
List<Object> out) throws Exception {
if (!(msg instanceof Message)) {
return;
}
byte[] data = ((Message) msg).toBytes();
out.add(Unpooled.wrappedBuffer(data));
}
}
view rawMqttMessageNewEncoder.java hosted with ❤ by GitHub

借助于mqtt-library项目,编解码不复杂。

MQTT的消息处理

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
package io.mqtt.handler;
import io.mqtt.processer.ConnectProcesser;
import io.mqtt.processer.DisConnectProcesser;
import io.mqtt.processer.PingReqProcesser;
import io.mqtt.processer.Processer;
import io.mqtt.processer.PublishProcesser;
import io.mqtt.processer.SubscribeProcesser;
import io.mqtt.processer.UnsubscribeProcesser;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.meqantt.message.ConnAckMessage;
import org.meqantt.message.ConnAckMessage.ConnectionStatus;
import org.meqantt.message.DisconnectMessage;
import org.meqantt.message.Message;
import org.meqantt.message.Message.Type;
import org.meqantt.message.PingRespMessage;
public class MqttMessageHandler extends ChannelInboundHandlerAdapter {
private static PingRespMessage PINGRESP = new PingRespMessage();
private static final Map<Message.Type, Processer> processers;
static {
Map<Message.Type, Processer> map = new HashMap<Message.Type, Processer>(
6);
map.put(Type.CONNECT, new ConnectProcesser());
map.put(Type.PUBLISH, new PublishProcesser());
map.put(Type.SUBSCRIBE, new SubscribeProcesser());
map.put(Type.UNSUBSCRIBE, new UnsubscribeProcesser());
map.put(Type.PINGREQ, new PingReqProcesser());
map.put(Type.DISCONNECT, new DisConnectProcesser());
processers = Collections.unmodifiableMap(map);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e)
throws Exception {
try {
if (e.getCause() instanceof ReadTimeoutException) {
ctx.write(PINGRESP).addListener(
ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
ctx.channel().close();
}
} catch (Throwable t) {
t.printStackTrace();
ctx.channel().close();
}
e.printStackTrace();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj)
throws Exception {
Message msg = (Message) obj;
Processer p = processers.get(msg.getType());
if (p == null) {
return;
}
Message rmsg = p.proc(msg, ctx);
if (rmsg == null) {
return;
}
if (rmsg instanceof ConnAckMessage
&& ((ConnAckMessage) rmsg).getStatus() != ConnectionStatus.ACCEPTED) {
ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE);
} else if (rmsg instanceof DisconnectMessage) {
ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE);
} else {
ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
view rawMqttMessageHandler.java hosted with ❤ by GitHub

更具体的可以查看项目。

小结

简单介绍了一个简单的不能再简单的MQTT Server,只具有最基本的QoS 0类型的消息订阅等。

后面,对HTML 5 Websocket,会在现有基础代码之上,不做多大改动,增加对MQTT Over WebSocket的支持。

MQTT协议笔记之mqtt.io项目TCP协议支持相关推荐

  1. Mqtt开发笔记:Mqtt服务器搭建

    若该文为原创文章,未经允许不得转载 原博主博客地址:https://blog.csdn.net/qq21497936 原博主博客导航:https://blog.csdn.net/qq21497936/ ...

  2. MQTT学习笔记——Yeelink MQTT服务 使用mqtt.js和paho-mqtt

    0 前言 2014年8月yeelink推出基于MQTT协议的开关类型设备控制API,相比于基于HTTP RESTful的轮训方式,通过订阅相关主题消息,可以远程控制类应用实时性更好.本文使用两种方式实 ...

  3. TCP/IP详解阅读笔记(一):TCP协议

    作者: remcarpediem 联系方式:segmentfault,csdn,简书 本文转载请注明作者.文章来源,链接,版权归作者所有.  前段时间提交了本科毕业论文,这段时间特别空闲,于是希望研究 ...

  4. lwIP TCP/IP 协议栈笔记之十五: TCP协议

    目录 1. TCP 服务简介 2. TCP 的特性 2.1 连接机制 2.2 确认与重传 2.3 缓冲机制 2.4 全双工通信 2.5 流量控制 2.6 差错控制 2.7 拥塞控制 3. 端口号的概念 ...

  5. HTTP笔记1:网络模型与TCP协议

    1 URI 和 URL Uniform Resource Identifier (URI) URI:统一资源标识符,用于标识互联网上某一个资源 URL:表示资源的地点(互联网上所处的位置). 可见 U ...

  6. 网络七层模型都是哪七层,HTTP协议是在哪一层,Tcp协议在哪一层

    网络OSI七层模型: 物理层.数据链路层.网络层.传输层.会话层.表示层.应用层 其中,HTTP协议处于应用层,TCP协议处于传输层

  7. 【学习笔记】传输层:TCP协议(报文段、连接管理{握手}、可靠传输、流量控制、拥塞控制)

    文章目录 一. 协议特点 & 报文段 ① 特点 ② 报文段首部格式 二. TCP连接管理 ① 建立联系(三次握手) SYN洪泛攻击 ② 连接释放(四次挥手) 三. TCP流量控制 ① 序号 ② ...

  8. 对接物联网设备tcp协议_什么是物联网?常见IoT协议最全讲解

    本文介绍物联网基础知识:什么是物联网,以及常见的物联网协议. 一.什么是物联网? 物联网(Internet of Things)这个概念读者应该不会陌生.物联网的概念最早于1999年被提出来,曾被称为 ...

  9. 自定义Udp/Tcp协议,通信协议Socket/WebSocket,IM粘包、分包解决等(2),ProtocolBuffer

    > 自定义Udp/Tcp协议/通信协议(Java/C):自定义构建和解析IM协议消息:IM自定义UDP通信协议   类似于网络通信中的TCPIP协议一般,比较可靠的通信协议往往包含有以下几个组成 ...

最新文章

  1. 面试题----中断的一些知识
  2. 美国爱因斯坦计划技术分析
  3. 【07月01日】A股滚动市净率PB历史新低排名
  4. android luajava,android嵌入lua
  5. 脑电分析系列[MNE-Python-9]| 参考电极应用
  6. VMware 无法打开内核设备 \\.\Global\vmx86
  7. 创建启动oracle快捷方式,GNOME3创建连接OracleFS管理软件启动快捷方式
  8. Qt笔记-解决QSocketNotifier: Multiple socket notifiers for same socket xxx and type Read问题
  9. Python在应用层实现UDP协议的可靠传输
  10. python模拟seo_Python模拟鼠标点击实现方法(将通过实例自动化模拟在360浏览器中自动搜索python)_天津SEO...
  11. 10 年前的我 VS 10 年后的我
  12. 基于vue2.0与追书神器api的小说阅读webapp
  13. [转] 电子技术*笔记4【2013-03】
  14. 如何购买阿里云域名教程(域名购买+配置)~
  15. yyds,35岁自学编程,入职微软
  16. html特殊符号小企鹅,企鹅侦探名字如何取特殊 名字可以使用的特殊符号
  17. 新华社和中移动联手打造的搜索引擎盘古搜索开通
  18. swift 使用Moya进行网络请求
  19. 爱奇艺APP的自动化录制回放系统 全云化处理新体验
  20. android 盒子刷机,一加5刷机盒子

热门文章

  1. 【Flashback】Flashback Database闪回数据库功能实践
  2. 使用System Center Operations Manager监视Exchange 2007客户端连通性(二)
  3. 短url服务java_Serverless-实现一个短网址服务(二)
  4. MySQL流程控制函数-if函数
  5. MySQL高级 - 锁 - 锁的概述及分类
  6. RabbitMQ消息应答
  7. 为什么要使用spring IOC
  8. 消息发送到消息接收的整体流程
  9. 分布式事务中的Base理论
  10. 分布式文件系统研究-测试-搭建测试环境