Netty channel configuration interface definition
Netty channel interface definition: [url]http://donald-draper.iteye.com/blog/2392740[/url]
Netty abstract channel initialization: [url]http://donald-draper.iteye.com/blog/2392801[/url]
Netty AbstractUnsafe definition: [url]http://donald-draper.iteye.com/blog/2393053[/url]
Netty channel outbound buffer: [url]http://donald-draper.iteye.com/blog/2393098[/url]
Netty abstract channel, continued: [url]http://donald-draper.iteye.com/blog/2393166[/url]
Netty abstract NIO channel: [url]http://donald-draper.iteye.com/blog/2393269[/url]
Netty abstract NIO byte channel: [url]http://donald-draper.iteye.com/blog/2393323[/url]
Netty abstract NIO message channel: [url]http://donald-draper.iteye.com/blog/2393364[/url]
Netty NioServerSocketChannel analysis: [url]http://donald-draper.iteye.com/blog/2393443[/url]
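[size=medium][b]Introduction[/b][/size]
In the previous article we looked at the NIO server socket channel. A quick recap:
The NIO server socket channel, NioServerSocketChannel, holds two variables: a selector provider (SelectorProvider) and a channel configuration (ServerSocketChannelConfig).
When the channel actually binds the socket address, it first checks the JDK version. On JDK 1.7 or later it calls the channel's own bind method; otherwise it calls the bind method of the ServerSocket associated with the channel.
The doReadMessages method runs when a client connection request is accepted. It creates a socket channel for talking to that client and adds it to the read result list, which is in effect a list of socket channels. That list is handed to ServerBootstrapAcceptor, the acceptor installed by ServerBootstrap. The acceptor is an inbound channel handler: for every accepted client channel it applies the child channel options and attributes, registers the channel with the childGroup event loop group, and adds the child channel handler to the pipeline of that channel. When a client connects, it first sends a connection request to the server, and the server creates the socket channel for that client once it receives the request.
Because the server channel exists only to accept client connections, it does not support connect, write, message filtering, and similar operations.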
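The bind step from the recap can be tried out with the JDK alone. The sketch below is not Netty code: the class name BindSketch and both method names are made up for illustration, and the loopback address with port 0 is an assumption that lets the OS pick a free port. It only shows the two JDK routes the recap refers to, binding through the channel (available since JDK 7) and binding through the ServerSocket associated with the channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper, not part of Netty.
final class BindSketch {

    /** Binds through the channel itself, the route available on JDK 7 and later. */
    static int bindViaChannel(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ch.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Binds through the ServerSocket associated with the channel, the older route. */
    static int bindViaSocket(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ch.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound via channel on port " + bindViaChannel(128));
        System.out.println("bound via socket on port " + bindViaSocket(128));
    }
}
```

In both cases the backlog argument plays the role of the backlog value that the server channel configuration supplies at bind time.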
One item was left over from the previous article: the configuration of the NIO server channel. Today we look at channel configuration:
// NIO server channel configuration, an inner class of NioServerSocketChannel; it is covered separately in this article
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
    private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
        super(channel, javaSocket);
    }

    @Override
    protected void autoReadCleared() {
        clearReadPending();
    }
}
//DefaultServerSocketChannelConfig
/**
 * The default {@link ServerSocketChannelConfig} implementation.
 */
public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
        implements ServerSocketChannelConfig {
//ServerSocketChannelConfig
public interface ServerSocketChannelConfig extends ChannelConfig {
//DefaultChannelConfig
/**
 * The default {@link ChannelConfig} implementation.
 */
public class DefaultChannelConfig implements ChannelConfig {
Let's start with the definition of the channel configuration interface, ChannelConfig:
package io.netty.channel;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig;

import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Map;

/**
 * A set of configuration properties of a {@link Channel}.
 *
 * Please down-cast to more specific configuration type such as
 * {@link SocketChannelConfig} or use {@link #setOptions(Map)} to set the
 * transport-specific properties:
 * <pre>
 * {@link Channel} ch = ...;
 * {@link SocketChannelConfig} cfg = [b]({@link SocketChannelConfig}) ch.getConfig();[/b]
 * cfg.setTcpNoDelay(false);
 * </pre>
 *
 * <h3>Option map</h3>
 *
 * An option map property is a dynamic write-only property which allows
 * the configuration of a {@link Channel} without down-casting its associated
 * {@link ChannelConfig}. To update an option map, please call {@link #setOptions(Map)}.
 * <p>
 * All {@link ChannelConfig} has the following options:
 *
 * <table border="1" cellspacing="0" cellpadding="6">
 * <tr>
 * <th>Name</th><th>Associated setter method</th>
 * </tr><tr>
 * <td>{@link ChannelOption#CONNECT_TIMEOUT_MILLIS}</td><td>{@link #setConnectTimeoutMillis(int)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#WRITE_SPIN_COUNT}</td><td>{@link #setWriteSpinCount(int)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#WRITE_BUFFER_WATER_MARK}</td><td>{@link #setWriteBufferWaterMark(WriteBufferWaterMark)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#ALLOCATOR}</td><td>{@link #setAllocator(ByteBufAllocator)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#AUTO_READ}</td><td>{@link #setAutoRead(boolean)}</td>
 * </tr>
 * </table>
 * <p>
 * More options are available in the sub-types of {@link ChannelConfig}. For
 * example, you can configure the parameters which are specific to a TCP/IP
 * socket as explained in {@link SocketChannelConfig}.
 */
public interface ChannelConfig {

    /**
     * Return all set {@link ChannelOption}'s.
     */
    Map<ChannelOption<?>, Object> getOptions();

    /**
     * Sets the configuration properties from the specified {@link Map}.
     */
    boolean setOptions(Map<ChannelOption<?>, ?> options);

    /**
     * Return the value of the given {@link ChannelOption}
     */
    <T> T getOption(ChannelOption<T> option);

    /**
     * Sets a configuration property with the specified name and value.
     * To override this method properly, you must call the super class:
     * <pre>
     * public boolean setOption(ChannelOption<T> option, T value) {
     *     if (super.setOption(option, value)) {
     *         return true;
     *     }
     *
     *     if (option.equals(additionalOption)) {
     *         ....
     *         return true;
     *     }
     *
     *     return false;
     * }
     * </pre>
     *
     * @return {@code true} if and only if the property has been set
     */
    <T> boolean setOption(ChannelOption<T> option, T value);

    /**
     * Returns the connect timeout of the channel in milliseconds. If the
     * {@link Channel} does not support connect operation, this property is not
     * used at all, and therefore will be ignored.
     *
     * @return the connect timeout in milliseconds. {@code 0} if disabled.
     */
    int getConnectTimeoutMillis();

    /**
     * Sets the connect timeout of the channel in milliseconds. If the
     * {@link Channel} does not support connect operation, this property is not
     * used at all, and therefore will be ignored.
     *
     * @param connectTimeoutMillis the connect timeout in milliseconds.
     *                             {@code 0} to disable.
     */
    ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);

    // Deprecated: the per-read message limit now lives in MaxMessagesRecvByteBufAllocator
    /**
     * @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
     * <p>
     * Returns the maximum number of messages to read per read loop.
     * a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     */
    @Deprecated
    int getMaxMessagesPerRead();

    /**
     * @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
     * <p>
     * Sets the maximum number of messages to read per read loop.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     */
    @Deprecated
    ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

    /**
     * Returns the maximum loop count for a write operation until
     * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
     * It is similar to what a spin lock is used for in concurrency programming.
     * It improves memory utilization and write throughput depending on
     * the platform that JVM runs on. The default value is {@code 16}.
     */
    int getWriteSpinCount();

    /**
     * Sets the maximum loop count for a write operation until
     * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
     * It is similar to what a spin lock is used for in concurrency programming.
     * It improves memory utilization and write throughput depending on
     * the platform that JVM runs on. The default value is {@code 16}.
     *
     * @throws IllegalArgumentException
     *         if the specified value is {@code 0} or less than {@code 0}
     */
    ChannelConfig setWriteSpinCount(int writeSpinCount);

    // Get/set the ByteBuf allocator
    /**
     * Returns {@link ByteBufAllocator} which is used for the channel
     * to allocate buffers.
     */
    ByteBufAllocator getAllocator();

    /**
     * Set the {@link ByteBufAllocator} which is used for the channel
     * to allocate buffers.
     */
    ChannelConfig setAllocator(ByteBufAllocator allocator);

    // Get/set the receive ByteBuf allocator
    /**
     * Returns {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
     */
    <T extends RecvByteBufAllocator> T getRecvByteBufAllocator();

    /**
     * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
     */
    ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);

    // Get/set whether ChannelHandlerContext#read() is invoked automatically; defaults to true
    /**
     * Returns {@code true} if and only if {@link ChannelHandlerContext#read()} will be invoked automatically so that
     * a user application doesn't need to call it at all. The default value is {@code true}.
     */
    boolean isAutoRead();

    /**
     * Sets if {@link ChannelHandlerContext#read()} will be invoked automatically so that a user application doesn't
     * need to call it at all. The default value is {@code true}.
     */
    ChannelConfig setAutoRead(boolean autoRead);

    // Auto close, deprecated
    /**
     * @deprecated Auto close will be removed in a future release.
     *
     * Returns {@code true} if and only if the {@link Channel} will be closed automatically and immediately on
     * write failure. The default is {@code false}.
     */
    @Deprecated
    boolean isAutoClose();

    /**
     * @deprecated Auto close will be removed in a future release.
     *
     * Sets whether the {@link Channel} should be closed automatically and immediately on write failure.
     * The default is {@code false}.
     */
    @Deprecated
    ChannelConfig setAutoClose(boolean autoClose);

    // High water mark of the write buffer: once the queued bytes exceed it, Channel#isWritable() returns false
    /**
     * Returns the high water mark of the write buffer. If the number of bytes
     * queued in the write buffer exceeds this value, {@link Channel#isWritable()}
     * will start to return {@code false}.
     */
    int getWriteBufferHighWaterMark();

    /**
     * <p>
     * Sets the high water mark of the write buffer. If the number of bytes
     * queued in the write buffer exceeds this value, {@link Channel#isWritable()}
     * will start to return {@code false}.
     */
    ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);

    // Low water mark of the write buffer: after the queued bytes exceeded the high water mark and then
    // dropped below the low water mark, Channel#isWritable() returns true again
    /**
     * Returns the low water mark of the write buffer. Once the number of bytes
     * queued in the write buffer exceeded the
     * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
     * dropped down below this value, {@link Channel#isWritable()} will start to return
     * {@code true} again.
     */
    int getWriteBufferLowWaterMark();

    /**
     * <p>
     * Sets the low water mark of the write buffer. Once the number of bytes
     * queued in the write buffer exceeded the
     * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
     * dropped down below this value, {@link Channel#isWritable()} will start to return
     * {@code true} again.
     */
    ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);

    // Message size estimator
    /**
     * Returns {@link MessageSizeEstimator} which is used for the channel
     * to detect the size of a message.
     */
    MessageSizeEstimator getMessageSizeEstimator();

    /**
     * Set the {@link MessageSizeEstimator} which is used for the channel
     * to detect the size of a message.
     */
    ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);

    // Get/set the write buffer water mark (high and low together)
    /**
     * Returns the {@link WriteBufferWaterMark} which is used for setting the high and low
     * water mark of the write buffer.
     */
    WriteBufferWaterMark getWriteBufferWaterMark();

    /**
     * Set the {@link WriteBufferWaterMark} which is used for setting the high and low
     * water mark of the write buffer.
     */
    ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
}
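As the listing shows, the channel configuration interface mainly configures the channel's ByteBuf allocator, receive ByteBuf allocator, message size estimator, and channel options, along with the connect timeout, write spin count, auto-read flag, and write buffer water marks.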
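Of these settings, the write spin count is the least self-explanatory, so here is a minimal sketch of the idea using only java.nio. It is not Netty's write path: WriteSpinSketch, writeWithSpin, and the throttled channel are hypothetical names, and the loop is only loosely modeled on the behavior the Javadoc describes (retry a write a bounded number of times instead of looping until the buffer is empty).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical illustration of a bounded write loop; not Netty code.
final class WriteSpinSketch {

    /**
     * Attempts to drain buf into ch, giving up after writeSpinCount attempts.
     * Returns true when the buffer was written completely.
     */
    static boolean writeWithSpin(WritableByteChannel ch, ByteBuffer buf, int writeSpinCount) {
        if (writeSpinCount <= 0) {
            throw new IllegalArgumentException("writeSpinCount must be a positive integer");
        }
        try {
            for (int i = 0; i < writeSpinCount && buf.hasRemaining(); i++) {
                int written = ch.write(buf);
                if (written == 0) {
                    // The channel accepted nothing; a real transport would wait for writability here.
                    break;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return !buf.hasRemaining();
    }

    /** A made-up channel that accepts at most limit bytes per write call. */
    static WritableByteChannel throttled(ByteArrayOutputStream sink, int limit) {
        return new WritableByteChannel() {
            @Override
            public int write(ByteBuffer src) {
                int n = Math.min(limit, src.remaining());
                for (int i = 0; i < n; i++) {
                    sink.write(src.get());
                }
                return n;
            }

            @Override
            public boolean isOpen() {
                return true;
            }

            @Override
            public void close() {
            }
        };
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[10]);
        boolean done = writeWithSpin(throttled(new ByteArrayOutputStream(), 1), buf, 4);
        System.out.println("fully written: " + done + ", remaining: " + buf.remaining());
    }
}
```

A small spin count hands control back to the event loop sooner when the peer is slow; a larger one finishes more writes in a single pass.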
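The socket channel configuration and the server socket channel configuration are straightforward, so we will not walk through them in detail. A quick look: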
//ServerSocketChannelConfig
package io.netty.channel.socket;
import io.netty.buffer.ByteBufAllocator;import io.netty.channel.ChannelConfig;import io.netty.channel.MessageSizeEstimator;import io.netty.channel.RecvByteBufAllocator;import io.netty.channel.WriteBufferWaterMark;
import java.net.ServerSocket;import java.net.StandardSocketOptions;
/** * A {@link ChannelConfig} for a {@link ServerSocketChannel}. * * <h3>Available options</h3> * * In addition to the options provided by {@link ChannelConfig}, * {@link ServerSocketChannelConfig} allows the following options in the * option map: * * <table border="1" cellspacing="0" cellpadding="6"> * <tr> * <th>Name</th><th>Associated setter method</th> * </tr><tr> * <td>{@code "backlog"}</td><td>{@link #setBacklog(int)}</td> * </tr><tr> * <td>{@code "reuseAddress"}</td><td>{@link #setReuseAddress(boolean)}</td> * </tr><tr> * <td>{@code "receiveBufferSize"}</td><td>{@link #setReceiveBufferSize(int)}</td> * </tr> * </table> */public interface ServerSocketChannelConfig extends ChannelConfig {
/** * Gets the backlog value to specify when the channel binds to a local * address. */ int getBacklog();
/** * Sets the backlog value to specify when the channel binds to a local * address. */ ServerSocketChannelConfig setBacklog(int backlog);
/** * Gets the {@link StandardSocketOptions#SO_REUSEADDR} option. */ boolean isReuseAddress();
/** * Sets the {@link StandardSocketOptions#SO_REUSEADDR} option. */ ServerSocketChannelConfig setReuseAddress(boolean reuseAddress);
    /**
     * Gets the {@link StandardSocketOptions#SO_RCVBUF} option.
     */
    int getReceiveBufferSize();

    /**
     * Sets the {@link StandardSocketOptions#SO_RCVBUF} option.
     */
    ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize);
/** * Sets the performance preferences as specified in * {@link ServerSocket#setPerformancePreferences(int, int, int)}. */ ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth);
@Override ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override @Deprecated ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount);
@Override ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator);
@Override ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);
@Override ServerSocketChannelConfig setAutoRead(boolean autoRead);
@Override ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
@Override ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override ServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
}
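The three server-side options listed above (backlog, reuseAddress, receiveBufferSize) correspond to settings of the JDK server socket. The following sketch uses only the JDK, not Netty. ServerSocketOptionsSketch and its method are hypothetical names, and the concrete values (64 KiB receive buffer, backlog of 128, loopback address with an OS-chosen port) are arbitrary assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

// Hypothetical JDK-only illustration; not Netty code.
final class ServerSocketOptionsSketch {

    /** Applies SO_REUSEADDR and SO_RCVBUF, binds with a backlog, and reads SO_REUSEADDR back. */
    static boolean configureAndBind() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            // Set before bind, since these options are meant to be in place when listening starts.
            ch.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            ch.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024);
            // The backlog is not a socket option; it is passed at bind time.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);
            return ch.getOption(StandardSocketOptions.SO_REUSEADDR);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("SO_REUSEADDR enabled: " + configureAndBind());
    }
}
```

This also suggests why the backlog is a configuration property that is only consumed when the channel binds to a local address.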
//SocketChannelConfig
package io.netty.channel.socket;
import io.netty.buffer.ByteBufAllocator;import io.netty.channel.ChannelConfig;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandler;import io.netty.channel.ChannelOption;import io.netty.channel.MessageSizeEstimator;import io.netty.channel.RecvByteBufAllocator;import io.netty.channel.WriteBufferWaterMark;
import java.net.Socket;import java.net.StandardSocketOptions;
/** * A {@link ChannelConfig} for a {@link SocketChannel}. * * <h3>Available options</h3> * * In addition to the options provided by {@link ChannelConfig}, * {@link SocketChannelConfig} allows the following options in the option map: * * <table border="1" cellspacing="0" cellpadding="6"> * <tr> * <th>Name</th><th>Associated setter method</th> * </tr><tr> * <td>{@link ChannelOption#SO_KEEPALIVE}</td><td>{@link #setKeepAlive(boolean)}</td> * </tr><tr> * <td>{@link ChannelOption#SO_REUSEADDR}</td><td>{@link #setReuseAddress(boolean)}</td> * </tr><tr> * <td>{@link ChannelOption#SO_LINGER}</td><td>{@link #setSoLinger(int)}</td> * </tr><tr> * <td>{@link ChannelOption#TCP_NODELAY}</td><td>{@link #setTcpNoDelay(boolean)}</td> * </tr><tr> * <td>{@link ChannelOption#SO_RCVBUF}</td><td>{@link #setReceiveBufferSize(int)}</td> * </tr><tr> * <td>{@link ChannelOption#SO_SNDBUF}</td><td>{@link #setSendBufferSize(int)}</td> * </tr><tr> * <td>{@link ChannelOption#IP_TOS}</td><td>{@link #setTrafficClass(int)}</td> * </tr><tr> * <td>{@link ChannelOption#ALLOW_HALF_CLOSURE}</td><td>{@link #setAllowHalfClosure(boolean)}</td> * </tr> * </table> */public interface SocketChannelConfig extends ChannelConfig {
/** * Gets the {@link StandardSocketOptions#TCP_NODELAY} option. Please note that the default value of this option * is {@code true} unlike the operating system default ({@code false}). However, for some buggy platforms, such as * Android, that shows erratic behavior with Nagle's algorithm disabled, the default value remains to be * {@code false}. */ boolean isTcpNoDelay();
/** * Sets the {@link StandardSocketOptions#TCP_NODELAY} option. Please note that the default value of this option * is {@code true} unlike the operating system default ({@code false}). However, for some buggy platforms, such as * Android, that shows erratic behavior with Nagle's algorithm disabled, the default value remains to be * {@code false}. */ SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay);
/** * Gets the {@link StandardSocketOptions#SO_LINGER} option. */ int getSoLinger();
/** * Sets the {@link StandardSocketOptions#SO_LINGER} option. */ SocketChannelConfig setSoLinger(int soLinger);
/** * Gets the {@link StandardSocketOptions#SO_SNDBUF} option. */ int getSendBufferSize();
/** * Sets the {@link StandardSocketOptions#SO_SNDBUF} option. */ SocketChannelConfig setSendBufferSize(int sendBufferSize);
/** * Gets the {@link StandardSocketOptions#SO_RCVBUF} option. */ int getReceiveBufferSize();
/** * Sets the {@link StandardSocketOptions#SO_RCVBUF} option. */ SocketChannelConfig setReceiveBufferSize(int receiveBufferSize);
/** * Gets the {@link StandardSocketOptions#SO_KEEPALIVE} option. */ boolean isKeepAlive();
/** * Sets the {@link StandardSocketOptions#SO_KEEPALIVE} option. */ SocketChannelConfig setKeepAlive(boolean keepAlive);
/** * Gets the {@link StandardSocketOptions#IP_TOS} option. */ int getTrafficClass();
/** * Sets the {@link StandardSocketOptions#IP_TOS} option. */ SocketChannelConfig setTrafficClass(int trafficClass);
/** * Gets the {@link StandardSocketOptions#SO_REUSEADDR} option. */ boolean isReuseAddress();
/** * Sets the {@link StandardSocketOptions#SO_REUSEADDR} option. */ SocketChannelConfig setReuseAddress(boolean reuseAddress);
/** * Sets the performance preferences as specified in * {@link Socket#setPerformancePreferences(int, int, int)}. */ SocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth);
/** * Returns {@code true} if and only if the channel should not close itself when its remote * peer shuts down output to make the connection half-closed. If {@code false}, the connection * is closed automatically when the remote peer shuts down output. */ boolean isAllowHalfClosure();
    // Note: with allowHalfClosure set to true, the channel stays open when the remote peer shuts down its output
    /**
     * Sets whether the channel should not close itself when its remote peer shuts down output to
     * make the connection half-closed. If {@code true} the connection is not closed when the
     * remote peer shuts down output. Instead,
     * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}
     * is invoked with a {@link ChannelInputShutdownEvent} object. If {@code false}, the connection
     * is closed automatically.
     */
    SocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure);
@Override SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override @Deprecated SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override SocketChannelConfig setWriteSpinCount(int writeSpinCount);
@Override SocketChannelConfig setAllocator(ByteBufAllocator allocator);
@Override SocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);
@Override SocketChannelConfig setAutoRead(boolean autoRead);
@Override SocketChannelConfig setAutoClose(boolean autoClose);
@Override SocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
@Override SocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
}
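Most of the socket channel options above map one to one onto setters of java.net.Socket. The sketch below applies a few of them to an unconnected JDK socket and reads them back. It is not Netty code: SocketOptionsSketch and describe are hypothetical names, and the chosen values are arbitrary assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

// Hypothetical JDK-only illustration; not Netty code.
final class SocketOptionsSketch {

    /** Sets TCP_NODELAY, SO_KEEPALIVE and SO_LINGER on an unconnected socket and reports the values read back. */
    static String describe() {
        try (Socket socket = new Socket()) {
            socket.setTcpNoDelay(true);   // disables Nagle's algorithm
            socket.setKeepAlive(true);    // enables TCP keep-alive probes
            socket.setSoLinger(true, 5);  // linger up to 5 seconds on close
            return "tcpNoDelay=" + socket.getTcpNoDelay()
                    + ", keepAlive=" + socket.getKeepAlive()
                    + ", soLinger=" + socket.getSoLinger();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

ALLOW_HALF_CLOSURE has no counterpart among these setters; it is a Netty-level setting that governs how the channel reacts when the peer shuts down its output.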
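[b]Summary:[/b]
[color=blue]
The channel configuration interface mainly configures the channel's ByteBuf allocator, receive ByteBuf allocator, message size estimator, and channel options. There are two kinds of channel configuration, one for socket channels and one for server socket channels, and most of their settings match those of the JDK Socket and ServerSocket.
[/color]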
Appendix:
//ByteBufAllocator
package io.netty.buffer;
/**
 * Implementations are responsible to allocate buffers. Implementations of this interface are expected to be
 * thread-safe.
 */
public interface ByteBufAllocator {

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

    /**
     * Allocate a {@link ByteBuf}. If it is a direct or heap buffer
     * depends on the actual implementation.
     */
    ByteBuf buffer();

    /**
     * Allocate a {@link ByteBuf} with the given initial capacity.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    ByteBuf buffer(int initialCapacity);

    /**
     * Allocate a {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity. If it is a direct or heap buffer depends on the actual
     * implementation.
     */
    ByteBuf buffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer();

    // Same as above, with an initial capacity
    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer(int initialCapacity);

    // Same as above, with an upper bound on the capacity
    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer(int initialCapacity, int maxCapacity);

    // Allocate a heap buffer
    /**
     * Allocate a heap {@link ByteBuf}.
     */
    ByteBuf heapBuffer();

    /**
     * Allocate a heap {@link ByteBuf} with the given initial capacity.
     */
    ByteBuf heapBuffer(int initialCapacity);

    /**
     * Allocate a heap {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity.
     */
    ByteBuf heapBuffer(int initialCapacity, int maxCapacity);

    // Allocate a direct buffer
    /**
     * Allocate a direct {@link ByteBuf}.
     */
    ByteBuf directBuffer();

    /**
     * Allocate a direct {@link ByteBuf} with the given initial capacity.
     */
    ByteBuf directBuffer(int initialCapacity);

    /**
     * Allocate a direct {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity.
     */
    ByteBuf directBuffer(int initialCapacity, int maxCapacity);

    // Allocate a composite buffer; whether it is direct or heap depends on the implementation
    /**
     * Allocate a {@link CompositeByteBuf}.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    CompositeByteBuf compositeBuffer();

    /**
     * Allocate a {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    CompositeByteBuf compositeBuffer(int maxNumComponents);

    // Allocate a heap composite buffer
    /**
     * Allocate a heap {@link CompositeByteBuf}.
     */
    CompositeByteBuf compositeHeapBuffer();

    /**
     * Allocate a heap {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     */
    CompositeByteBuf compositeHeapBuffer(int maxNumComponents);

    // Allocate a direct composite buffer
    /**
     * Allocate a direct {@link CompositeByteBuf}.
     */
    CompositeByteBuf compositeDirectBuffer();

    /**
     * Allocate a direct {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     */
    CompositeByteBuf compositeDirectBuffer(int maxNumComponents);

    /**
     * Returns {@code true} if direct {@link ByteBuf}'s are pooled
     */
    boolean isDirectBufferPooled();

    /**
     * Calculate the new capacity of a {@link ByteBuf} that is used when a {@link ByteBuf} needs to expand by the
     * {@code minNewCapacity} with {@code maxCapacity} as upper-bound.
     */
    int calculateNewCapacity(int minNewCapacity, int maxCapacity);
}
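As the listing shows, the ByteBuf allocator mainly defines methods for allocating buffers. The buffer* methods return a buffer whose type depends on the implementation: it may be heap or direct. The ioBuffer* methods prefer a direct buffer. The directBuffer* methods allocate direct buffers, and the heapBuffer* methods allocate heap buffers. The compositeBuffer* methods allocate a composite buffer that may be heap or direct, depending on the implementation. The compositeDirectBuffer* methods allocate direct composite buffers, and the compositeHeapBuffer* methods allocate heap composite buffers.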
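The heap versus direct distinction comes from the JDK itself, so it can be shown without Netty. The sketch below uses java.nio.ByteBuffer as a stand-in for ByteBuf. HeapVsDirectSketch and its methods are hypothetical names, and the directPreferred flag is an assumption that stands in for whatever platform check a real allocator performs.

```java
import java.nio.ByteBuffer;

// Hypothetical JDK-only analogy for the allocator families; not Netty code.
final class HeapVsDirectSketch {

    /** Heap buffer: backed by a byte[] inside the Java heap. */
    static ByteBuffer heapBuffer(int capacity) {
        return ByteBuffer.allocate(capacity);
    }

    /** Direct buffer: memory outside the Java heap, suited to native I/O. */
    static ByteBuffer directBuffer(int capacity) {
        return ByteBuffer.allocateDirect(capacity);
    }

    /** Mirrors the "preferably direct" wording of ioBuffer*: direct when preferred, heap otherwise. */
    static ByteBuffer ioBuffer(int capacity, boolean directPreferred) {
        return directPreferred ? directBuffer(capacity) : heapBuffer(capacity);
    }

    public static void main(String[] args) {
        System.out.println("heap is direct: " + heapBuffer(16).isDirect());
        System.out.println("direct is direct: " + directBuffer(16).isDirect());
        System.out.println("io buffer is direct: " + ioBuffer(16, true).isDirect());
    }
}
```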
Next, the default ByteBuf allocator:
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
public final class ByteBufUtil {
    static {
        String allocType = SystemPropertyUtil.get(
                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();

        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }

        DEFAULT_ALLOCATOR = alloc;

        THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
        logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);

        MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024);
        logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
    }
}
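From ByteBufUtil we can see that if the system property io.netty.allocator.type is set to unpooled, the default allocator is UnpooledByteBufAllocator; for pooled or any unrecognized value it is PooledByteBufAllocator. When the property is not set, the default is unpooled on Android and pooled everywhere else.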
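The selection rule can be restated as a small pure function. This is a sketch of the decision only, written against plain strings rather than Netty types. AllocatorTypeSketch and resolve are hypothetical names, and passing null for an unset property is an assumption made for the example.

```java
import java.util.Locale;

// Hypothetical restatement of the allocator type decision; not Netty code.
final class AllocatorTypeSketch {

    /**
     * Returns "unpooled" or "pooled" for the given property value.
     * A null value means the property was not set, in which case the platform decides the default.
     */
    static String resolve(String configured, boolean isAndroid) {
        String type = configured != null ? configured : (isAndroid ? "unpooled" : "pooled");
        type = type.toLowerCase(Locale.US).trim();
        if ("unpooled".equals(type)) {
            return "unpooled";
        }
        // "pooled" and every unrecognized value end up with the pooled allocator.
        return "pooled";
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, false));
        System.out.println(resolve(null, true));
        System.out.println(resolve("something-else", true));
    }
}
```

One consequence of the rule is that an unrecognized value selects the pooled allocator even on Android, because the Android default applies only when the property is absent.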
//UnpooledByteBufAllocator
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric(); private final boolean disableLeakDetector;
/** * Default instance which uses leak-detection for direct buffers. */ public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred()); ...}
//PooledByteBufAllocator
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider { public static final PooledByteBufAllocator DEFAULT = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); ...}
//WriteBufferWaterMark
package io.netty.channel;
/**
 * WriteBufferWaterMark is used to set low water mark and high water mark for the write buffer.
 * <p>
 * If the number of bytes queued in the write buffer exceeds the
 * {@linkplain #high high water mark}, {@link Channel#isWritable()}
 * will start to return {@code false}.
 * <p>
 * If the number of bytes queued in the write buffer exceeds the
 * {@linkplain #high high water mark} and then
 * dropped down below the {@linkplain #low low water mark},
 * {@link Channel#isWritable()} will start to return
 * {@code true} again.
 */
public final class WriteBufferWaterMark {

    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;  // default low water mark
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024; // default high water mark

    public static final WriteBufferWaterMark DEFAULT =
            new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);

    private final int low;
    private final int high;

    /**
     * Create a new instance.
     *
     * @param low low water mark for write buffer.
     * @param high high water mark for write buffer
     */
    public WriteBufferWaterMark(int low, int high) {
        this(low, high, true);
    }

    /**
     * This constructor is needed to keep backward-compatibility.
     */
    WriteBufferWaterMark(int low, int high, boolean validate) {
        if (validate) {
            if (low < 0) {
                throw new IllegalArgumentException("write buffer's low water mark must be >= 0");
            }
            if (high < low) {
                throw new IllegalArgumentException(
                        "write buffer's high water mark cannot be less than " +
                                " low water mark (" + low + "): " +
                                high);
            }
        }
        this.low = low;
        this.high = high;
    }

    /**
     * Returns the low water mark for the write buffer.
     */
    public int low() {
        return low;
    }

    /**
     * Returns the high water mark for the write buffer.
     */
    public int high() {
        return high;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder(55)
                .append("WriteBufferWaterMark(low: ")
                .append(low)
                .append(", high: ")
                .append(high)
                .append(")");
        return builder.toString();
    }
}
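The two water marks form a hysteresis band: writability switches off above the high mark and switches back on only below the low mark. The sketch below models that behavior in isolation. WaterMarkSketch and its methods are hypothetical names, and the counter it keeps is a simplification of whatever bookkeeping a real outbound buffer does.

```java
// Hypothetical model of water mark hysteresis; not Netty code.
final class WaterMarkSketch {
    private final int low;
    private final int high;
    private long pendingBytes;
    private boolean writable = true;

    WaterMarkSketch(int low, int high) {
        if (low < 0) {
            throw new IllegalArgumentException("low water mark must be >= 0");
        }
        if (high < low) {
            throw new IllegalArgumentException("high water mark cannot be less than low water mark");
        }
        this.low = low;
        this.high = high;
    }

    /** Bytes were queued for writing; exceeding the high mark turns writability off. */
    void enqueue(int bytes) {
        pendingBytes += bytes;
        if (pendingBytes > high) {
            writable = false;
        }
    }

    /** Bytes were written out; dropping below the low mark turns writability back on. */
    void flushed(int bytes) {
        pendingBytes = Math.max(0, pendingBytes - bytes);
        if (pendingBytes < low) {
            writable = true;
        }
    }

    boolean isWritable() {
        return writable;
    }

    public static void main(String[] args) {
        WaterMarkSketch buffer = new WaterMarkSketch(32 * 1024, 64 * 1024);
        buffer.enqueue(70_000);
        System.out.println("after enqueue: " + buffer.isWritable());
        buffer.flushed(20_000);
        System.out.println("between the marks: " + buffer.isWritable());
        buffer.flushed(30_000);
        System.out.println("below the low mark: " + buffer.isWritable());
    }
}
```

The gap between the marks keeps the writable flag from flapping when the queue size hovers around a single threshold.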
Next, MaxMessagesRecvByteBufAllocator:
package io.netty.channel;
/**
 * {@link RecvByteBufAllocator} that limits the number of read operations that will be attempted when a read operation
 * is attempted by the event loop.
 */
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {

    /**
     * Returns the maximum number of messages to read per read loop.
     * a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     */
    int maxMessagesPerRead();

    /**
     * Sets the maximum number of messages to read per read loop.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     */
    MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
}
//RecvByteBufAllocator
package io.netty.channel;
import io.netty.buffer.ByteBuf;import io.netty.buffer.ByteBufAllocator;import io.netty.util.UncheckedBooleanSupplier;import io.netty.util.internal.UnstableApi;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/** * Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough * not to waste its space. */public interface RecvByteBufAllocator { /** * Creates a new handle. The handle provides the actual operations and keeps the internal information which is * required for predicting an optimal buffer capacity. 创建一个新的Handle,提供实际的操作和维护buf的容量等信息 */ Handle newHandle();
/** * @deprecated Use {@link ExtendedHandle}. */ @Deprecated interface Handle { /** * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small * enough not to waste its space. 创建一个接收字节buf,可以存放从读取的inbound数据,并且不会浪费空间 */ ByteBuf allocate(ByteBufAllocator alloc);
/** * Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the * capacity. 返回分配buf的容量 */ int guess();
/** * Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next * read loop. 重置所有累计和下一次循环需要读取的字节数计数器 * <p> * This may be used by {@link #continueReading()} to determine if the read operation should complete. 用于#continueReading方法,决定读操作是否完成 *
* This is only ever a hint and may be ignored by the implementation. * @param config The channel configuration which may impact this object's behavior. */ void reset(ChannelConfig config);
/** * Increment the number of messages that have been read for the current read loop. * @param numMessages The amount to increment by. 增加当前读循环读取的消息数 */ void incMessagesRead(int numMessages);
/** * Set the bytes that have been read for the last read operation. 设置上一次读操作,读取的字节数 * This may be used to increment the number of bytes that have been read. 用于增加读字节数计数器 * @param bytes The number of bytes from the previous read operation. This may be negative if an read error * occurs. If a negative value is seen it is expected to be return on the next call to * {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally * to this class and is not required to be enforced in {@link #continueReading()}. 上次读操作,读取的字节数。如果读错误发生,可以为一个负数,负数表示不需要继续读操作 */ void lastBytesRead(int bytes);
/** * Get the amount of bytes for the previous read operation. 获取先前读操作,读取的数据 * @return The amount of bytes for the previous read operation. */ int lastBytesRead();
/** * Set how many bytes the read operation will (or did) attempt to read. * @param bytes How many bytes the read operation will (or did) attempt to read. 设置读操作将会或已经读取多少字节数据 */ void attemptedBytesRead(int bytes);
/** * Get how many bytes the read operation will (or did) attempt to read. 获取读操作将会或已经读取多少字节数据 * @return How many bytes the read operation will (or did) attempt to read. */ int attemptedBytesRead();
/** * Determine if the current read loop should should continue. 决定一个读循环是否应该继续 * @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete. */ boolean continueReading();
/** * The read has completed. */ void readComplete(); } //拓展Handle @SuppressWarnings("deprecation") @UnstableApi interface ExtendedHandle extends Handle { /** * Same as {@link Handle#continueReading()} except "more data" is determined by the supplier parameter. * @param maybeMoreDataSupplier A supplier that determines if there maybe more data to read. 由于参数maybeMoreDataSupplier决定是否需要继续读取数据。 */ boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier); }
    /**
     * A {@link Handle} which delegates all calls to some other {@link Handle}.
     */
    class DelegatingHandle implements Handle {
        private final Handle delegate; // the Handle that every call is forwarded to

        public DelegatingHandle(Handle delegate) {
            this.delegate = checkNotNull(delegate, "delegate");
        }

        /**
         * Get the {@link Handle} which all methods will be delegated to.
         * @return the {@link Handle} which all methods will be delegated to.
         */
        protected final Handle delegate() {
            return delegate;
        }

        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return delegate.allocate(alloc);
        }

        @Override
        public int guess() {
            return delegate.guess();
        }

        @Override
        public void reset(ChannelConfig config) {
            delegate.reset(config);
        }

        @Override
        public void incMessagesRead(int numMessages) {
            delegate.incMessagesRead(numMessages);
        }

        @Override
        public void lastBytesRead(int bytes) {
            delegate.lastBytesRead(bytes);
        }

        @Override
        public int lastBytesRead() {
            return delegate.lastBytesRead();
        }

        @Override
        public boolean continueReading() {
            return delegate.continueReading();
        }

        @Override
        public int attemptedBytesRead() {
            return delegate.attemptedBytesRead();
        }

        @Override
        public void attemptedBytesRead(int bytes) {
            delegate.attemptedBytesRead(bytes);
        }

        @Override
        public void readComplete() {
            delegate.readComplete();
        }
    }
}
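To make the calling order of the Handle methods concrete, here is a minimal sketch of a read loop. It does not use Netty: `SimpleHandle`, `FixedSizeHandle` and `ReadLoopSketch` are hypothetical stand-ins invented for illustration, the "socket" is just a count of available bytes, and the `continueReading()` rule (stop at the per-loop cap or when the last read did not fill the buffer) is an assumption loosely modeled on the interface's javadoc, not Netty's actual implementation.

```java
/** Hypothetical, simplified stand-in for RecvByteBufAllocator.Handle (no Netty dependency). */
interface SimpleHandle {
    void reset();
    int guess();
    void incMessagesRead(int numMessages);
    void lastBytesRead(int bytes);
    int lastBytesRead();
    void attemptedBytesRead(int bytes);
    int attemptedBytesRead();
    boolean continueReading();
    void readComplete();
}

/** Always guesses the same buffer size and caps the number of reads per loop. */
final class FixedSizeHandle implements SimpleHandle {
    private final int bufferSize;
    private final int maxMessagesPerRead;
    private int messagesRead;
    private int totalBytesRead;
    private int lastBytesRead;
    private int attemptedBytesRead;

    FixedSizeHandle(int bufferSize, int maxMessagesPerRead) {
        this.bufferSize = bufferSize;
        this.maxMessagesPerRead = maxMessagesPerRead;
    }

    @Override public void reset() { messagesRead = 0; totalBytesRead = 0; }
    @Override public int guess() { return bufferSize; }
    @Override public void incMessagesRead(int numMessages) { messagesRead += numMessages; }
    @Override public void lastBytesRead(int bytes) {
        lastBytesRead = bytes;
        if (bytes > 0) {
            totalBytesRead += bytes; // negative values signal an error and are not counted
        }
    }
    @Override public int lastBytesRead() { return lastBytesRead; }
    @Override public void attemptedBytesRead(int bytes) { attemptedBytesRead = bytes; }
    @Override public int attemptedBytesRead() { return attemptedBytesRead; }
    @Override public boolean continueReading() {
        // Continue only while under the cap and the last read filled the whole buffer,
        // which hints that more data may still be waiting.
        return messagesRead < maxMessagesPerRead
                && totalBytesRead > 0
                && lastBytesRead == attemptedBytesRead;
    }
    @Override public void readComplete() { /* a fixed-size handle has nothing to adapt */ }

    int totalBytesRead() { return totalBytesRead; }
}

final class ReadLoopSketch {
    /** Drains up to {@code available} bytes; returns the number of reads that yielded data. */
    static int readLoop(SimpleHandle handle, int available) {
        handle.reset();
        int reads = 0;
        do {
            int capacity = handle.guess();        // size of the buffer for this read
            handle.attemptedBytesRead(capacity);
            int n = Math.min(capacity, available); // simulated socket read
            available -= n;
            handle.lastBytesRead(n);
            if (n <= 0) {
                break;                             // nothing was read, stop the loop
            }
            handle.incMessagesRead(1);
            reads++;
        } while (handle.continueReading());
        handle.readComplete();
        return reads;
    }

    public static void main(String[] args) {
        FixedSizeHandle handle = new FixedSizeHandle(4, 16);
        System.out.println(readLoop(handle, 10)); // prints 3 (reads of 4, 4 and 2 bytes)
    }
}
```

With a 4-byte buffer and 10 bytes available, the third read returns only 2 bytes, so `continueReading()` returns false and the loop ends without a wasted extra read.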
//UncheckedBooleanSupplier
package io.netty.util;
/**
 * Represents a supplier of {@code boolean}-valued results which doesn't throw any checked exceptions.
 */
public interface UncheckedBooleanSupplier extends BooleanSupplier {
    /**
     * Gets a boolean value.
     * @return a boolean value.
     */
    @Override
    boolean get();

    /**
     * A supplier which always returns {@code false} and never throws.
     */
    UncheckedBooleanSupplier FALSE_SUPPLIER = new UncheckedBooleanSupplier() {
        @Override
        public boolean get() {
            return false;
        }
    };

    /**
     * A supplier which always returns {@code true} and never throws.
     */
    UncheckedBooleanSupplier TRUE_SUPPLIER = new UncheckedBooleanSupplier() {
        @Override
        public boolean get() {
            return true;
        }
    };
}
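The supplier is what `ExtendedHandle.continueReading(UncheckedBooleanSupplier)` consults to decide whether there may be more data. The sketch below shows the idea without a Netty dependency: `BoolSupplier` and `SupplierSketch` are hypothetical stand-ins, and combining the supplier with a per-loop message cap is an assumption made for illustration.

```java
/** Hypothetical stand-in for io.netty.util.UncheckedBooleanSupplier (no Netty dependency). */
interface BoolSupplier {
    boolean get();

    BoolSupplier FALSE_SUPPLIER = () -> false;
    BoolSupplier TRUE_SUPPLIER = () -> true;
}

final class SupplierSketch {
    /** The caller decides what "maybe more data" means by passing a supplier. */
    static boolean continueReading(BoolSupplier maybeMoreData, int messagesRead, int maxMessagesPerRead) {
        return maybeMoreData.get() && messagesRead < maxMessagesPerRead;
    }

    /** One possible rule: more data is likely if the last read filled the whole buffer. */
    static BoolSupplier bufferWasFilled(int attemptedBytesRead, int lastBytesRead) {
        return () -> attemptedBytesRead == lastBytesRead;
    }

    public static void main(String[] args) {
        System.out.println(continueReading(bufferWasFilled(1024, 1024), 1, 16)); // prints true
        System.out.println(continueReading(bufferWasFilled(1024, 100), 1, 16));  // prints false
    }
}
```

Passing `TRUE_SUPPLIER` makes the loop depend only on the message cap, while `FALSE_SUPPLIER` ends it after a single read.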
//MessageSizeEstimator
package io.netty.channel;
/**
 * Responsible for estimating the size of a message. The size represents approximately how much memory
 * the message will reserve.
 */
public interface MessageSizeEstimator {

    /**
     * Creates a new handle. The handle provides the actual operations.
     */
    Handle newHandle();

    interface Handle {

        /**
         * Calculate the size of the given message.
         * @param msg The message for which the size should be calculated
         * @return size The size in bytes. The returned size must be >= 0
         */
        int size(Object msg);
    }
}
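As an illustration of the estimator contract, here is a minimal sketch of an implementation. `SizeEstimator`, `SizeHandle` and `SimpleSizeEstimator` are hypothetical stand-ins with no Netty dependency; the supported message types and the `unknownSize` fallback for everything else are assumptions chosen for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-ins for MessageSizeEstimator and its Handle (no Netty dependency). */
interface SizeEstimator {
    SizeHandle newHandle();
}

interface SizeHandle {
    int size(Object msg);
}

final class SimpleSizeEstimator implements SizeEstimator {
    private final SizeHandle handle;

    /** @param unknownSize size reported for message types this estimator does not understand */
    SimpleSizeEstimator(int unknownSize) {
        if (unknownSize < 0) {
            throw new IllegalArgumentException("unknownSize: " + unknownSize + " (expected: >= 0)");
        }
        // The handle is stateless, so one shared instance is enough.
        this.handle = msg -> {
            if (msg instanceof byte[]) {
                return ((byte[]) msg).length;
            }
            if (msg instanceof ByteBuffer) {
                return ((ByteBuffer) msg).remaining();
            }
            if (msg instanceof CharSequence) {
                return msg.toString().getBytes(StandardCharsets.UTF_8).length;
            }
            return unknownSize; // always >= 0, as the contract requires
        };
    }

    @Override
    public SizeHandle newHandle() {
        return handle;
    }

    public static void main(String[] args) {
        SizeHandle h = new SimpleSizeEstimator(8).newHandle();
        System.out.println(h.size(new byte[5]));  // prints 5
        System.out.println(h.size(new Object())); // prints 8
    }
}
```

Returning a fixed fallback keeps the result non-negative for any input, which is the one hard requirement stated in the javadoc of `size(Object)`.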