前一篇博客,我简单提了下怎么为NIO2增加TransmitFile支持,文件传送吞吐量是一个性能关注点,此外,并发连接数也是重要的关注点。

不过JDK7中又一次做了简单的实现,不支持同时投递多个AcceptEx请求,只支持一次一个,返回后再投递。这样,客户端连接的接受速度必然大打折扣。不知道为什么sun会做这样的实现,WSASend()/WSAReceive()一次只允许一个还是可以理解,毕竟简化了编程,不用考虑封包乱序问题。

也降低了内存耗尽的风险。AcceptEx却没有这样的理由了。

于是再一次为了性能,我增加了同时投递多个的支持。

另外,在JDK7的默认实现中,AcceptEx返回后,为了设置远程和本地InetSocketAddress也采用了效率很低的方法。4次通过JNI调用getsockname,2次为了取sockaddr,2次为了取port. 这些操作本人采用GetAcceptExSockaddrs一次完成,进一步提高效率。

先看Java部分的代码,框架跟JDK7的一样,细节处理不一样:

/**

*

*/

package sun.nio.ch;

import java.io.IOException;

import java.lang.reflect.Field;

import java.lang.reflect.Method;

import java.net.InetAddress;

import java.net.InetSocketAddress;

import java.nio.channels.AcceptPendingException;

import java.nio.channels.AsynchronousCloseException;

import java.nio.channels.AsynchronousServerSocketChannel;

import java.nio.channels.AsynchronousSocketChannel;

import java.nio.channels.ClosedChannelException;

import java.nio.channels.CompletionHandler;

import java.nio.channels.NotYetBoundException;

import java.nio.channels.ShutdownChannelGroupException;

import java.security.AccessControlContext;

import java.security.AccessController;

import java.security.PrivilegedAction;

import java.util.Queue;

import java.util.concurrent.ConcurrentLinkedQueue;

import java.util.concurrent.Future;

import java.util.concurrent.atomic.AtomicBoolean;

import java.util.concurrent.atomic.AtomicInteger;

import sun.misc.Unsafe;

/**

* This class enable multiple 'AcceptEx' post on the completion port, hence improve the concurrent connection number.

* @author Yvon

*

*/

public class WindowsMultiAcceptSupport {

WindowsAsynchronousServerSocketChannelImpl schannel;

private static final Unsafe unsafe = Unsafe.getUnsafe();

// 2 * (sizeof(SOCKET_ADDRESS) + 16)

private static final int ONE_DATA_BUFFER_SIZE = 88;

private long handle;

private Iocp iocp;

// typically there will be zero, or one I/O operations pending. In rare

// cases there may be more. These rare cases arise when a sequence of accept

// operations complete immediately and handled by the initiating thread.

// The corresponding OVERLAPPED cannot be reused/released until the completion

// event has been posted.

private PendingIoCache ioCache;

private Queue dataBuffers;

// the data buffer to receive the local/remote socket address

//        private final long dataBuffer;

private AtomicInteger pendingAccept;

private int maxPending;

Method updateAcceptContextM;

Method acceptM;

WindowsMultiAcceptSupport() {

//dummy for JNI code.

}

public void close() throws IOException {

schannel.close();

for (int i = 0; i

{

long addr = dataBuffers.poll();

// release  resources

unsafe.freeMemory(addr);

}

}

/**

*

*/

public WindowsMultiAcceptSupport(AsynchronousServerSocketChannel ch, int maxPost) {

if (maxPost <= 0 || maxPost > 1024)

throw new IllegalStateException("maxPost can't less than 1 and greater than 1024");

this.schannel = (WindowsAsynchronousServerSocketChannelImpl) ch;

maxPending = maxPost;

dataBuffers = new ConcurrentLinkedQueue();

for (int i = 0; i

dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));

}

pendingAccept = new AtomicInteger(0);

try {

Field f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("handle");

f.setAccessible(true);

handle = f.getLong(schannel);

f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("iocp");

f.setAccessible(true);

iocp = (Iocp) f.get(schannel);

f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("ioCache");

f.setAccessible(true);

ioCache = (PendingIoCache) f.get(schannel);

f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("accepting");

f.setAccessible(true);

AtomicBoolean accepting = (AtomicBoolean) f.get(schannel);

accepting.set(true);//disable accepting by origin channel.

} catch (Exception e) {

e.printStackTrace();

}

}

@SuppressWarnings("unchecked")

public final  void accept(A attachment,

CompletionHandler handler) {

if (handler == null)

throw new NullPointerException("'handler' is null");

implAccept(attachment, (CompletionHandler) handler);

}

/**

* Task to initiate accept operation and to handle result.

*/

private class AcceptTask implements Runnable, Iocp.ResultHandler {

private final WindowsAsynchronousSocketChannelImpl channel;

private final AccessControlContext acc;

private final PendingFuture result;

private final long dataBuffer;

AcceptTask(WindowsAsynchronousSocketChannelImpl channel, AccessControlContext acc,

long dataBuffer, PendingFuture result) {

this.channel = channel;

this.acc = acc;

this.result = result;

this.dataBuffer = dataBuffer;

}

void enableAccept() {

pendingAccept.decrementAndGet();

dataBuffers.add(dataBuffer);

}

void closeChildChannel() {

try {

channel.close();

} catch (IOException ignore) {

}

}

// caller must have acquired read lock for the listener and child channel.

void finishAccept() throws IOException {

/**

* JDK7 use 4 calls to getsockname  to setup

* local& remote address, this is very inefficient.

*

* I change this to use GetAcceptExSockaddrs

*/

InetAddress[] socks = new InetAddress[2];

int[] ports = new int[2];

updateAcceptContext(handle, channel.handle(), socks, ports, dataBuffer);

InetSocketAddress local = new InetSocketAddress(socks[0], ports[0]);

final InetSocketAddress remote = new InetSocketAddress(socks[1], ports[1]);

channel.setConnected(local, remote);

// permission check (in context of initiating thread)

if (acc != null) {

AccessController.doPrivileged(new PrivilegedAction() {

public Void run() {

SecurityManager sm = System.getSecurityManager();

sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());

return null;

}

}, acc);

}

}

/**

* Initiates the accept operation.

*/

@Override

public void run() {

long overlapped = 0L;

try {

// begin usage of listener socket

schannel.begin();

try {

// begin usage of child socket (as it is registered with

// completion port and so may be closed in the event that

// the group is forcefully closed).

channel.begin();

synchronized (result) {

overlapped = ioCache.add(result);

int n = accept0(handle, channel.handle(), overlapped, dataBuffer);//Be careful for the buffer address

if (n == IOStatus.UNAVAILABLE) {

return;

}

// connection accepted immediately

finishAccept();

// allow another accept before the result is set

enableAccept();

result.setResult(channel);

}

} finally {

// end usage on child socket

channel.end();

}

} catch (Throwable x) {

// failed to initiate accept so release resources

if (overlapped != 0L)

ioCache.remove(overlapped);

closeChildChannel();

if (x instanceof ClosedChannelException)

x = new AsynchronousCloseException();

if (!(x instanceof IOException) && !(x instanceof SecurityException))

x = new IOException(x);

enableAccept();

result.setFailure(x);

} finally {

// end of usage of listener socket

schannel.end();

}

// accept completed immediately but may not have executed on

// initiating thread in which case the operation may have been

// cancelled.

if (result.isCancelled()) {

closeChildChannel();

}

// invoke completion handler

Invoker.invokeIndirectly(result);

}

/**

* Executed when the I/O has completed

*/

@Override

public void completed(int bytesTransferred, boolean canInvokeDirect) {

try {

// connection accept after group has shutdown

if (iocp.isShutdown()) {

throw new IOException(new ShutdownChannelGroupException());

}

// finish the accept

try {

schannel.begin();

try {

channel.begin();

finishAccept();

} finally {

channel.end();

}

} finally {

schannel.end();

}

// allow another accept before the result is set

enableAccept();

result.setResult(channel);

} catch (Throwable x) {

enableAccept();

closeChildChannel();

if (x instanceof ClosedChannelException)

x = new AsynchronousCloseException();

if (!(x instanceof IOException) && !(x instanceof SecurityException))

x = new IOException(x);

result.setFailure(x);

}

// if an async cancel has already cancelled the operation then

// close the new channel so as to free resources

if (result.isCancelled()) {

closeChildChannel();

}

// invoke handler (but not directly)

Invoker.invokeIndirectly(result);

}

@Override

public void failed(int error, IOException x) {

enableAccept();

closeChildChannel();

// release waiters

if (schannel.isOpen()) {

result.setFailure(x);

} else {

result.setFailure(new AsynchronousCloseException());

}

Invoker.invokeIndirectly(result);

}

}

Future implAccept(Object attachment,

final CompletionHandler handler) {

if (!schannel.isOpen()) {

Throwable exc = new ClosedChannelException();

if (handler == null)

return CompletedFuture.withFailure(exc);

Invoker.invokeIndirectly(schannel, handler, attachment, null, exc);

return null;

}

if (schannel.isAcceptKilled())

throw new RuntimeException("Accept not allowed due to cancellation");

// ensure channel is bound to local address

if (schannel.localAddress == null)

throw new NotYetBoundException();

// create the socket that will be accepted. The creation of the socket

// is enclosed by a begin/end for the listener socket to ensure that

// we check that the listener is open and also to prevent the I/O

// port from being closed as the new socket is registered.

WindowsAsynchronousSocketChannelImpl ch = null;

IOException ioe = null;

try {

schannel.begin();

ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);

} catch (IOException x) {

ioe = x;

} finally {

schannel.end();

}

if (ioe != null) {

if (handler == null)

return CompletedFuture.withFailure(ioe);

Invoker.invokeIndirectly(this.schannel, handler, attachment, null, ioe);

return null;

}

// need calling context when there is security manager as

// permission check may be done in a different thread without

// any application call frames on the stack

AccessControlContext acc =

(System.getSecurityManager() == null) ? null : AccessController.getContext();

PendingFuture result =

new PendingFuture(schannel, handler, attachment);

// check and set flag to prevent concurrent accepting

if (pendingAccept.get() >= maxPending)

throw new AcceptPendingException();

pendingAccept.incrementAndGet();

AcceptTask task = new AcceptTask(ch, acc, dataBuffers.poll(), result);

result.setContext(task);

// initiate I/O

if (Iocp.supportsThreadAgnosticIo()) {

task.run();

} else {

Invoker.invokeOnThreadInThreadPool(this.schannel, task);

}

return result;

}

//    //reimplements for performance

static native void updateAcceptContext(long listenSocket, long acceptSocket,

InetAddress[] addresses, int[] ports, long dataBuffer) throws IOException;

static native int accept0(long handle, long handle2, long overlapped, long dataBuffer);

}

对应的CPP代码如下:

/*

* Class:     sun_nio_ch_WindowsMultiAcceptSupport

* Method:    updateAcceptContext

* Signature: (JJ[Ljava/net/InetAddress;[IJ)V

*/

JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext

(JNIEnv *env , jclass clazz, jlong listenSocket, jlong acceptSocket, jobjectArray sockArray,jintArray portArray,jlong buf)

{

SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);

SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);

PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);

INT iLocalAddrLen=0;

INT iRemoteAddrLen=0;

SOCKETADDRESS* lpLocalAddr;

SOCKETADDRESS* lpRemoteAddr;

jobject localAddr;

jobject remoteAddr;

jint ports[2]={0};

setsockopt(s2, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&s1, sizeof(s1));

(lpGetAcceptExSockaddrs)(outputBuffer,

0,

sizeof(SOCKETADDRESS)+16,

sizeof(SOCKETADDRESS)+16,

(LPSOCKADDR*)&lpLocalAddr,

&iLocalAddrLen,

(LPSOCKADDR*)&lpRemoteAddr,

&iRemoteAddrLen);

localAddr=lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpLocalAddr,(int *)ports);

remoteAddr=lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpRemoteAddr,(int *)(ports+1));

env->SetObjectArrayElement(sockArray,0,localAddr);

env->SetObjectArrayElement(sockArray,1,remoteAddr);

env->SetIntArrayRegion(portArray,0,2,ports);

}

/*

* Class:     sun_nio_ch_WindowsMultiAcceptSupport

* Method:    accept0

* Signature: (JJJJ)I

*/

jint JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0

(JNIEnv *env, jclass clazz, jlong listenSocket, jlong acceptSocket, jlong ov, jlong buf)

{

BOOL res;

SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);

SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);

PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);

DWORD nread = 0;

OVERLAPPED* lpOverlapped = (OVERLAPPED*)jlong_to_ptr(ov);

ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));

//why use SOCKETADDRESS?

//because client may use IPv6 to connect to server.

res = (lpAcceptEx)(s1,

s2,

outputBuffer,

0,

sizeof(SOCKETADDRESS)+16,

sizeof(SOCKETADDRESS)+16,

&nread,

lpOverlapped);

if (res == 0) {

int error = WSAGetLastError();

if (error == ERROR_IO_PENDING) {

return NIO2_IOS_UNAVAILABLE;

}

return NIO2_THROWN;

}

return 0;

}

这里用到的lpNET_SockaddrToInetAddress是JDK7中NET.DLL暴露的方法,从DLL里加载。相应代码如下:

*

* Class:     com_yovn_jabhttpd_utilities_SunPackageFixer

* Method:    initFds

* Signature: ()V

*/

JNIEXPORT void JNICALL Java_com_yovn_jabhttpd_utilities_SunPackageFixer_initFds

(JNIEnv *env, jclass clazz)

{

GUID GuidAcceptEx = WSAID_ACCEPTEX;

GUID GuidTransmitFile = WSAID_TRANSMITFILE;

GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;

SOCKET s;

int rv;

DWORD dwBytes;

HMODULE hModule;

s = socket(AF_INET, SOCK_STREAM, 0);

if (s == INVALID_SOCKET) {

JNU_ThrowByName(env,"java/io/IOException", "socket failed");

return;

}

rv = WSAIoctl(s,

SIO_GET_EXTENSION_FUNCTION_POINTER,

(LPVOID)&GuidAcceptEx,

sizeof(GuidAcceptEx),

&lpAcceptEx,

sizeof(lpAcceptEx),

&dwBytes,

NULL,

NULL);

if (rv != 0)

{

JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get AcceptEx ");

goto _ret;

}

rv = WSAIoctl(s,

SIO_GET_EXTENSION_FUNCTION_POINTER,

(LPVOID)&GuidTransmitFile,

sizeof(GuidTransmitFile),

&lpTransmitFile,

sizeof(lpTransmitFile),

&dwBytes,

NULL,

NULL);

if (rv != 0)

{

JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get TransmitFile");

goto _ret;

}

rv = WSAIoctl(s,

SIO_GET_EXTENSION_FUNCTION_POINTER,

(LPVOID)&GuidGetAcceptExSockAddrs,

sizeof(GuidGetAcceptExSockAddrs),

&lpGetAcceptExSockaddrs,

sizeof(lpGetAcceptExSockaddrs),

&dwBytes,

NULL,

NULL);

if (rv != 0)

{

JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get GetAcceptExSockaddrs");

goto _ret;

}

hModule=LoadLibrary("net.dll");

if(hModule==NULL)

{

JNU_ThrowByName(env, "java/io/IOException","can't load java net.dll");

goto _ret;

}

lpNET_SockaddrToInetAddress=(NET_SockaddrToInetAddress_t)GetProcAddress(hModule,"_NET_SockaddrToInetAddress@12");

if(lpNET_SockaddrToInetAddress==NULL)

{

JNU_ThrowByName(env, "java/io/IOException","can't resolve _NET_SockaddrToInetAddress function ");

}

_ret:

closesocket(s);

return;

}

细心的同学可能会发现,在创建socket之前没有初始化WinSock库,因为在这段代码前,我初始化了一个InetSocketAddress对象,这样JVM会加载NET.DLL并初始化WinSock库了。

OK,现在,你可以在支持类上同时发起多个AcceptEx请求了。

PS:基于这个我简单测试了下我的服务器,同时开5000个线程,每个下载3M多点的文件,一分钟内能够全部正确完成。

服务器正在开发中,有兴趣的请加入:http://code.google.com/p/jabhttpd

java nio2 iocp_基于JDK7 NIO2的高性能web服务器实践之二(转)相关推荐

  1. 基于JDK7 NIO2的高性能web服务器实践之二(转)

    前一篇博客,我简单提了下怎么为NIO2增加TransmitFile支持,文件传送吞吐量是一个性能关注点,此外,并发连接数也是重要的关注点. 不过JDK7中又一次做了简单的实现,不支持同时投递多个Acc ...

  2. nginx高性能WEB服务器系列之九--nginx运维故障日常解决方案

    nginx系列友情链接: nginx高性能WEB服务器系列之一简介及安装 https://www.cnblogs.com/maxtgood/p/9597596.html nginx高性能WEB服务器系 ...

  3. nginx高性能WEB服务器系列之七--nginx反向代理

    nginx系列友情链接: nginx高性能WEB服务器系列之一简介及安装 https://www.cnblogs.com/maxtgood/p/9597596.html nginx高性能WEB服务器系 ...

  4. Nginx高性能Web服务器实战教程PDF

    网站 更多书籍点击进入>> CiCi岛 下载 电子版仅供预览及学习交流使用,下载后请24小时内删除,支持正版,喜欢的请购买正版书籍 电子书下载(皮皮云盘-点击"普通下载" ...

  5. Warp : Haskell 的高性能 Web 服务器(译文)

    Warp : Haskell 的高性能 Web 服务器(译文) 按 GHC 7.8 马上就要发布了.一个很大的改进就是加入了本文所说的并行 IO 管理器.从此之后 Haskell 在高性能服务器领域将 ...

  6. 分享关于搭建高性能WEB服务器的一篇文章

    这篇文章主要介绍了Centos5.4+Nginx-0.8.50+UWSGI-0.9.6.2+Django-1.2.3搭建高性能WEB服务器的相关资料,需要的朋友可以参考下(http://m.0813s ...

  7. LEMP构建高性能WEB服务器(第三版)

    LEMP 自动化编译脚本下载:http://docs.linuxtone.org/autoinstall/ (定期更新,欢迎多测试,找bug) 介绍参考:http://bbs.linuxtone.or ...

  8. 基于epoll实现简单的web服务器

    1. 简介 epoll 是 Linux 平台下特有的一种 I/O 复用模型实现,于 2002 年在 Linux kernel 2.5.44 中被引入.在 epoll 之前,Unix/Linux 平台下 ...

  9. arm Linux 低成本方案,参赛作品《低成本基于ARM+Linux平台搭建web服务器的物联网学习板》...

    [报名阶段需要填写的内容] 1. 参赛者姓名(必填项): 王徕泽 2. 单位或学校名称(选填项): 徕泽电子工作室 3. 当前职务或职称(选填项): 室长 4. 参赛作品的名字(必填项): 低成本基于 ...

最新文章

  1. onethink的熟悉
  2. Python:TypeError: 'int' object is not callable
  3. 点击按钮测试用例标题_功能测试有哪些用例?分享功能测试用例大全
  4. 盒马鲜生,快而准确的秘密!
  5. nginx模块开发—HTTP初始化之listen
  6. 【报告分享】2020中国低代码平台指数测评报告.pdf(附下载链接)
  7. 借给朋友两万块钱,已经两年,每次要钱都各种借口,我该怎么办?
  8. Flutter基础—手势处理
  9. Alpha、Beta、RC、GA、RTM、OEM等版本的解释
  10. 数字图像处理笔记⑧——纹理分析
  11. 【今日CV 计算机视觉论文速览 第108期】Tue, 30 Apr 2019
  12. 国家央行数字货币的优势与挑战
  13. 忘记了mysql的root密码(分享:重置密码过程)
  14. 【基因调控网络】Gene regulatory networks modelling using a dynamic evolutionary hybrid(ENFRN ,动态进化混合模型2010)
  15. [UE5蓝图基础一]13.类似”人类一败涂地”掉落一定距离会回到空中 最终着落点还是设定地形上
  16. Android View的事件分发机制和滑动冲突解决方案
  17. 2021安徽高考体考成绩查询,2021年安徽体育专业考试成绩查询网址:https://www.ahzsks.cn/...
  18. Python脚本操作Excel实现批量替换
  19. setTimeout开始,暂停,停止功能
  20. Python 入门经典必背的 18 个程序,学完就入门 Python 了

热门文章

  1. Linux环境搭建:设置主机名颜色、设置vim颜色
  2. netty系列之:文本聊天室
  3. 深入理解Java类加载机制
  4. memcached和redis的区别
  5. c语言设计第六章答案,c语言第六章 循环结构程序设计(习题册答案)
  6. java如何输出指定两个日期之间的所有日期
  7. Java读取Excel内容
  8. Android 通过字符串来获取R下面资源的ID 值 文字资源
  9. 利用JNative实现Java调用动态库
  10. 计算机应用基础第二版在线作业c,计算机应用基础作业二(答案)