Netty实例几则

Netty是基于JDK NIO的网络框架

简化了NIO编程, 不用程序自己维护selector, 将网络通信和数据处理的部分做了分离

多用于做底层的数据通信, 心跳检测(keepalived)

1. 数据通信

1.1 Hello World

public class Server {public static void main(String[] args) throws Exception {// 1 创建线两个事件循环组// 一个是用于处理服务器端接收客户端连接的// 一个是进行网络通信的(网络读写的)EventLoopGroup pGroup = new NioEventLoopGroup();EventLoopGroup cGroup = new NioEventLoopGroup();// 2 创建辅助工具类ServerBootstrap,用于服务器通道的一系列配置ServerBootstrap b = new ServerBootstrap();b.group(pGroup, cGroup) // 绑定俩个线程组.channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel对应TCP, NioDatagramChannel对应UDP.option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP缓冲区.option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲大小.option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲大小.option(ChannelOption.SO_KEEPALIVE, true) // 保持连接.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {  //SocketChannel建立连接后的管道// 3 在这里配置 通信数据的处理逻辑, 可以addLast多个...sc.pipeline().addLast(new ServerHandler());}});// 4 绑定端口, bind返回future(异步), 加上sync阻塞在获取连接处ChannelFuture cf1 = b.bind(8765).sync();//ChannelFuture cf2 = b.bind(8764).sync();   //可以绑定多个端口// 5 等待关闭, 加上sync阻塞在关闭请求处
        cf1.channel().closeFuture().sync();//cf2.channel().closeFuture().sync();
        pGroup.shutdownGracefully();cGroup.shutdownGracefully();}
}

SO_BACKLOG详解:
服务器的TCP内核维护两个队列A和B
客户端向服务端请求connect时, 发送SYN(第一次握手)
服务端收到SYN后, 向客户端发送SYN ACK(第二次握手),  TCP内核将连接放入队列A
客户端收到后向服务端发送ACK(第三次握手),  TCP内核将连接从A->B, accept返回, 连接完成
A/B队列的长度和即为BACKLOG, 当accept速度跟不上, A/B队列使得BACKLOG满了, 客户端连接就会被TCP内核拒绝
可以调大backlog缓解这一现象, 经验值~100

public class ServerHandler extends ChannelHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("server channel active... ");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "utf-8");System.out.println("Server :" + body );String response = "返回给客户端的响应:" + body ;ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));// future完成后触发监听器, 此处是写完即关闭(短连接). 因此需要关闭连接时, 要通过server端关闭. 直接关闭用方法ctx[.channel()].close()//.addListener(ChannelFutureListener.CLOSE);
    }@Overridepublic void channelReadComplete(ChannelHandlerContext ctx)throws Exception {System.out.println("读完了");ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable t)throws Exception {ctx.close();}
}

public class Client {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler());}});ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();//ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();  //可以使用多个端口//发送消息, Buffer类型. write需要flush才发送, 可用writeFlush代替cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));Thread.sleep(2000);cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));//cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes()));
        cf1.channel().closeFuture().sync();//cf2.channel().closeFuture().sync();
        group.shutdownGracefully();}
}

public class ClientHandler extends ChannelHandlerAdapter{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {try {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "utf-8");System.out.println("Client :" + body );} finally {// 记得释放xxxHandler里面的方法的msg参数: 写(write)数据, msg引用将被自动释放不用手动处理; 但只读数据时,!必须手动释放引用数
             ReferenceCountUtil.release(msg);}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.close();}
}

1.2 拆包粘包问题

TCP/IP确保了包的传送, 包的顺序等, 但编程中还需要解决拆包粘包问题

-> 接收的一连串包中的数据, 处理的分隔在哪里?  基本解决方案:

1)特殊字符作为结束分隔符

2)消息定长. 固定包的长度, 长度不够用空格补全. 接收方需要trim, 效率不高不推荐

3)自定义协议. 在消息头中包含消息总长度的字段. 需要安全性时可以考虑.

特殊字符

public class Server {public static void main(String[] args) throws Exception {EventLoopGroup pGroup = new NioEventLoopGroup();EventLoopGroup cGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_SNDBUF, 32*1024).option(ChannelOption.SO_RCVBUF, 32*1024).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {// 使用DelimiterBasedFrameDecoder设置结尾分隔符$_ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));// 设置字符串形式的解码.  经过StringDecoder, Handler回调方法中接收的msg的具体类型就是String了(不再是ByteBuffer). 但写时仍需要传入ByteBuffersc.pipeline().addLast(new StringDecoder());// 通信数据的处理逻辑sc.pipeline().addLast(new ServerHandler());}});//4 绑定连接ChannelFuture cf = b.bind(8765).sync();//等待服务器监听端口关闭
        cf.channel().closeFuture().sync();pGroup.shutdownGracefully();cGroup.shutdownGracefully();}
}

public class ServerHandler extends ChannelHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(" server channel active... ");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("Server :" + msg);String response = "服务器响应: " + msg + "$_";ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {ctx.close();}
}

public class Client {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler());}});ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes()));cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes()));cf.channel().closeFuture().sync();group.shutdownGracefully();}
}

public class ClientHandler extends ChannelHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client channel active... ");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {try {String response = (String) msg;System.out.println("Client: " + response);} finally {ReferenceCountUtil.release(msg);}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

定长

public class Server {public static void main(String[] args) throws Exception{EventLoopGroup pGroup = new NioEventLoopGroup();EventLoopGroup cGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_SNDBUF, 32*1024).option(ChannelOption.SO_RCVBUF, 32*1024).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {//设置定长字符串接收, 定长为5, 积累到5个字节才会把数据发出去sc.pipeline().addLast(new FixedLengthFrameDecoder(5));//设置字符串形式的解码sc.pipeline().addLast(new StringDecoder());sc.pipeline().addLast(new ServerHandler());}});ChannelFuture cf = b.bind(8765).sync();cf.channel().closeFuture().sync();pGroup.shutdownGracefully();cGroup.shutdownGracefully();}
}

public class ServerHandler extends ChannelHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(" server channel active... ");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String request = (String)msg;System.out.println("Server :" + msg);String response =  request ;ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {}
}

public class Client {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); sc.pipeline().addLast(new StringDecoder());sc.pipeline().addLast(new ClientHandler());}});ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaa".getBytes()));cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbccccc".getBytes()));cf.channel().closeFuture().sync();group.shutdownGracefully();}
}

public class ClientHandler extends ChannelHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client channel active... ");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String response = (String) msg;System.out.println("Client: " + response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}
}

1.3 编解码

即对象序列化技术, 目的是为了实现对象的网络传输和本地持久化
如果使用java的序列化, 码流较大. 因此多用Marshalling, Kyro(基于Protobuf)

下面的例子, 使用编解码传输javabean(Marshalling的javabean需要实现serializable), 并将message进行gzip压缩

自定义编解码器

public final class MarshallingCodeCFactory {// 解码public static MarshallingDecoder buildMarshallingDecoder() {//创建工厂对象, 参数serial指创建的是java对象序列化的工厂对象final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");//创建配置对象,版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);//根据工厂对象和配置对象创建解码providerUnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);//创建解码器对象. 第一个参数是provider, 第二个参数是单个消息序列化后的最大长度, 超过后拒绝处理MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);return decoder;}// 编码public static MarshallingEncoder buildMarshallingEncoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);//创建编码器对象. 用于将实现Serializable接口的JavaBean序列化为二进制数组MarshallingEncoder encoder = new MarshallingEncoder(provider);return encoder;}
}

javabean

public class Request implements Serializable {  // 标记Serializable接口private String id ;private String name ;private String requestMessage ;private byte[] attachment;public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getRequestMessage() {return requestMessage;}public void setRequestMessage(String requestMessage) {this.requestMessage = requestMessage;}public byte[] getAttachment() {return attachment;}public void setAttachment(byte[] attachment) {this.attachment = attachment;}
}

public class Response implements Serializable { // 标记Serializable接口private String id;private String name;private String responseMessage;public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getResponseMessage() {return responseMessage;}public void setResponseMessage(String responseMessage) {this.responseMessage = responseMessage;}
}

GZip压缩的Util

public class GzipUtils {public static byte[] gzip(byte[] data) throws Exception {ByteArrayOutputStream bos = new ByteArrayOutputStream();GZIPOutputStream gzip = new GZIPOutputStream(bos);gzip.write(data);gzip.finish();gzip.close();byte[] ret = bos.toByteArray();bos.close();return ret;}public static byte[] ungzip(byte[] data) throws Exception{ByteArrayInputStream bis = new ByteArrayInputStream(data);GZIPInputStream gzip = new GZIPInputStream(bis);byte[] buf = new byte[1024];int num = -1;ByteArrayOutputStream bos = new ByteArrayOutputStream();while((num = gzip.read(buf)) != -1 ){bos.write(buf, 0, num);}gzip.close();bis.close();byte[] ret = bos.toByteArray();bos.close();return ret;}
}

服务端与客户端

public class Server {public static void main(String[] args) throws Exception{EventLoopGroup pGroup = new NioEventLoopGroup();EventLoopGroup cGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)//设置日志.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel sc) throws Exception {// 添加编解码. 发送自定义的类型, 而Handler的方法接收的msg参数的实际类型也是相应的自定义类了
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());sc.pipeline().addLast(new ServerHandler());}});ChannelFuture cf = b.bind(8765).sync();cf.channel().closeFuture().sync();pGroup.shutdownGracefully();cGroup.shutdownGracefully();}

public class ServerHandler extends ChannelHandlerAdapter{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Request req = (Request)msg;System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());byte[] attachment = GzipUtils.ungzip(req.getAttachment());String path = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "001.jpg";FileOutputStream fos = new FileOutputStream(path);fos.write(attachment);fos.close();Response resp = new Response();resp.setId(req.getId());resp.setName("resp" + req.getId());resp.setResponseMessage("响应内容" + req.getId());ctx.writeAndFlush(resp);//.addListener(ChannelFutureListener.CLOSE);
    }@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

public class Client {public static void main(String[] args) throws Exception{EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());sc.pipeline().addLast(new ClientHandler());}});ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();for(int i = 0; i < 5; i++){Request req = new Request();req.setId("" + i);req.setName("req" + i);req.setRequestMessage("数据信息" + i);    String path = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "001.jpg";File file = new File(path);FileInputStream in = new FileInputStream(file);  byte[] data = new byte[in.available()];  in.read(data);  in.close(); req.setAttachment(GzipUtils.gzip(data)); //压缩
            cf.channel().writeAndFlush(req);}cf.channel().closeFuture().sync();group.shutdownGracefully();}
}

public class ClientHandler extends ChannelHandlerAdapter{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {try {Response resp = (Response) msg;System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            } finally {ReferenceCountUtil.release(msg);}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

1.4 长连接/短连接

1.长连接, 一致保持着连接不主动中断, 实时性强
2.短连接. 数据放在缓存, 一次性批量提交所有数据, 服务端接收后即关闭连接
以上两种根据是否给ChannelHandlerContext添加ChannelFutureListener.ClOSE监听器实现

3.长连接, 一定时间不活跃则关闭连接. 给SocketChannel添加ReadTimeoutHandler实现. 实例如下:

public final class MarshallingCodeCFactory {public static MarshallingDecoder buildMarshallingDecoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);return decoder;}public static MarshallingEncoder buildMarshallingEncoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);MarshallingEncoder encoder = new MarshallingEncoder(provider);return encoder;}
}

public class Request implements Serializable{private String id ;private String name ;private String requestMessage ;public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getRequestMessage() {return requestMessage;}public void setRequestMessage(String requestMessage) {this.requestMessage = requestMessage;}
}

public class Response implements Serializable{private String id;private String name;private String responseMessage;public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getResponseMessage() {return responseMessage;}public void setResponseMessage(String responseMessage) {this.responseMessage = responseMessage;}
}

public class Server {public static void main(String[] args) throws Exception{EventLoopGroup pGroup = new NioEventLoopGroup();EventLoopGroup cGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)//设置日志.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel sc) throws Exception {sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());sc.pipeline().addLast(new ReadTimeoutHandler(5));  // 时限, 读客户端超时没数据则断开sc.pipeline().addLast(new ServerHandler());}});ChannelFuture cf = b.bind(8765).sync();cf.channel().closeFuture().sync();pGroup.shutdownGracefully();cGroup.shutdownGracefully();}
}

public class ServerHandler extends ChannelHandlerAdapter{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Request request = (Request) msg;System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());Response response = new Response();response.setId(request.getId());response.setName("response" + request.getId());response.setResponseMessage("响应内容" + request.getId());ctx.writeAndFlush(response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

public class Client {private EventLoopGroup group;private Bootstrap b;private ChannelFuture cf ;// 单例private static class SingletonHolder { static final Client instance = new Client();}public static Client getInstance(){return SingletonHolder.instance;}private Client(){group = new NioEventLoopGroup();b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());//超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭通道)sc.pipeline().addLast(new ReadTimeoutHandler(5));   // 时限5s, 读服务端超时没数据则断开sc.pipeline().addLast(new ClientHandler());}});}public void connect(){try {this.cf = b.connect("127.0.0.1", 8765).sync();System.out.println("远程服务器已经连接, 可以进行数据交换");                } catch (Exception e) {e.printStackTrace();}}public ChannelFuture getChannelFuture(){if(this.cf == null) {   //初次连接this.connect();}if(!this.cf.channel().isActive()){  //重连this.connect();}return this.cf;}public static void main(String[] args) throws Exception{final Client c = Client.getInstance();ChannelFuture cf = c.getChannelFuture();for(int i = 1; i <= 3; i++ ){Request request = new Request();request.setId("" + i);request.setName("request" + i);request.setRequestMessage("数据信息" + i);cf.channel().writeAndFlush(request);TimeUnit.SECONDS.sleep(4);  //间隔4s发送一次数据
        }cf.channel().closeFuture().sync(); //阻塞至超时关闭// 这里用子线程重连并发送数据一次new Thread(new Runnable() {@Overridepublic void run() {try {System.out.println("进入子线程重连一次");ChannelFuture cf = c.getChannelFuture();assert true == cf.channel().isActive(); //断言//再次发送数据Request request = new Request();request.setId("" + 4);request.setName("request" + 4);request.setRequestMessage("数据信息" + 4);cf.channel().writeAndFlush(request);                    cf.channel().closeFuture().sync();System.out.println("子线程完成");} catch (InterruptedException e) {e.printStackTrace();}}}).start();System.out.println("断开连接,主线程结束..");}}

public class ClientHandler extends ChannelHandlerAdapter{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {try {Response resp = (Response) msg;System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            } finally {ReferenceCountUtil.release(msg);}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

1.5 使用UDP (较少使用)

public class Server {public void run(int port) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioDatagramChannel.class)  // UDP: NioDatagramChannel.option(ChannelOption.SO_BROADCAST, true) // 广播.handler(new ServerHandler());b.bind(port).sync().channel().closeFuture().await();} finally {group.shutdownGracefully();}}public static void main(String[] args) throws Exception {new Server().run(8765);}
}

public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {// 谚语列表private static final String[] DICTIONARY = { "只要功夫深,铁棒磨成针。","旧时王谢堂前燕,飞入寻常百姓家。", "洛阳亲友如相问,一片冰心在玉壶。","一寸光阴一寸金,寸金难买寸光阴。","老骥伏枥,志在千里。烈士暮年,壮心不已!"};private String nextQuote() {int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length);return DICTIONARY[quoteId];}@Overridepublic void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {String req = packet.content().toString(CharsetUtil.UTF_8);System.out.println(req);if ("谚语字典查询?".equals(req)) {ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("谚语查询结果: " + nextQuote(), CharsetUtil.UTF_8), packet.sender()));}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.close();cause.printStackTrace();}
}

public class Client {public void run(int port) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).handler(new ClientHandler());Channel ch = b.bind(0).sync().channel();// 向网段内的所有机器广播UDP消息ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("谚语字典查询?", CharsetUtil.UTF_8), new InetSocketAddress("255.255.255.255", port))).sync();if (!ch.closeFuture().await(15000)) {System.out.println("查询超时!");}} finally {group.shutdownGracefully();}}public static void main(String[] args) throws Exception {new Client().run(8765);}
}

public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {@Overridepublic void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {String response = msg.content().toString(CharsetUtil.UTF_8);if (response.startsWith("谚语查询结果: ")) {System.out.println(response);ctx.close();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}

2. 心跳检测

集群中主服务器需要知道从服务器的状态
因此client每隔5~10秒给server发送心跳包

可通过netty与定时任务来实现

public final class MarshallingCodeCFactory {public static MarshallingDecoder buildMarshallingDecoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);return decoder;}public static MarshallingEncoder buildMarshallingEncoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);MarshallingEncoder encoder = new MarshallingEncoder(provider);return encoder;}
}

public class RequestInfo implements Serializable {private String ip ;private HashMap<String, Object> cpuPercMap ;private HashMap<String, Object> memoryMap;public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public HashMap<String, Object> getCpuPercMap() {return cpuPercMap;}public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {this.cpuPercMap = cpuPercMap;}public HashMap<String, Object> getMemoryMap() {return memoryMap;}public void setMemoryMap(HashMap<String, Object> memoryMap) {this.memoryMap = memoryMap;}
}

public class Server {public static void main(String[] args) throws Exception{EventLoopGroup pGroup = new NioEventLoopGroup();EventLoopGroup cGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)//设置日志.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel sc) throws Exception {sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());sc.pipeline().addLast(new ServerHeartBeatHandler());}});ChannelFuture cf = b.bind(8765).sync();cf.channel().closeFuture().sync();pGroup.shutdownGracefully();cGroup.shutdownGracefully();}
}

public class ServerHeartBeatHandler extends ChannelHandlerAdapter {private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();private static final String SUCCESS_KEY = "auth_success_key";static {AUTH_IP_MAP.put("127.0.0.1", "1234");}private boolean auth(ChannelHandlerContext ctx, Object msg){String [] ret = ((String) msg).split(",");String auth = AUTH_IP_MAP.get(ret[0]);if(auth != null && auth.equals(ret[1])){// 认证成功, 返回确认信息
            ctx.writeAndFlush(SUCCESS_KEY);return true;} else {ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);return false;}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if(msg instanceof String){auth(ctx, msg);} else if (msg instanceof RequestInfo) {RequestInfo info = (RequestInfo) msg;System.out.println("--------------------------------------------");System.out.println("当前主机ip为: " + info.getIp());System.out.println("当前主机cpu情况: ");HashMap<String, Object> cpu = info.getCpuPercMap();System.out.println("总使用率: " + cpu.get("combined"));System.out.println("用户使用率: " + cpu.get("user"));System.out.println("系统使用率: " + cpu.get("sys"));System.out.println("等待率: " + cpu.get("wait"));System.out.println("空闲率: " + cpu.get("idle"));System.out.println("当前主机memory情况: ");HashMap<String, Object> memory = info.getMemoryMap();System.out.println("内存总量: " + memory.get("total"));System.out.println("当前内存使用量: " + memory.get("used"));System.out.println("当前内存剩余量: " + memory.get("free"));System.out.println("--------------------------------------------");ctx.writeAndFlush("info received!");} else {ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);}}
}

public class Client {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());sc.pipeline().addLast(new ClienHeartBeatHandler());}});ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();cf.channel().closeFuture().sync();group.shutdownGracefully();}
}

public class ClienHeartBeatHandler extends ChannelHandlerAdapter {private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);private ScheduledFuture<?> heartBeat;  //定时任务//主动向服务器发送认证信息private InetAddress addr ;private static final String SUCCESS_KEY = "auth_success_key";@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {addr = InetAddress.getLocalHost();//String ip = addr.getHostAddress();String ip = "127.0.0.1";String key = "1234";//证书String auth = ip + "," + key;// 发送认证
        ctx.writeAndFlush(auth);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {try {if(msg instanceof String){String ret = (String) msg;if(SUCCESS_KEY.equals(ret)){// 收到认证 确认信息,设置每隔5秒发送心跳消息this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 5, TimeUnit.SECONDS);System.out.println(msg);                } else {  // 收到心跳包 确认信息
                    System.out.println(msg);}}} finally {// 只读, 需要手动释放引用计数
            ReferenceCountUtil.release(msg);}}private class HeartBeatTask implements Runnable {private final ChannelHandlerContext ctx;public HeartBeatTask(final ChannelHandlerContext ctx) {this.ctx = ctx;}@Overridepublic void run() {try {RequestInfo info = new RequestInfo();//ip
                info.setIp(addr.getHostAddress());Sigar sigar = new Sigar();//cpu precCpuPerc cpuPerc = sigar.getCpuPerc();HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();cpuPercMap.put("combined", cpuPerc.getCombined());cpuPercMap.put("user", cpuPerc.getUser());cpuPercMap.put("sys", cpuPerc.getSys());cpuPercMap.put("wait", cpuPerc.getWait());cpuPercMap.put("idle", cpuPerc.getIdle());// memoryMem mem = sigar.getMem();HashMap<String, Object> memoryMap = new HashMap<String, Object>();memoryMap.put("total", mem.getTotal() / 1024L);memoryMap.put("used", mem.getUsed() / 1024L);memoryMap.put("free", mem.getFree() / 1024L);info.setCpuPercMap(cpuPercMap);info.setMemoryMap(memoryMap);ctx.writeAndFlush(info);} catch (Exception e) {e.printStackTrace();}}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();// 取消定时发送心跳包的任务if (heartBeat != null) {heartBeat.cancel(true);heartBeat = null;}ctx.fireExceptionCaught(cause);}}
}

3. HTTP

3.1 Hello World

public final class HttpHelloWorldServer {static final boolean SSL = System.getProperty("ssl") != null;static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));public static void main(String[] args) throws Exception {final SslContext sslCtx;if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate();sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());} else {sslCtx = null;}EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.option(ChannelOption.SO_BACKLOG, 1024);b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpHelloWorldServerInitializer(sslCtx));Channel ch = b.bind(PORT).sync().channel();System.err.println("Open your web browser and navigate to " +(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');ch.closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {private final SslContext sslCtx;public HttpHelloWorldServerInitializer(SslContext sslCtx) {this.sslCtx = sslCtx;}@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(new HttpServerCodec());   // !使用http通信, HttpRequest和HttpResponsep.addLast(new HttpHelloWorldServerHandler());}
}

public class HttpHelloWorldServerHandler extends ChannelHandlerAdapter {private static final byte[] CONTENT = "HELLO WORLD".getBytes();@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg instanceof HttpRequest) {HttpRequest req = (HttpRequest) msg;if (HttpHeaderUtil.is100ContinueExpected(req)) {ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));}boolean keepAlive = HttpHeaderUtil.isKeepAlive(req);// 构造响应FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT));response.headers().set(CONTENT_TYPE, "text/plain;charset=UTF-8");response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());if (!keepAlive) {// Request短连接, 写完后直接关闭
                ctx.write(response).addListener(ChannelFutureListener.CLOSE);} else {// 长连接, response也设置为KEEP_ALIVE
                response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);ctx.write(response);}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

3.2 HTTP下载文件

public class HttpDownloadServer {private static final String DEFAULT_URL = "/sources/";public void run(final int port, final String url) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// addLast的第一项为key, 自定义的// request解码器ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());// response的编码器ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());// chunked, 传输文件时分多个response分解地传输文件ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());// ObjectAggregator, 将多个response合并为一个FullHttpResponsech.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));// 自定义业务逻辑handlerch.pipeline().addLast("fileServerHandler", new HttpDownoadServerHandler(url));}});ChannelFuture future = b.bind("127.0.0.1", port).sync();System.out.println("HTTP文件目录服务器启动,网址是 : " + "http://localhost:"  + port + url);future.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8765;new HttpDownloadServer().run(port, DEFAULT_URL);}
}

// 注意这里继承了SimpleChannelInboundHandler<T>, 含泛型, 即指定了传入参数msg的类型
public class HttpDownoadServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private final String url;public HttpDownoadServerHandler(String url) {this.url = url;}@Overridepublic void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {//是否能理解(解码)请求if (!request.decoderResult().isSuccess()) {// 400
            sendError(ctx, BAD_REQUEST);return;}//对请求的方法进行判断:如果不是GET方法则返回异常if (request.method() != GET) {// 405
            sendError(ctx, METHOD_NOT_ALLOWED);return;}//获取请求uri路径final String uri = request.uri();//对url进行分析,返回本地路径final String path = parseURI(uri);//如果 路径构造不合法,则path为nullif (path == null) {//403
            sendError(ctx, FORBIDDEN);return;}// 创建file对象File file = new File(path);// 文件隐藏或不存在if (file.isHidden() || !file.exists()) {// 404
            sendError(ctx, NOT_FOUND);return;}// 是文件夹if (file.isDirectory()) {if (uri.endsWith("/")) {//如果以正常"/"结束 说明是访问的一个文件目录:则进行展示文件列表
                sendListing(ctx, file);} else {//如果非"/"结束 则重定向,让客户端补全"/"并再次请求sendRedirect(ctx, uri + '/');}return;}// 如果所创建的file对象不是文件类型if (!file.isFile()) {// 403
            sendError(ctx, FORBIDDEN);return;}//随机文件读写对象RandomAccessFile randomAccessFile = null;try {randomAccessFile = new RandomAccessFile(file, "r");// 以只读的方式打开文件} catch (FileNotFoundException fnfe) {// 404
            sendError(ctx, NOT_FOUND);return;}//获取文件长度long fileLength = randomAccessFile.length();//建立响应对象HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);//设置响应信息
        HttpHeaderUtil.setContentLength(response, fileLength);//设置Content-Type
        setContentTypeHeader(response, file);//设置为KeepAliveif (HttpHeaderUtil.isKeepAlive(request)) {response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);}//输出response header, HttpObjectAggregator能将其与下面输出整合合并
        ctx.write(response);//写出ChunkedFile. 创建ChunkedFile需要使用RandomAccessFile并设置分段. 这里每次传输8192个字节ChannelFuture sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise());//添加传输监听sendFileFuture.addListener(new ChannelProgressiveFutureListener() {@Overridepublic void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {if (total < 0) { System.err.println("Transfer progress: " + progress);} else {System.err.println("Transfer progress: " + progress + " / " + total);}}@Overridepublic void operationComplete(ChannelProgressiveFuture future) throws Exception {System.out.println("Transfer complete.");}});//使用Chunked, 完成时需要发送标记结束的空消息体!ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);//如果当前连接请求非Keep-Alive, 最后一包消息发送完后, 服务器主动关闭连接if (!HttpHeaderUtil.isKeepAlive(request)) {lastContentFuture.addListener(ChannelFutureListener.CLOSE);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (ctx.channel().isActive()) {// 500
            sendError(ctx, INTERNAL_SERVER_ERROR);ctx.close();}}//判断非法URI的正则private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");private String parseURI(String uri) {try {//使用UTF-8字符集uri = URLDecoder.decode(uri, "UTF-8");} catch (UnsupportedEncodingException e) {try {//尝试ISO-8859-1uri = URLDecoder.decode(uri, "ISO-8859-1");} catch (UnsupportedEncodingException e1) {//抛出预想外异常信息throw new Error();}}// 对uri进行细粒度判断:4步验证操作// step 1 基础验证if (!uri.startsWith(url)) {return null;}// step 2 基础验证if (!uri.startsWith("/")) {return null;}// step 3 将文件分隔符替换为本地操作系统的文件路径分隔符uri = uri.replace('/', File.separatorChar);// step 4 验证路径合法性if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || uri.startsWith(".") || uri.endsWith(".") || INSECURE_URI.matcher(uri).matches()) {return null;}//利用当前工程所在目录 + URI相对路径 构造绝对路径 return System.getProperty("user.dir") + File.separator + uri;}//用正则表达式过滤文件名private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");//文件列表, 拼html文件private static void sendListing(ChannelHandlerContext ctx, File dir) {// 设置响应对象FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);// 响应头response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");// 构造文本内容StringBuilder ret = new StringBuilder();String dirPath = dir.getPath();ret.append("<!DOCTYPE html>\r\n");ret.append("<html><head><title>");ret.append(dirPath);ret.append(" 目录:");ret.append("</title></head><body>\r\n");ret.append("<h3>");ret.append(dirPath).append(" 目录:");ret.append("</h3>\r\n");ret.append("<ul>");ret.append("<li>链接:<a href=\"../\">..</a></li>\r\n");// 遍历文件, 生成超链接for (File f : dir.listFiles()) {//step 1: 跳过隐藏文件和不可读文件 if (f.isHidden() || !f.canRead()) {continue;}String name = f.getName();//step 2: 跳过正则过滤的文件名if (!ALLOWED_FILE_NAME.matcher(name).matches()) {continue;}ret.append("<li>链接:<a href=\"");ret.append(name);ret.append("\">");ret.append(name);ret.append("</a></li>\r\n");}ret.append("</ul></body></html>\r\n");//构造ByteBuf,写入缓冲区ByteBuf buffer = Unpooled.copiedBuffer(ret, CharsetUtil.UTF_8);//进行写出操作
        response.content().writeBytes(buffer);//重置ByteBuf
        buffer.release();//发送完成并主动关闭连接
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}//重定向操作private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);response.headers().set(LOCATION, newUri);ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}//错误信息private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString()+ "\r\n", CharsetUtil.UTF_8));response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}private static void setContentTypeHeader(HttpResponse response, File file) {//使用mime对象获取文件对应的Content-TypeMimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));}
}

3.3 HTTP上传文件 (较少使用)

实际应用中文件上传服务端有成熟的框架fastDFS(小文件)和HDFS(大文件)

如要实现断点续传, 需要记录上传进度. 参考HTTP头的Range和Content-Range

public final class HttpUploadServer {static final boolean SSL = System.getProperty("ssl") != null;static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));public static void main(String[] args) throws Exception {// Configure SSL.final SslContext sslCtx;if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate();sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());} else {sslCtx = null;}EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup);b.channel(NioServerSocketChannel.class);b.handler(new LoggingHandler(LogLevel.INFO));b.childHandler(new HttpUploadServerInitializer(sslCtx));Channel ch = b.bind(PORT).sync().channel();System.err.println("Open your web browser and navigate to " + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/');ch.closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

public class HttpUploadServerInitializer extends ChannelInitializer<SocketChannel> {private final SslContext sslCtx;public HttpUploadServerInitializer(SslContext sslCtx) {this.sslCtx = sslCtx;}@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();if (sslCtx != null) {pipeline.addLast(sslCtx.newHandler(ch.alloc()));}pipeline.addLast(new HttpRequestDecoder());pipeline.addLast(new HttpResponseEncoder());// 压缩pipeline.addLast(new HttpContentCompressor());pipeline.addLast(new HttpUploadServerHandler());}
}

public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObject> {private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName());private HttpRequest request;private boolean readingChunks;private final StringBuilder responseContent = new StringBuilder();private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); // 大小超过minsize放磁盘上private HttpPostRequestDecoder decoder;static {DiskFileUpload.deleteOnExitTemporaryFile = true; //退出时是否删除临时文件DiskFileUpload.baseDirectory = "D:" + File.separatorChar + "aa";  //文件存储路径
        DiskAttribute.deleteOnExitTemporaryFile = true; //退出时是否删除临时文件DiskAttribute.baseDirectory = "D:" + File.separatorChar + "aa"; //文件存储路径
    }@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {if (decoder != null) {decoder.cleanFiles();}}@Overridepublic void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {if (msg instanceof HttpRequest) {           // HttpRequest传输头HttpRequest request = this.request = (HttpRequest) msg;URI uri = new URI(request.uri());if (!uri.getPath().startsWith("/form")) {// 返回上传菜单
                writeMenu(ctx);return;}// 拼接反馈内容responseContent.setLength(0);responseContent.append("WELCOME TO THE WILD WILD WEB SERVER\r\n");responseContent.append("===================================\r\n");responseContent.append("VERSION: " + request.protocolVersion().text() + "\r\n");responseContent.append("REQUEST_URI: " + request.uri() + "\r\n\r\n");responseContent.append("\r\n\r\n");for (Entry<CharSequence, CharSequence> entry : request.headers()) {responseContent.append("HEADER: " + entry.getKey() + '=' + entry.getValue() + "\r\n");}responseContent.append("\r\n\r\n");Set<Cookie> cookies = null;String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE);if (value == null) {cookies = Collections.emptySet();} else {cookies = ServerCookieDecoder.decode(value);}for (Cookie cookie : cookies) {responseContent.append("COOKIE: " + cookie + "\r\n");}responseContent.append("\r\n\r\n");QueryStringDecoder decoderQuery = new QueryStringDecoder(request.uri());Map<String, List<String>> uriAttributes = decoderQuery.parameters();for (Entry<String, List<String>> attr: uriAttributes.entrySet()) {for (String attrVal: attr.getValue()) {responseContent.append("URI: " + attr.getKey() + '=' + attrVal + "\r\n");}}responseContent.append("\r\n\r\n");// GET方法, 就此returnif (request.method().equals(HttpMethod.GET)) {responseContent.append("\r\n\r\nEND OF GET CONTENT\r\n");return;}// POST方法try {decoder = new HttpPostRequestDecoder(factory, request);} catch (ErrorDataDecoderException e1) {e1.printStackTrace();responseContent.append(e1.getMessage());writeResponse(ctx.channel());ctx.channel().close();return;}readingChunks = HttpHeaderUtil.isTransferEncodingChunked(request);responseContent.append("Is Chunked: " + readingChunks + "\r\n");responseContent.append("IsMultipart: " + decoder.isMultipart() + "\r\n");if (readingChunks) {responseContent.append("Chunks: ");}}if (decoder != null) {if (msg instanceof HttpContent) {    //HttpContent具体传输的内容// 读取到一个chunkHttpContent chunk = (HttpContent) msg;try {decoder.offer(chunk);} catch (ErrorDataDecoderException e1) {e1.printStackTrace();responseContent.append(e1.getMessage());writeResponse(ctx.channel());ctx.channel().close();return;}responseContent.append('o'); //每读一个chunk标记一个'o'
                readHttpDataChunkByChunk();// 最后一块chunkif (chunk instanceof LastHttpContent) {writeResponse(ctx.channel());readingChunks = false;reset();}}} else {writeResponse(ctx.channel());}}private void reset() {request = null;decoder.destroy(); //释放资源decoder = null;}private void readHttpDataChunkByChunk() throws Exception {try {while (decoder.hasNext()) {InterfaceHttpData data = decoder.next();if (data != null) {try {writeHttpData(data);} finally {data.release();}}}} catch (EndOfDataDecoderException e1) {responseContent.append("\r\n\r\nEND OF CONTENT CHUNK BY CHUNK\r\n\r\n");}}private void writeHttpData(InterfaceHttpData data) throws Exception {if (data.getHttpDataType() == HttpDataType.Attribute) {Attribute attribute = (Attribute) data;String value = null;try {value = attribute.getValue();} catch (IOException e1) {e1.printStackTrace();responseContent.append("\r\nBODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " Error while reading value: " + e1.getMessage() + "\r\n");return;}if (value.length() > 100) {responseContent.append("\r\nBODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " data too long\r\n");} else {responseContent.append("\r\nBODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute + "\r\n");}} else {responseContent.append("\r\n -----------start-------------" + "\r\n");responseContent.append("\r\nBODY FileUpload: " + data.getHttpDataType().name() + ": " + data + "\r\n");responseContent.append("\r\n ------------end------------" + "\r\n");if (data.getHttpDataType() == HttpDataType.FileUpload) {FileUpload fileUpload = (FileUpload) data;if (fileUpload.isCompleted()) {System.out.println("file name : " + fileUpload.getFilename());System.out.println("file length: " + fileUpload.length());System.out.println("file maxSize : " + fileUpload.getMaxSize());System.out.println("file path :" + fileUpload.getFile().getPath());System.out.println("file absolutepath :" + fileUpload.getFile().getAbsolutePath());System.out.println("parent path :" + fileUpload.getFile().getParentFile());if (fileUpload.length() < 1024 * 1024 * 10) {responseContent.append("\tContent of file\r\n");try {responseContent.append(fileUpload.getString(fileUpload.getCharset()));} catch (Exception e1) {e1.printStackTrace();}responseContent.append("\r\n");} else {responseContent.append("\tFile too long to be printed out:" + fileUpload.length() + "\r\n");}fileUpload.renameTo(new File(fileUpload.getFile().getPath())); // 核心操作, 写文件
                    decoder.removeHttpDataFromClean(fileUpload); } else {responseContent.append("\tFile to be continued but should not!\r\n");}}}}private void writeResponse(Channel channel) {ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8);responseContent.setLength(0);// 是否是短连接boolean close = request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE, true)|| request.protocolVersion().equals(HttpVersion.HTTP_1_0)&& !request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE, true);FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");// 最后一次连接不需要Content-Lengthif (!close) {response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());}Set<Cookie> cookies = null;String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE);if (value == null) {cookies = Collections.emptySet();} else {cookies = ServerCookieDecoder.decode(value);}if (!cookies.isEmpty()) {for (Cookie cookie : cookies) {response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.encode(cookie));}}ChannelFuture future = channel.writeAndFlush(response);if (close) {future.addListener(ChannelFutureListener.CLOSE);}}//拼接上传页html菜单private void writeMenu(ChannelHandlerContext ctx) {responseContent.setLength(0);// create Pseudo MenuresponseContent.append("<html>");responseContent.append("<head>");responseContent.append("<title>Netty Test Form</title>\r\n");responseContent.append("</head>\r\n");responseContent.append("<body bgcolor=white><style>td{font-size: 12pt;}</style>");responseContent.append("<table border=\"0\">");responseContent.append("<tr>");responseContent.append("<td>");responseContent.append("<h1>Netty Test Form</h1>");responseContent.append("Choose one FORM");responseContent.append("</td>");responseContent.append("</tr>");responseContent.append("</table>\r\n");// GETresponseContent.append("<CENTER>GET FORM<HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>");responseContent.append("<FORM ACTION=\"/formget\" METHOD=\"GET\">");responseContent.append("<input type=hidden name=getform value=\"GET\">");responseContent.append("<table border=\"0\">");responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"info\" size=10></td></tr>");responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"secondinfo\" size=20>");responseContent.append("<tr><td>Fill with value: <br> <textarea name=\"thirdinfo\" cols=40 rows=10></textarea>");responseContent.append("</td></tr>");responseContent.append("<tr><td><INPUT TYPE=\"submit\" NAME=\"Send\" VALUE=\"Send\"></INPUT></td>");responseContent.append("<td><INPUT TYPE=\"reset\" NAME=\"Clear\" VALUE=\"Clear\" ></INPUT></td></tr>");responseContent.append("</table></FORM>\r\n");responseContent.append("<CENTER><HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>");// POSTresponseContent.append("<CENTER>POST FORM<HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>");responseContent.append("<FORM ACTION=\"/formpost\" METHOD=\"POST\">");responseContent.append("<input type=hidden name=getform value=\"POST\">");responseContent.append("<table border=\"0\">");responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"info\" size=10></td></tr>");responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"secondinfo\" size=20>");responseContent.append("<tr><td>Fill with value: <br> <textarea name=\"thirdinfo\" cols=40 rows=10></textarea>");responseContent.append("<tr><td>Fill with file (only file name will be transmitted): <br> <input type=file name=\"myfile\">");responseContent.append("</td></tr>");responseContent.append("<tr><td><INPUT TYPE=\"submit\" NAME=\"Send\" VALUE=\"Send\"></INPUT></td>");responseContent.append("<td><INPUT TYPE=\"reset\" NAME=\"Clear\" VALUE=\"Clear\" ></INPUT></td></tr>");responseContent.append("</table></FORM>\r\n");responseContent.append("<CENTER><HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>");// POST with enctype="multipart/form-data"responseContent.append("<CENTER>POST MULTIPART FORM<HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>");responseContent.append("<FORM ACTION=\"/formpostmultipart\" ENCTYPE=\"multipart/form-data\" METHOD=\"POST\">");responseContent.append("<input type=hidden name=getform value=\"POST\">");responseContent.append("<table border=\"0\">");responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"info\" size=10></td></tr>");responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"secondinfo\" size=20>");responseContent.append("<tr><td>Fill with value: <br> <textarea name=\"thirdinfo\" cols=40 rows=10></textarea>");responseContent.append("<tr><td>Fill with file: <br> <input type=file name=\"myfile\">");responseContent.append("</td></tr>");responseContent.append("<tr><td><INPUT TYPE=\"submit\" NAME=\"Send\" VALUE=\"Send\"></INPUT></td>");responseContent.append("<td><INPUT TYPE=\"reset\" NAME=\"Clear\" VALUE=\"Clear\" ></INPUT></td></tr>");responseContent.append("</table></FORM>\r\n");responseContent.append("<CENTER><HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>");responseContent.append("</body>");responseContent.append("</html>");ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8);FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());ctx.channel().writeAndFlush(response);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {logger.log(Level.WARNING, responseContent.toString(), cause);ctx.channel().close();}
}

3.4 WebSocket(较少使用)

public class WebSocketServer {public void run(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); pipeline.addLast("handler", new WebSocketServerHandler()); } }); Channel ch = b.bind(port).sync().channel(); System.out.println("Web socket server started at port " + port + '.'); System.out.println("Open your browser and navigate to http://localhost:" + port + '/'); ch.closeFuture().sync(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } public static void main(String[] args) throws Exception { new WebSocketServer().run(8765); } }

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName()); private WebSocketServerHandshaker handshaker; @Override public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { // 传统的HTTP接入 if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } // WebSocket接入 else if (msg instanceof WebSocketFrame) { handleWebSocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { // 如果HTTP解码失败,返回HTTP异常 if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); return; } // 构造握手响应返回,本机测试 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否是关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判断是否是Ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 本例程仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); } // 返回应答消息 String request = ((TextWebSocketFrame) frame).text(); if (logger.isLoggable(Level.FINE)) { logger.fine(String.format("%s received %s", ctx.channel(), request)); } ctx.channel().write( new TextWebSocketFrame(request + " , 欢迎使用Netty WebSocket服务,现在时刻:" + new java.util.Date().toString())); } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); HttpHeaderUtil.setContentLength(res, res.content().readableBytes()); } // 如果是非Keep-Alive,关闭连接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

客户端是网页

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
Netty WebSocket 时间服务器
</head>
<br>
<body><br><script type="text/javascript">var socket;if (!window.WebSocket) {window.WebSocket = window.MozWebSocket;}if (window.WebSocket) { socket = new WebSocket("ws://localhost:8765/websocket"); socket.onmessage = function(event) { var ta = document.getElementById('responseText'); ta.value = ""; ta.value = event.data }; socket.onopen = function(event) { var ta = document.getElementById('responseText'); ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!"; }; socket.onclose = function(event) { var ta = document.getElementById('responseText'); ta.value = ""; ta.value = "WebSocket 关闭!"; }; } else { alert("抱歉,您的浏览器不支持WebSocket协议!"); } function send(message) { if (!window.WebSocket) { return; } if (socket.readyState == WebSocket.OPEN) { socket.send(message); } else { alert("WebSocket连接没有建立成功!"); } } </script> <form οnsubmit="return false;"> <input type="text" name="message" value="Netty最佳实践" /> <br> <br> <input type="button" value="发送WebSocket请求消息" οnclick="send(this.form.message.value)" /> <hr color="blue" /> <h3>服务端返回的应答消息</h3> <textarea id="responseText" style="width: 500px; height: 300px;"></textarea> </form> </body> </html>

Netty基本使用示例相关推荐

  1. netty epoll调用示例

    1.服务器端 import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; imp ...

  2. Flink读取Netty数据示例代码

    本示例代码记录每张表数据的爬取进度 每张表实时ID 每张表实时爬取数量 记录每张表记录总数 import lombok.AllArgsConstructor; import lombok.Builde ...

  3. netty 之 telnet HelloWorld 详解

    2019独角兽企业重金招聘Python工程师标准>>> 依赖工具 Maven Git JDK IntelliJ IDEA 源码拉取 从官方仓库 https://github.com/ ...

  4. Netty通信技术(一)

    目录 一.简介 一.概述 二.核心架构 三.为什么使用Netty不使用Java原生NIO 四.在使用Netty的项目 二.Reactor模型 三.Netty对Reactor的实现 ChannelPip ...

  5. Netty 学习笔记(已完结)

    Netty 0代码示例 A.经典IO多线程 // 获取到的inputStream是SocketInputStream,这个类不是公开的,继承了FileInputStream, InputStream ...

  6. Muduo 网络编程示例之零:前言

    陈硕 (giantchen_AT_gmail) Blog.csdn.net/Solstice Muduo 全系列文章列表: http://blog.csdn.net/Solstice/category ...

  7. Netty客户端断线重连实现及问题思考

    点击关注公众号,利用碎片时间学习 前言 在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题: 如何监听到客户端和服务端连接断开 ? ...

  8. java心跳机制_Java: server/client 心跳机制实现 示例

    心跳机制 心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制. 大部分CS的应用需要心跳机制.心跳机制一般在Server和Client都要实现,两者实现原理 ...

  9. Open Source

    资源来源于http://www.cnblogs.com/Leo_wl/category/246424.html RabbitMQ 安装与使用 摘要: RabbitMQ 安装与使用 前言 吃多了拉就是队 ...

最新文章

  1. 【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务
  2. Springboot集成cache的key生成策略
  3. 管理之善,在于让员工有机会试错
  4. 指针04 - 零基础入门学习C语言44
  5. LeetCode 1652. 拆炸弹(前缀和)
  6. Git常用命令——远程操作
  7. 复选框 checkbox获取值
  8. Swoole Framework 入门教程(1)
  9. Panoply软件安装
  10. PHP date函数参数
  11. InnoDB存储引擎的主要优点
  12. 数据库表的建立与基本操作
  13. vue-版的老虎机抽奖活动效果折腾小记
  14. 铁路轨道设备概述1:铁路轨道基础设备
  15. 仿钉钉考勤统计页面的日历组件,通过日历展示每日考勤打卡情况,支持在日历上打两种不同类型的点,大致适配各种分辨率效果图
  16. 删除node_modules慢【rimraf】
  17. hdu 5454 Excited Database(线段树)
  18. 计算机历史博物馆观后感:阿达·洛芙莱斯生平1
  19. 选择美国虚拟主机时要考虑的事项
  20. Python_第六篇 第三方安装包(1)_fancyimpute介绍及使用

热门文章

  1. java多线程基础学习[狂神说java-多线程笔记]
  2. 利用 iCloud Drive 来同步 Xcode 配置
  3. c文件、h文件、定义、声明(详解)
  4. Windows下安装和配置Mysql保姆级教程(图文说明)
  5. 连接器损耗、回波损耗和反射都是越低越好吗?
  6. 【Python pymongo】零基础也能轻松掌握的学习路线与参考资料
  7. AWS学习python-input
  8. wamp变橙色不变绿的解决办法
  9. 软通动力--MAG内控与BCG考试
  10. node.js接入支付宝小程序的实名认证接口