第三章 通道

通道(Channel)是 java.nio 的第二个主要创新。它们既不是一个扩展也不是一项增强,而 是全新、极好的 Java I/O 示例,提供与 I/O 服务的直接连接。Channel 用于在字节缓冲区和位于通 道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。

通道可以形象地比喻为银行出纳窗口使用的气动导管。您的薪水支票就是您要传送的信息,载 体(Carrier)就好比一个缓冲区。您先填充缓冲区(将您的支票放到载体上),接着将缓冲“写”到 通道中(将载体丢进导管中),然后信息负载就被传递到通道另一侧的 I/O 服务(银行出纳员)。

出纳员填充缓冲区(将您的收据放到载体上),接着开始一个反方向的通道 传输(将载体丢回到导管中)。载体就到了通道的您这一侧(一个填满了的缓冲区正等待您的查 验),然后您就会 flip 缓冲区(打开盖子)并将它清空(移除您的收据)。现在您可以开车走了, 下一个对象(银行客户)将使用同样的载体(Buffer)和导管(Channel)对象来重复上述过程。

多数情况下,通道与操作系统的文件描述符(File Descriptor)和文件句柄(File Handle)有着 一对一的关系。虽然通道比文件描述符更广义,但您将经常使用到的多数通道都是连接到开放的文 件描述符的。Channel 类提供维持平台独立性所需的抽象过程,不过仍然会模拟现代操作系统本身 的 I/O 性能。

通道是一种途径,借助该途径,可以用最小的总开销来访问操作系统本身的 I/O 服务。缓冲区 则是通道内部用来发送和接收数据的端点。 (见图 3-1)

观察图 3-2 所示的 UML 类图会发现,channel 类的继承关系要比 buffer 类复杂一些。Channel 类相互之间的关系更复杂,并且部分 channel 类依赖于在 java.nio.channels.spi 子包中定义的类。本章我们将对该困惑进行澄清。通道 SPI 归纳参见附录 B。

1.通道基础

/** Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************/package java.nio.channels;import java.io.IOException;
import java.io.Closeable;/*** A nexus for I/O operations.** <p> A channel represents an open connection to an entity such as a hardware* device, a file, a network socket, or a program component that is capable of* performing one or more distinct I/O operations, for example reading or* writing.** <p> A channel is either open or closed.  A channel is open upon creation,* and once closed it remains closed.  Once a channel is closed, any attempt to* invoke an I/O operation upon it will cause a {@link ClosedChannelException}* to be thrown.  Whether or not a channel is open may be tested by invoking* its {@link #isOpen isOpen} method.** <p> Channels are, in general, intended to be safe for multithreaded access* as described in the specifications of the interfaces and classes that extend* and implement this interface.*** @author Mark Reinhold* @author JSR-51 Expert Group* @since 1.4*/public interface Channel extends Closeable {/*** Tells whether or not this channel is open.** @return <tt>true</tt> if, and only if, this channel is open*/public boolean isOpen();/*** Closes this channel.** <p> After a channel is closed, any further attempt to invoke I/O* operations upon it will cause a {@link ClosedChannelException} to be* thrown.** <p> If this channel is already closed then invoking this method has no* effect.** <p> This method may be invoked at any time.  If some other thread has* already invoked it, however, then another invocation will block until* the first invocation is complete, after which it will return without* effect. </p>** @throws  IOException  If an I/O error occurs*/public void close() throws IOException;}

与缓冲区不同,通道 API 主要由接口指定。不同的操作系统上通道实现(Channel Implementation)会有根本性的差异,所以通道 API 仅仅描述了可以做什么。因此很自然地,通道 实现经常使用操作系统的本地代码。通道接口允许您以一种受控且可移植的方式来访问底层的 I/O 服务。

您可以从顶层的 Channel 接口看到,对所有通道来说只有两种共同的操作:检查一个通道是否 打开(IsOpen())和关闭一个打开的通道(close())。图 3-2 显示,所有有趣的东西都是那些实现 Channel 接口以及它的子接口的类。

InterruptibleChannel 是一个标记接口,当被通道使用时可以标示该通道是可以中断的 (Interruptible)。如果连接可中断通道的线程被中断,那么该通道会以特别的方式工作,关于这一 点我们会在 3.1.3 节中进行讨论。大多数但非全部的通道都是可以中断的。

从 Channel 接口引申出的其他接口都是面向字节的子接口,包括 Writable ByteChannel 和 ReadableByteChannel。这也正好支持了我们之前所学的:通道只能在字节缓冲区上操作。层次结构 表明其他数据类型的通道也可以从 Channel 接口引申而来。这是一种很好的类设计,不过非字节实 现是不可能的,因为操作系统都是以字节的形式实现底层 I/O 接口的。

观察图 3-2,您还会发现类层次结构中有两个类位于一个不同的包: java.nio.channels.spi。这两个类是 AbstractInterruptibleChannel 和 AbstractSelectableChannel,它们分别为可中断的(interruptible)和可选择的(selectable)的通道实 现提供所需的常用方法。尽管描述通道行为的接口都是在 java.nio.channels 包中定义的,不 过具体的通道实现却都是从 java.nio.channels.spi 中的类引申来的。这使得他们可以访问受 保护的方法,而这些方法普通的通道用户永远都不会调用。

作为通道的一个使用者,您可以放心地忽视 SPI 包中包含的中间类。这种有点费解的继承层次 只会让那些使用新通道的用户感兴趣。SPI 包允许新通道实现以一种受控且模块化的方式被植入到Java 虚拟机上。这意味着可以使用专为某种操作系统、文件系统或应用程序而优化的通道来使性能 最大化。

1)打开通道

通道是访问 I/O 服务的导管。正如我们在第一章中所讨论的,I/O 可以分为广义的两大类别: File I/O 和 Stream I/O。那么相应地有两种类型的通道也就不足为怪了,它们是文件(file)通道和 套接字(socket)通道。如果您参考一下图 3-2,您就会发现有一个 FileChannel 类和三个 socket 通 道类:SocketChannel、ServerSocketChannel 和 DatagramChannel。

通道可以以多种方式创建。Socket 通道有可以直接创建新 socket 通道的工厂方法。但是一个 FileChannel 对象却只能通过在一个打开的 RandomAccessFile、FileInputStream 或 FileOutputStream 对象上调用 getChannel( )方法来获取。您不能直接创建一个 FileChannel 对象。File 和 socket 通道会 在后面的章节中予以详细讨论。

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("somehost", someport));
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(somelocalport));
DatagramChannel dc = DatagramChannel.open();
RandomAccessFile raf = new RandomAccessFile("somefile", "r");
FileChannel fc = raf.getChannel();

在 3.5 节中您会发现,java.net 的 socket 类也有新的 getChannel( )方法。这些方法虽然 能返回一个相应的 socket 通道对象,但它们却并非新通道的来源, RandomAccessFile.getChannel( )方法才是。只有在已经有通道存在的时候,它们才返回与 一个 socket 关联的通道;它们永远不会创建新通道。

2)使用通道

通道将数据传输给 ByteBuffer 对象或者从 ByteBuffer 对象 获取数据进行传输。

将图 3-2 中大部分零乱内容移除可以得到图 3-3 所示的 UML 类图。子接口 API 代码如下:

/** Copyright (c) 2000, 2001, Oracle and/or its affiliates. All rights reserved.* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************/package java.nio.channels;import java.io.IOException;
import java.nio.ByteBuffer;/*** A channel that can read bytes.** <p> Only one read operation upon a readable channel may be in progress at* any given time.  If one thread initiates a read operation upon a channel* then any other thread that attempts to initiate another read operation will* block until the first operation is complete.  Whether or not other kinds of* I/O operations may proceed concurrently with a read operation depends upon* the type of the channel. </p>*** @author Mark Reinhold* @author JSR-51 Expert Group* @since 1.4*/public interface ReadableByteChannel extends Channel {/*** Reads a sequence of bytes from this channel into the given buffer.** <p> An attempt is made to read up to <i>r</i> bytes from the channel,* where <i>r</i> is the number of bytes remaining in the buffer, that is,* <tt>dst.remaining()</tt>, at the moment this method is invoked.** <p> Suppose that a byte sequence of length <i>n</i> is read, where* <tt>0</tt>&nbsp;<tt>&lt;=</tt>&nbsp;<i>n</i>&nbsp;<tt>&lt;=</tt>&nbsp;<i>r</i>.* This byte sequence will be transferred into the buffer so that the first* byte in the sequence is at index <i>p</i> and the last byte is at index* <i>p</i>&nbsp;<tt>+</tt>&nbsp;<i>n</i>&nbsp;<tt>-</tt>&nbsp;<tt>1</tt>,* where <i>p</i> is the buffer's position at the moment this method is* invoked.  Upon return the buffer's position will be equal to* <i>p</i>&nbsp;<tt>+</tt>&nbsp;<i>n</i>; its limit will not have changed.** <p> A read operation might not fill the buffer, and in fact it might not* read any bytes at all.  Whether or not it does so depends upon the* nature and state of the channel.  A socket channel in non-blocking mode,* for example, cannot read any more bytes than are immediately available* from the socket's input buffer; similarly, a file channel cannot read* any more bytes than remain in the file.  It is guaranteed, however, that* if a channel is in blocking mode and there is at least one byte* remaining in the buffer then this method will block until at least one* byte is read.** <p> This method may be invoked at any time.  If another thread has* already initiated a read operation upon this channel, however, then an* invocation of this method will block until the first operation is* complete. </p>** @param  dst*         The buffer into which bytes are to be transferred** @return  The number of bytes read, possibly zero, or <tt>-1</tt> if the*          channel has reached end-of-stream** @throws  NonReadableChannelException*          If this channel was not opened for reading** @throws  ClosedChannelException*          If this channel is closed** @throws  AsynchronousCloseException*          If another thread closes this channel*          while the read operation is in progress** @throws  ClosedByInterruptException*          If another thread interrupts the current thread*          while the read operation is in progress, thereby*          closing the channel and setting the current thread's*          interrupt status** @throws  IOException*          If some other I/O error occurs*/public int read(ByteBuffer dst) throws IOException;}
/** Copyright (c) 2000, 2005, Oracle and/or its affiliates. All rights reserved.* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************/package java.nio.channels;import java.io.IOException;
import java.nio.ByteBuffer;/*** A channel that can write bytes.** <p> Only one write operation upon a writable channel may be in progress at* any given time.  If one thread initiates a write operation upon a channel* then any other thread that attempts to initiate another write operation will* block until the first operation is complete.  Whether or not other kinds of* I/O operations may proceed concurrently with a write operation depends upon* the type of the channel. </p>*** @author Mark Reinhold* @author JSR-51 Expert Group* @since 1.4*/public interface WritableByteChannelextends Channel
{/*** Writes a sequence of bytes to this channel from the given buffer.** <p> An attempt is made to write up to <i>r</i> bytes to the channel,* where <i>r</i> is the number of bytes remaining in the buffer, that is,* <tt>src.remaining()</tt>, at the moment this method is invoked.** <p> Suppose that a byte sequence of length <i>n</i> is written, where* <tt>0</tt>&nbsp;<tt>&lt;=</tt>&nbsp;<i>n</i>&nbsp;<tt>&lt;=</tt>&nbsp;<i>r</i>.* This byte sequence will be transferred from the buffer starting at index* <i>p</i>, where <i>p</i> is the buffer's position at the moment this* method is invoked; the index of the last byte written will be* <i>p</i>&nbsp;<tt>+</tt>&nbsp;<i>n</i>&nbsp;<tt>-</tt>&nbsp;<tt>1</tt>.* Upon return the buffer's position will be equal to* <i>p</i>&nbsp;<tt>+</tt>&nbsp;<i>n</i>; its limit will not have changed.** <p> Unless otherwise specified, a write operation will return only after* writing all of the <i>r</i> requested bytes.  Some types of channels,* depending upon their state, may write only some of the bytes or possibly* none at all.  A socket channel in non-blocking mode, for example, cannot* write any more bytes than are free in the socket's output buffer.** <p> This method may be invoked at any time.  If another thread has* already initiated a write operation upon this channel, however, then an* invocation of this method will block until the first operation is* complete. </p>** @param  src*         The buffer from which bytes are to be retrieved** @return The number of bytes written, possibly zero** @throws  NonWritableChannelException*          If this channel was not opened for writing** @throws  ClosedChannelException*          If this channel is closed** @throws  AsynchronousCloseException*          If another thread closes this channel*          while the write operation is in progress** @throws  ClosedByInterruptException*          If another thread interrupts the current thread*          while the write operation is in progress, thereby*          closing the channel and setting the current thread's*          interrupt status** @throws  IOException*          If some other I/O error occurs*/public int write(ByteBuffer src) throws IOException;}
/** Copyright (c) 2000, 2001, Oracle and/or its affiliates. All rights reserved.* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************/package java.nio.channels;import java.io.IOException;/*** A channel that can read and write bytes.  This interface simply unifies* {@link ReadableByteChannel} and {@link WritableByteChannel}; it does not* specify any new operations.** @author Mark Reinhold* @author JSR-51 Expert Group* @since 1.4*/public interface ByteChannelextends ReadableByteChannel, WritableByteChannel
{}

通道可以是单向(unidirectional)或者双向的(bidirectional)。一个 channel 类可能实现定义 read( )方法的 ReadableByteChannel 接口,而另一个 channel 类也许实现 WritableByteChannel 接口以 提供 write( )方法。实现这两种接口其中之一的类都是单向的,只能在一个方向上传输数据。如果 一个类同时实现这两个接口,那么它是双向的,可以双向传输数据。

图 3-3 显示了一个 ByteChannel 接口,该接口引申出了 ReadableByteChannel 和 WritableByteChannel 两个接口。ByteChannel 接口本身并不定义新的 API 方法,它是一种用来聚集 它自己以一个新名称继承的多个接口的便捷接口。根据定义,实现 ByteChannel 接口的通道会同时 实现 ReadableByteChannel 和 WritableByteChannel 两个接口,所以此类通道是双向的。这是简化类 定义的语法糖(syntactic sugar),它使得用操作器(operator)实例来测试通道对象变得更加简 单。

这是一种好的类设计技巧,如果您在写您自己的 Channel 实现的话,您可以适当地实现这些接 口。不过对于使用 java.nio.channels 包中标准通道类的程序员来说,这些接口并没有太大的 吸引力。假如您快速回顾一下图 3-2 或者向前跳跃到关于 file 和 socket 通道的章节,您将发现每一 个 file 或 socket 通道都实现全部三个接口。从类定义的角度而言,这意味着全部 file 和 socket 通道 对象都是双向的。这对于 sockets 不是问题,因为它们一直都是双向的,不过对于 files 却是个问题了。

一个文件可以在不同的时候以不同的权限打开。从 FileInputStream 对象的 getChannel( )方法获取的 FileChannel 对象是只读的,不过从接口声明的角度来看却是双向的,因为 FileChannel 实现 ByteChannel 接口。在这样一个通道上调用 write( )方法将抛出未经检查的 NonWritableChannelException 异常,因为 FileInputStream 对象总是以 read-only 的权限打开文件。

通道会连接一个特定 I/O 服务且通道实例(channel instance)的性能受它所连接的 I/O 服务的 特征限制,记住这很重要。一个连接到只读文件的 Channel 实例不能进行写操作,即使该实例所属 的类可能有 write( )方法。基于此,程序员需要知道通道是如何打开的,避免试图尝试一个底层 I/O 服务不允许的操作。

// A ByteBuffer named buffer contains data to be written
FileInputStream input = new FileInputStream(fileName);
FileChannel channel = input.getChannel();
// This will compile but will throw an IOException
// because the underlying file is read-only
channel.write(buffer);

ByteChannel 的 read( ) 和 write( )方法使用 ByteBuffer 对象作为参数。两种方法均返回已传输的 字节数,可能比缓冲区的字节数少甚至可能为零。缓冲区的位置也会发生与已传输字节相同数量的 前移。如果只进行了部分传输,缓冲区可以被重新提交给通道并从上次中断的地方继续传输。该过 程重复进行直到缓冲区的 hasRemaining( )方法返回 false 值。例 3-1 表示了如何从一个通道复制 数据到另一个通道。

例 3-1 在通道之间复制数据


package javatest.niotest;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;/*** Test copying between channels.** @author Ron Hitchens (ron@ronsoft.com)*/
public class ChannelCopy {/*** This code copies data from stdin to stdout. Like the 'cat'* command, but without any useful options.*/public static void main(String[] argv) throws IOException {ReadableByteChannel source = Channels.newChannel(System.in);WritableByteChannel dest = Channels.newChannel(System.out);channelCopy1(source, dest);// alternatively, call channelCopy2 (source, dest);source.close();dest.close();}/*** Channel copy method 1. This method copies data from the src* channel and writes it to the dest channel until EOF on src.* <p>* This implementation makes use of compact( ) on the temp buffer* to pack down the data if the buffer wasn't fully drained. This* may result in data copying, but minimizes system calls. It also* requires a cleanup loop to make sure all the data gets sent.*/private static void channelCopy1(ReadableByteChannel src, WritableByteChannel dest) throws IOException {ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);while (src.read(buffer) != -1) {// Prepare the buffer to be drainedbuffer.flip();// Write to the channel; may blockdest.write(buffer);// If partial transfer, shift remainder down// If buffer is empty, same as doing clear( )buffer.compact();}// EOF will leave buffer in fill statebuffer.flip();// Make sure that the buffer is fully drainedwhile (buffer.hasRemaining()) {dest.write(buffer);}}/*** Channel copy method 2. This method performs the same copy, but* assures the temp buffer is empty before reading more data. This* never requires data copying but may result in more systems calls.* No post-loop cleanup is needed because the buffer will be empty* when the loop is exited.*/private static void channelCopy2(ReadableByteChannel src, WritableByteChannel dest) throws IOException {ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);while (src.read(buffer) != -1) {// Prepare the buffer to be drainedbuffer.flip();// Make sure that the buffer was fully drainedwhile (buffer.hasRemaining()) {dest.write(buffer);}// Make the buffer empty, ready for fillingbuffer.clear();}}
}

通道可以以阻塞(blocking)或非阻塞(nonblocking)模式运行。非阻塞模式的通道永远不会 让调用的线程休眠。请求的操作要么立即完成,要么返回一个结果表明未进行任何操作。只有面向 流的(stream-oriented)的通道,如 sockets 和 pipes 才能使用非阻塞模式。

从图 3-2 可以看出,socket 通道类从 SelectableChannel 引申而来。从 SelectableChannel 引申而 来的类可以和支持有条件的选择(readiness selectio)的选择器(Selectors)一起使用。将非阻塞 I/O 和选择器组合起来可以使您的程序利用多路复用 I/O(multiplexed I/O)。选择和多路复用将在 第四章中予以讨论。关于怎样将 sockets 置于非阻塞模式的细节会在 3.5 节中涉及。

3)关闭通道

与缓冲区不同,通道不能被重复使用。一个打开的通道即代表与一个特定 I/O 服务的特定连接 并封装该连接的状态。当通道关闭时,那个连接会丢失,然后通道将不再连接任何东西。

/** Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************/package java.nio.channels;import java.io.IOException;
import java.io.Closeable;/*** A nexus for I/O operations.** <p> A channel represents an open connection to an entity such as a hardware* device, a file, a network socket, or a program component that is capable of* performing one or more distinct I/O operations, for example reading or* writing.** <p> A channel is either open or closed.  A channel is open upon creation,* and once closed it remains closed.  Once a channel is closed, any attempt to* invoke an I/O operation upon it will cause a {@link ClosedChannelException}* to be thrown.  Whether or not a channel is open may be tested by invoking* its {@link #isOpen isOpen} method.** <p> Channels are, in general, intended to be safe for multithreaded access* as described in the specifications of the interfaces and classes that extend* and implement this interface.*** @author Mark Reinhold* @author JSR-51 Expert Group* @since 1.4*/public interface Channel extends Closeable {/*** Tells whether or not this channel is open.** @return <tt>true</tt> if, and only if, this channel is open*/public boolean isOpen();/*** Closes this channel.** <p> After a channel is closed, any further attempt to invoke I/O* operations upon it will cause a {@link ClosedChannelException} to be* thrown.** <p> If this channel is already closed then invoking this method has no* effect.** <p> This method may be invoked at any time.  If some other thread has* already invoked it, however, then another invocation will block until* the first invocation is complete, after which it will return without* effect. </p>** @throws  IOException  If an I/O error occurs*/public void close() throws IOException;}

调用通道的close( )方法时,可能会导致在通道关闭底层I/O服务的过程中线程暂时阻塞 7 ,哪怕 该通道处于非阻塞模式。通道关闭时的阻塞行为(如果有的话)是高度取决于操作系统或者文件系 统的。在一个通道上多次调用close( )方法是没有坏处的,但是如果第一个线程在close( )方法中阻 塞,那么在它完成关闭通道之前,任何其他调用close( )方法都会阻塞。后续在该已关闭的通道上调 用close( )不会产生任何操作,只会立即返回。

可以通过 isOpen( )方法来测试通道的开放状态。如果返回 true 值,那么该通道可以使用。如 果返回 false 值,那么该通道已关闭,不能再被使用。尝试进行任何需要通道处于开放状态作为 前提的操作,如读、写等都会导致 ClosedChannelException 异常。

通道引入了一些与关闭和中断有关的新行为。如果一个通道实现 InterruptibleChannel 接口(参 见图 3-2),它的行为以下述语义为准:如果一个线程在一个通道上被阻塞并且同时被中断(由调 用该被阻塞线程的 interrupt( )方法的另一个线程中断),那么该通道将被关闭,该被阻塞线程也会 产生一个 ClosedByInterruptException 异常。

此外,假如一个线程的 interrupt status 被设置并且该线程试图访问一个通道,那么这个通道将 立即被关闭,同时将抛出相同的 ClosedByInterruptException 异常。线程的 interrupt status 在线程的 interrupt( )方法被调用时会被设置。我们可以使用 isInterrupted( )来测试某个线程当前的 interrupt status。当前线程的 interrupt status 可以通过调用静态的 Thread.interrupted( )方法清除。

仅仅因为休眠在其上的线程被中断就关闭通道,这看起来似乎过于苛刻了。不过这却是 NIO 架构师们所做出的明确的设计决定。经验表明,想要在所有的操作系统上一致而可靠地处理被中断 的 I/O 操作是不可能的。“在全部平台上提供确定的通道行为”这一需求导致了“当 I/O 操作被中断时 总是关闭通道”这一设计选择。这个选择被认为是可接受的,因为大部分时候一个线程被中断就是 希望以此来关闭通道。java.nio 包中强制使用此行为来避免因操作系统独特性而导致的困境,因 为该困境对 I/O 区域而言是极其危险的。这也是为增强健壮性(robustness)而采用的一种经典的权 衡。

可中断的通道也是可以异步关闭的。实现 InterruptibleChannel 接口的通道可以在任何时候被关 闭,即使有另一个被阻塞的线程在等待该通道上的一个 I/O 操作完成。当一个通道被关闭时,休眠 在该通道上的所有线程都将被唤醒并接收到一个 AsynchronousCloseException 异常。接着通道就被 关闭并将不再可用。

不实现 InterruptibleChannel 接口的通道一般都是不进行底层本地代码实现的有特殊用途的通 道。这些也许是永远不会阻塞的特殊用途通道,如旧系统数据流的封装包或不能实现可中断语义的 writer 类等。(参见 3.7 节)

2.Scatter/Gather

通道提供了一种被称为 Scatter/Gather 的重要新功能(有时也被称为矢量 I/O)。Scatter/Gather 是一个简单却强大的概念(参见 1.4.1.1 节),它是指在多个缓冲区上实现一个简单的 I/O 操作。对 于一个 write 操作而言,数据是从几个缓冲区按顺序抽取(称为 gather)并沿着通道发送的。缓冲 区本身并不需要具备这种 gather 的能力(通常它们也没有此能力)。该 gather 过程的效果就好比全 部缓冲区的内容被连结起来,并在发送数据前存放到一个大的缓冲区中。对于 read 操作而言,从 通道读取的数据会按顺序被散布(称为 scatter)到多个缓冲区,将每个缓冲区填满直至通道中的数据或者缓冲区的最大空间被消耗完。

大多数现代操作系统都支持本地矢量 I/O(native vectored I/O)。当您在一个通道上请求一个 Scatter/Gather 操作时,该请求会被翻译为适当的本地调用来直接填充或抽取缓冲区。这是一个很大 的进步,因为减少或避免了缓冲区拷贝和系统调用。Scatter/Gather 应该使用直接的 ByteBuffers 以从 本地 I/O 获取最大性能优势。

将 scatter/gather 接口添加到图 3-3 的 UML 类图中可以得到图 3-4。下面的代码描述了 scatter 是 如何扩展读操作的,以及 gather 是如何基于写操作构建的:

/** Copyright (c) 2000, 2006, Oracle and/or its affiliates. All rights reserved.* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************/package java.nio.channels;import java.io.IOException;
import java.nio.ByteBuffer;/*** A channel that can read bytes into a sequence of buffers.** <p> A <i>scattering</i> read operation reads, in a single invocation, a* sequence of bytes into one or more of a given sequence of buffers.* Scattering reads are often useful when implementing network protocols or* file formats that, for example, group data into segments consisting of one* or more fixed-length headers followed by a variable-length body.  Similar* <i>gathering</i> write operations are defined in the {@link* GatheringByteChannel} interface.  </p>*** @author Mark Reinhold* @author JSR-51 Expert Group* @since 1.4*/public interface ScatteringByteChannelextends ReadableByteChannel
{/*** Reads a sequence of bytes from this channel into a subsequence of the* given buffers.** <p> An invocation of this method attempts to read up to <i>r</i> bytes* from this channel, where <i>r</i> is the total number of bytes remaining* the specified subsequence of the given buffer array, that is,** <blockquote><pre>* dsts[offset].remaining()*     + dsts[offset+1].remaining()*     + ... + dsts[offset+length-1].remaining()</pre></blockquote>** at the moment that this method is invoked.** <p> Suppose that a byte sequence of length <i>n</i> is read, where* <tt>0</tt>&nbsp;<tt>&lt;=</tt>&nbsp;<i>n</i>&nbsp;<tt>&lt;=</tt>&nbsp;<i>r</i>.* Up to the first <tt>dsts[offset].remaining()</tt> bytes of this sequence* are transferred into buffer <tt>dsts[offset]</tt>, up to the next* <tt>dsts[offset+1].remaining()</tt> bytes are transferred into buffer* <tt>dsts[offset+1]</tt>, and so forth, until the entire byte sequence* is transferred into the given buffers.  As many bytes as possible are* transferred into each buffer, hence the final position of each updated* buffer, except the last updated buffer, is guaranteed to be equal to* that buffer's limit.** <p> This method may be invoked at any time.  If another thread has* already initiated a read operation upon this channel, however, then an* invocation of this method will block until the first operation is* complete. </p>** @param  dsts*         The buffers into which bytes are to be transferred** @param  offset*         The offset within the buffer array of the first buffer into*         which bytes are to be transferred; must be non-negative and no*         larger than <tt>dsts.length</tt>** @param  length*         The maximum number of buffers to be accessed; must be*         non-negative and no larger than*         <tt>dsts.length</tt>&nbsp;-&nbsp;<tt>offset</tt>** @return The number of bytes read, possibly zero,*         or <tt>-1</tt> if the channel has reached end-of-stream** @throws  IndexOutOfBoundsException*          If the preconditions on the <tt>offset</tt> and <tt>length</tt>*          parameters do not hold** @throws  NonReadableChannelException*          If this channel was not opened for reading** @throws  ClosedChannelException*          If this channel is closed** @throws  AsynchronousCloseException*          If another thread closes this channel*          while the read operation is in progress** @throws  ClosedByInterruptException*          If another thread interrupts the current thread*          while the read operation is in progress, thereby*          closing the channel and setting the current thread's*          interrupt status** @throws  IOException*          If some other I/O error occurs*/public long read(ByteBuffer[] dsts, int offset, int length)throws IOException;/*** Reads a sequence of bytes from this channel into the given buffers.** <p> An invocation of this method of the form <tt>c.read(dsts)</tt>* behaves in exactly the same manner as the invocation** <blockquote><pre>* c.read(dsts, 0, dsts.length);</pre></blockquote>** @param  dsts*         The buffers into which bytes are to be transferred** @return The number of bytes read, possibly zero,*         or <tt>-1</tt> if the channel has reached end-of-stream** @throws  NonReadableChannelException*          If this channel was not opened for reading** @throws  ClosedChannelException*          If this channel is closed** @throws  AsynchronousCloseException*          If another thread closes this channel*          while the read operation is in progress** @throws  ClosedByInterruptException*          If another thread interrupts the current thread*          while the read operation is in progress, thereby*          closing the channel and setting the current thread's*          interrupt status** @throws  IOException*          If some other I/O error occurs*/public long read(ByteBuffer[] dsts) throws IOException;}
/** Copyright (c) 2000, 2001, Oracle and/or its affiliates. All rights reserved.* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************/package java.nio.channels;import java.io.IOException;
import java.nio.ByteBuffer;/*** A channel that can write bytes from a sequence of buffers.** <p> A <i>gathering</i> write operation writes, in a single invocation, a* sequence of bytes from one or more of a given sequence of buffers.* Gathering writes are often useful when implementing network protocols or* file formats that, for example, group data into segments consisting of one* or more fixed-length headers followed by a variable-length body.  Similar* <i>scattering</i> read operations are defined in the {@link* ScatteringByteChannel} interface.  </p>*** @author Mark Reinhold* @author JSR-51 Expert Group* @since 1.4*/public interface GatheringByteChannelextends WritableByteChannel
{/*** Writes a sequence of bytes to this channel from a subsequence of the* given buffers.** <p> An attempt is made to write up to <i>r</i> bytes to this channel,* where <i>r</i> is the total number of bytes remaining in the specified* subsequence of the given buffer array, that is,** <blockquote><pre>* srcs[offset].remaining()*     + srcs[offset+1].remaining()*     + ... + srcs[offset+length-1].remaining()</pre></blockquote>** at the moment that this method is invoked.** <p> Suppose that a byte sequence of length <i>n</i> is written, where* <tt>0</tt>&nbsp;<tt>&lt;=</tt>&nbsp;<i>n</i>&nbsp;<tt>&lt;=</tt>&nbsp;<i>r</i>.* Up to the first <tt>srcs[offset].remaining()</tt> bytes of this sequence* are written from buffer <tt>srcs[offset]</tt>, up to the next* <tt>srcs[offset+1].remaining()</tt> bytes are written from buffer* <tt>srcs[offset+1]</tt>, and so forth, until the entire byte sequence is* written.  As many bytes as possible are written from each buffer, hence* the final position of each updated buffer, except the last updated* buffer, is guaranteed to be equal to that buffer's limit.** <p> Unless otherwise specified, a write operation will return only after* writing all of the <i>r</i> requested bytes.  Some types of channels,* depending upon their state, may write only some of the bytes or possibly* none at all.  A socket channel in non-blocking mode, for example, cannot* write any more bytes than are free in the socket's output buffer.** <p> This method may be invoked at any time.  If another thread has* already initiated a write operation upon this channel, however, then an* invocation of this method will block until the first operation is* complete. </p>** @param  srcs*         The buffers from which bytes are to be retrieved** @param  offset*         The offset within the buffer array of the first buffer from*         which bytes are to be retrieved; must be non-negative and no*         larger than <tt>srcs.length</tt>** @param  length*         The maximum number of buffers to be accessed; must be*         non-negative and no larger than*         <tt>srcs.length</tt>&nbsp;-&nbsp;<tt>offset</tt>** @return  The number of bytes written, possibly zero** @throws  IndexOutOfBoundsException*          If the preconditions on the <tt>offset</tt> and <tt>length</tt>*          parameters do not hold** @throws  NonWritableChannelException*          If this channel was not opened for writing** @throws  ClosedChannelException*          If this channel is closed** @throws  AsynchronousCloseException*          If another thread closes this channel*          while the write operation is in progress** @throws  ClosedByInterruptException*          If another thread interrupts the current thread*          while the write operation is in progress, thereby*          closing the channel and setting the current thread's*          interrupt status** @throws  IOException*          If some other I/O error occurs*/public long write(ByteBuffer[] srcs, int offset, int length)throws IOException;/*** Writes a sequence of bytes to this channel from the given buffers.** <p> An invocation of this method of the form <tt>c.write(srcs)</tt>* behaves in exactly the same manner as the invocation** <blockquote><pre>* c.write(srcs, 0, srcs.length);</pre></blockquote>** @param  srcs*         The buffers from which bytes are to be retrieved** @return  The number of bytes written, possibly zero** @throws  NonWritableChannelException*          If this channel was not opened for writing** @throws  ClosedChannelException*          If this channel is closed** @throws  AsynchronousCloseException*          If another thread closes this channel*          while the write operation is in progress** @throws  ClosedByInterruptException*          If another thread interrupts the current thread*          while the write operation is in progress, thereby*          closing the channel and setting the current thread's*          interrupt status** @throws  IOException*          If some other I/O error occurs*/public long write(ByteBuffer[] srcs) throws IOException;}

从上图您可以看到,这两个接口都添加了两种以缓冲区阵列作为参数的新方法。另外,每种方 法都提供了一种带 offset 和 length 参数的形式。让我们先来理解一下怎样使用方法的简单形式。在 下面的代码中,我们假定 channel 连接到一个有 48 字节数据等待读取的 socket 上:

        ByteBuffer header = ByteBuffer.allocateDirect(10);ByteBuffer body = ByteBuffer.allocateDirect(80);ByteBuffer[] buffers = {header, body};int bytesRead = channel.read(buffers);

一旦 read( )方法返回,bytesRead 就被赋予值 48,header 缓冲区将包含前 10 个从通道读 取的字节而 body 缓冲区则包含接下来的 38 个字节。通道会自动地将数据 scatter 到这两个缓冲区 中。缓冲区已经被填充了(尽管此例中 body 缓冲区还有空间填充更多数据),那么将需要被 flip 以便其中数据可以被抽取。在类似这样的例子中,我们可能并不会费劲去 flip 这个 header 缓冲区 而是以绝对 get 的方式随机访问它以检查各种 header 字段;不过 body 缓冲区会被 flip 并传递到另 一个通道的 write( )方法上,然后在通道上发送出去。例如:

        switch (header.getShort(0)) {case TYPE_PING:break;case TYPE_FILE:body.flip();fileChannel.write(body);break;default:logUnknownPacket(header.getShort(0), header.getLong(2), body);break;}

同样,很简单地,我们可以用一个 gather 操作将多个缓冲区的数据组合并发送出去。使用相同 的缓冲区,我们可以像下面这样汇总数据并在一个 socket 通道上发送包:

        body.clear();body.put("FOO".getBytes()).flip(); // "FOO" as bytesheader.clear();header.putShort(TYPE_FILE).putLong(body.limit()).flip();long bytesWritten = channel.write(buffers);

以上代码从传递给 write( )方法的 buffers 阵列所引用的缓冲区中 gather 数据,然后沿着通道发送了总共 13 个字节。

图 3-5 描述了一个 gather 写操作。数据从缓冲区阵列引用的每个缓冲区中 gather 并被组合成沿 着通道发送的字节流。

图 3-6 描述了一个 scatter 读操作。从通道传输来的数据被 scatter 到所列缓冲区,依次填充每个 缓冲区(从缓冲区的 position 处开始到 limit 处结束)。这里显示的 position 和 limit 值是读操作开 始之前的。

带 offset 和 length 参数版本的 read( ) 和 write( )方法使得我们可以使用缓冲区阵列的子集 缓冲区。这里的 offset 值指哪个缓冲区将开始被使用,而不是指数据的 offset。这里的 length 参 数指示要使用的缓冲区数量。举个例子,假设我们有一个五元素的 fiveBuffers 阵列,它已经被 初始化并引用了五个缓冲区,下面的代码将会写第二个、第三个和第四个缓冲区的内容:

int bytesRead = channel.write (fiveBuffers, 1, 3);

使用得当的话,Scatter/Gather 会是一个极其强大的工具。它允许您委托操作系统来完成辛苦 活:将读取到的数据分开存放到多个存储桶(bucket)或者将不同的数据区块合并成一个整体。这 是一个巨大的成就,因为操作系统已经被高度优化来完成此类工作了。它节省了您来回移动数据的工作,也就避免了缓冲区拷贝和减少了您需要编写、调试的代码数量。既然您基本上通过提供数据 容器引用来组合数据,那么按照不同的组合构建多个缓冲区阵列引用,各种数据区块就可以以不同 的方式来组合了。例 3-2 很好地诠释了这一点:

例 3-2 以 gather 写操作来集合多个缓冲区的数据

package org.example;import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;/*** Demonstrate gathering write using many buffers.* <p>** @author Ron Hitchens (ron@ronsoft.com)*/
public class Marketing {private static final String DEMOGRAPHIC = "blahblah.txt";// "Leverage frictionless methodologies"public static void main(String[] argv) throws Exception {int reps = 10;if (argv.length > 0) {reps = Integer.parseInt(argv[0]);}FileOutputStream fos = new FileOutputStream(DEMOGRAPHIC);GatheringByteChannel gatherChannel = fos.getChannel();// Generate some brilliant marcom, er, repurposed contentByteBuffer[] bs = utterBS(reps);// Deliver the message to the waiting marketwhile (gatherChannel.write(bs) > 0) {// Empty body// Loop until write( ) returns zero}System.out.println("Mindshare paradigms synergized to " + DEMOGRAPHIC);fos.close();}// -----------------------------------------------// These are just representative; add your ownprivate static String[] col1 = {"Aggregate", "Enable", "Leverage","Facilitate", "Synergize", "Repurpose","Strategize", "Reinvent", "Harness"};private static String[] col2 = {"cross-platform", "best-of-breed", "frictionless","ubiquitous", "extensible", "compelling","mission-critical", "collaborative", "integrated"};private static String[] col3 = {"methodologies", "infomediaries", "platforms","schemas", "mindshare", "paradigms","functionalities", "web services", "infrastructures"};private static String newline = System.getProperty("line.separator");// The Marcom-atic 9000private static ByteBuffer[] utterBS(int howMany)throws Exception {List list = new LinkedList();for (int i = 0; i < howMany; i++) {list.add(pickRandom(col1, " "));list.add(pickRandom(col2, " "));list.add(pickRandom(col3, newline));}ByteBuffer[] bufs = new ByteBuffer[list.size()];list.toArray(bufs);return (bufs);} // The communications directorprivate static Random rand = new Random();// Pick one, make a buffer to hold it and the suffix, load it with// the byte equivalent of the strings (will not work properly for// non-Latin characters), then flip the loaded buffer so it's ready// to be drainedprivate static ByteBuffer pickRandom(String[] strings, String suffix) throws Exception {String string = strings[rand.nextInt(strings.length)];int total = string.length() + suffix.length();ByteBuffer buf = ByteBuffer.allocate(total);buf.put(string.getBytes("US-ASCII"));buf.put(suffix.getBytes("US-ASCII"));buf.flip();return (buf);}
}

下面是实现 Marketing 类的输出。虽然这种输出没什么意义,但是 gather 写操作却能让我们 非常高效地把它生成出来。

     Aggregate compelling methodologiesHarness collaborative platformsAggregate integrated schemasAggregate frictionless platformsEnable integrated platformsLeverage cross-platform functionalitiesHarness extensible paradigmsSynergize compelling infomediariesRepurpose cross-platform mindshare Facilitate cross-platform infomediaries

3.文件通道

直到现在,我们都还只是在泛泛地讨论通道,比如讨论那些对所有通道都适用的内容。是时候 具体点了,本节我们来讨论文件通道(socket 通道将在下一节讨论)。从图 3-7 可以发现, FileChannel 类可以实现常用的 read,write 以及 scatter/gather 操作,同时它也提供了很多专用于文 件的新方法。这些方法中的许多都是我们所熟悉的文件操作,不过其他的您可能之前并未接触过。 现在我们将在此对它们全部予以讨论。

文件通道总是阻塞式的,因此不能被置于非阻塞模式。现代操作系统都有复杂的缓存和预取机 制,使得本地磁盘 I/O 操作延迟很少。网络文件系统一般而言延迟会多些,不过却也因该优化而受 益。面向流的 I/O 的非阻塞范例对于面向文件的操作并无多大意义,这是由文件 I/O 本质上的不同 性质造成的。对于文件 I/O,最强大之处在于异步 I/O(asynchronous I/O),它允许一个进程可以 从操作系统请求一个或多个 I/O 操作而不必等待这些操作的完成。发起请求的进程之后会收到它请 求的 I/O 操作已完成的通知。异步 I/O 是一种高级性能,当前的很多操作系统都还不具备。以后的 NIO 增强也会把异步 I/O 纳入考虑范围。

我们在 3.1.1 节中提到,FileChannel对象不能直接创建。一个FileChannel实例只能通过在一个 打开的file对象(RandomAccessFile、FileInputStream或 FileOutputStream)上调用getChannel( )方法 获取 8 。调用getChannel( )方法会返回一个连接到相同文件的FileChannel对象且该FileChannel对象 具有与file对象相同的访问权限,然后您就可以使用该通道对象来利用强大的FileChannel API了:

package java.nio.channels;import java.nio.MappedByteBuffer;
import java.nio.channels.*;public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel {// This is a partial API listing// All methods listed here can throw java.io.IOExceptionpublic abstract int read(ByteBuffer dst, long position)public abstract int write(ByteBuffer src, long position)public abstract long size()public abstract long position()public abstract void position(long newPosition)public abstract void truncate(long size)public abstract void force(boolean metaData)public final FileLock lock()public abstract FileLock lock(long position, long size, boolean shared)public final FileLock tryLock()public abstract FileLock tryLock(long position, long size, boolean shared)public abstract MappedByteBuffer map(MapMode mode, long position, long size)public static class MapMode {public static final MapMode READ_ONLYpublic static final MapMode READ_WRITEpublic static final MapMode PRIVATE}public abstract long transferTo(long position, long count, WritableByteChannel target)public abstract long transferFrom(ReadableByteChannel src, long position, long count)
}

上面的代码中给出了 FileChannel 类引入的新 API 方法。所有这些方法都可以抛出 java.io.IOException 异常,不过抛出语句并未在此列出。

同大多数通道一样,只要有可能,FileChannel 都会尝试使用本地 I/O 服务。FileChannel 类本身是抽象的,您从 getChannel( )方法获取的实际对象是一个具体子类(subclass)的一个实例 (instance),该子类可能使用本地代码来实现以上 API 方法中的一些或全部。

FileChannel 对象是线程安全(thread-safe)的。多个进程可以在同一个实例上并发调用方法而 不会引起任何问题,不过并非所有的操作都是多线程的(multithreaded)。影响通道位置或者影响 文件大小的操作都是单线程的(single-threaded)。如果有一个线程已经在执行会影响通道位置或文 件大小的操作,那么其他尝试进行此类操作之一的线程必须等待。并发行为也会受到底层的操作系 统或文件系统影响。

同大多数 I/O 相关的类一样,FileChannel 是一个反映 Java 虚拟机外部一个具体对象的抽象。 FileChannel 类保证同一个 Java 虚拟机上的所有实例看到的某个文件的视图均是一致的,但是 Java 虚拟机却不能对超出它控制范围的因素提供担保。通过一个 FileChannel 实例看到的某个文件的视 图同通过一个外部的非 Java 进程看到的该文件的视图可能一致,也可能不一致。多个进程发起的 并发文件访问的语义高度取决于底层的操作系统和(或)文件系统。一般而言,由运行在不同 Java 虚拟机上的 FileChannel 对象发起的对某个文件的并发访问和由非 Java 进程发起的对该文件的并发 访问是一致的。

1)访问文件

每个 FileChannel 对象都同一个文件描述符(file descriptor)有一对一的关系,所以上面列出的 API 方法与在您最喜欢的 POSIX(可移植操作系统接口)兼容的操作系统上的常用文件 I/O 系统调 用紧密对应也就不足为怪了。名称也许不尽相同,不过常见的 suspect(“可疑分子”)都被集中 起来了。您可能也注意到了上面列出的 API 方法同 java.io 包中 RandomAccessFile 类的方法的相 似之处了。本质上讲,RandomAccessFile 类提供的是同样的抽象内容。在通道出现之前,底层的文 件操作都是通过 RandomAccessFile 类的方法来实现的。FileChannel 模拟同样的 I/O 服务,因此它 的 API 自然也是很相似的。

为了便于比较,表 3-1 列出了 FileChannel、RandomAccessFile 和 POSIX I/O system calls 三者在 方法上的对应关系。

让我们来进一步看下基本的文件访问方法(请记住这些方法都可以抛出 java.io.IOException 异 常):

package java.nio.channels;import java.nio.channels.ByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel {// This is a partial API listingpublic abstract long position()public abstract void position(long newPosition)public abstract int read(ByteBuffer dst)public abstract int read(ByteBuffer dst, long position)public abstract int write(ByteBuffer src)public abstract int write(ByteBuffer src, long position)public abstract long size()public abstract void truncate(long size)public abstract void force(boolean metaData)
}

同底层的文件描述符一样,每个 FileChannel 都有一个叫“file position”的概念。这个 position 值 决定文件中哪一处的数据接下来将被读或者写。从这个方面看,FileChannel 类同缓冲区很类似, 并且 MappedByteBuffer 类使得我们可以通过 ByteBuffer API 来访问文件数据(我们会在后面的章节 中了解到这一点)。

您可以从前面的API清单中看到,有两种形式的position( )方法。第一种,不带参数的,返回当 前文件的position值。返回值是一个长整型(long),表示文件中的当前字节位置。

第二种形式的 position( )方法带一个 long(长整型)参数并将通道的 position 设置为指定值。 如果尝试将通道 position 设置为一个负值会导致 java.lang.IllegalArgumentException 异常,不过可以 把 position 设置到超出文件尾,这样做会把 position 设置为指定值而不改变文件大小。假如在将 position 设置为超出当前文件大小时实现了一个 read( )方法,那么会返回一个文件尾(end-of-file) 条件;倘若此时实现的是一个 write( )方法则会引起文件增长以容纳写入的字节,具体行为类似于 实现一个绝对 write( )并可能导致出现一个文件空洞(file hole,参见“文件空洞究竟是什么?”)。

文件空洞究竟是什么?

当磁盘上一个文件的分配空间小于它的文件大小时会出现“文件空洞”。对于内容稀疏的文 件,大多数现代文件系统只为实际写入的数据分配磁盘空间(更准确地说,只为那些写入数 据的文件系统页分配空间)。假如数据被写入到文件中非连续的位置上,这将导致文件出现 在逻辑上不包含数据的区域(即“空洞”)。例如,下面的代码可能产生一个如图 3-8 所示的文 件:

package org.example;import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;/*** Create a file with holes in it** @author Ron Hitchens (ron@ronsoft.com)*/
public class FileHole {public static void main(String[] argv) throws IOException {// Create a temp file, open for writing, and get// a FileChannelFile temp = File.createTempFile("holy", null);RandomAccessFile file = new RandomAccessFile(temp, "rw");FileChannel channel = file.getChannel();// Create a working bufferByteBuffer byteBuffer = ByteBuffer.allocateDirect(100);putData(0, byteBuffer, channel);putData(5000000, byteBuffer, channel);putData(50000, byteBuffer, channel);// Size will report the largest position written, but// there are two holes in this file. This file will// not consume 5 MB on disk (unless the filesystem is// extremely brain-damaged)System.out.println("Wrote temp file '" + temp.getPath() + "', size=" + channel.size());channel.close();file.close();}private static void putData(int position, ByteBuffer buffer, FileChannel channel) throws IOException {String string = "*<-- location " + position;buffer.clear();buffer.put(string.getBytes("US-ASCII"));buffer.flip();channel.position(position);channel.write(buffer);}
}

如果该文件被顺序读取的话,所有空洞都会被“0”填充但不占用磁盘空间。读取该文件的 进程会看到 5,000,021 个字节,大部分字节都以“0”表示。试试在该文件上运行 strings 命令, 看看您会得到什么。再试试将文件大小的值提高到 50 或 100MB,看看您的全部磁盘空间消耗 以及顺序扫描该文件所需时间会发生何种变化(前者不会改变,但是后者将有非常大的增 加)。

FileChannel 位置(position)是从底层的文件描述符获得的,该 position 同时被作为通道引用 获取来源的文件对象共享。这也就意味着一个对象对该 position 的更新可以被另一个对象看到:

        RandomAccessFile randomAccessFile = new RandomAccessFile("filename", "r");// Set the file positionrandomAccessFile.seek(1000);// Create a channel from the fileFileChannel fileChannel = randomAccessFile.getChannel();// This will print "1000"System.out.println("file pos: " + fileChannel.position());// Change the position using the RandomAccessFile objectrandomAccessFile.seek(500);// This will print "500"System.out.println("file pos: " + fileChannel.position());// Change the position using the FileChannel objectfileChannel.position(200);// This will print "200"System.out.println("file pos: " + randomAccessFile.getFilePointer());

类似于缓冲区的 get( ) 和 put( )方法,当字节被 read( )或 write( )方法传输时,文件 position 会 自动更新。如果 position 值达到了文件大小的值(文件大小的值可以通过 size( )方法返回),read( ) 方法会返回一个文件尾条件值(-1)。可是,不同于缓冲区的是,如果实现 write( )方法时 position 前进到超过文件大小的值,该文件会扩展以容纳新写入的字节。

同样类似于缓冲区,也有带 position 参数的绝对形式的 read( )和 write( )方法。这种绝对形式 的方法在返回值时不会改变当前的文件 position。由于通道的状态无需更新,因此绝对的读和写可 能会更加有效率,操作请求可以直接传到本地代码。更妙的是,多个线程可以并发访问同一个文件 而不会相互产生干扰。这是因为每次调用都是原子性的(atomic),并不依靠调用之间系统所记住 的状态。

类似于缓冲区的 get( ) 和 put( )方法,当字节被 read( )或 write( )方法传输时,文件 position 会 自动更新。如果 position 值达到了文件大小的值(文件大小的值可以通过 size( )方法返回),read( ) 方法会返回一个文件尾条件值(-1)。可是,不同于缓冲区的是,如果实现 write( )方法时 position 前进到超过文件大小的值,该文件会扩展以容纳新写入的字节。

同样类似于缓冲区,也有带 position 参数的绝对形式的 read( )和 write( )方法。这种绝对形式 的方法在返回值时不会改变当前的文件 position。由于通道的状态无需更新,因此绝对的读和写可 能会更加有效率,操作请求可以直接传到本地代码。更妙的是,多个线程可以并发访问同一个文件 而不会相互产生干扰。这是因为每次调用都是原子性的(atomic),并不依靠调用之间系统所记住 的状态。

package java.nio.channels;import java.nio.channels.ByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel {// This is a partial API listingpublic abstract void truncate(long size)public abstract void force(boolean metaData)
}

上面列出的最后一个 API 是 force( )。该方法告诉通道强制将全部待定的修改都应用到磁盘的 文件上。所有的现代文件系统都会缓存数据和延迟磁盘文件更新以提高性能。调用 force( )方法要 求文件的所有待定修改立即同步到磁盘。

如果文件位于一个本地文件系统,那么一旦 force( )方法返回,即可保证从通道被创建(或上 次调用 force( ))时起的对文件所做的全部修改已经被写入到磁盘。对于关键操作如事务 (transaction)处理来说,这一点是非常重要的,可以保证数据完整性和可靠的恢复。然而,如果 文件位于一个远程的文件系统,如 NFS 上,那么不能保证待定修改一定能同步到永久存储器 (permanent storage)上,因 Java 虚拟机不能做操作系统或文件系统不能实现的承诺。如果您的程 序在面临系统崩溃时必须维持数据完整性,先去验证一下您在使用的操作系统和(或)文件系统在 同步修改方面是可以依赖的。

force( )方法的布尔型参数表示在方法返回值前文件的元数据(metadata)是否也要被同步更新到磁盘。元数据指文件所有者、访问权限、最后一次修改时间等信息。大多数情形下,该信息对数 据恢复而言是不重要的。给 force( )方法传递 false 值表示在方法返回前只需要同步文件数据的更 改。大多数情形下,同步元数据要求操作系统进行至少一次额外的底层 I/O 操作。一些大数量事务 处理程序可能通过在每次调用 force( )方法时不要求元数据更新来获取较高的性能提升,同时也不 会牺牲数据完整性。

2)文件锁定

在 JDK 1.4 版本之前,Java I/O 模型都未能提供文件锁定(file locking),缺少这一特性让人们 很头疼。绝大多数现代操作系统早就有了文件锁定功能,而直到 JDK 1.4 版本发布时 Java 编程人员 才可以使用文件锁(file lock)。在集成许多其他非 Java 程序时,文件锁定显得尤其重要。此外, 它在判优(判断多个访问请求的优先级别)一个大系统的多个 Java 组件发起的访问时也很有价 值。

我们在第一章中讨论到,锁(lock)可以是共享的(shared)或独占的(exclusive)。本节中描 述的文件锁定特性在很大程度上依赖本地的操作系统实现。并非所有的操作系统和文件系统都支持 共享文件锁。对于那些不支持的,对一个共享锁的请求会被自动提升为对独占锁的请求。这可以保 证准确性却可能严重影响性能。举个例子,仅使用独占锁将会串行化图 1-7 中所列的全部 reader 进 程。如果您计划部署程序,请确保您了解所用操作系统和文件系统的文件锁定行为,因为这将严重 影响您的设计选择。

另外,并非所有平台都以同一个方式来实现基本的文件锁定。在不同的操作系统上,甚至在同 一个操作系统的不同文件系统上,文件锁定的语义都会有所差异。一些操作系统仅提供劝告锁定 (advisory locking),一些仅提供独占锁(exclusive locks),而有些操作系统可能两种锁都提供。 您应该总是按照劝告锁的假定来管理文件锁,因为这是最安全的。但是如能了解底层操作系统如何 执行锁定也是非常好的。例如,如果所有的锁都是强制性的(mandatory)而您不及时释放您获得 的锁的话,运行在同一操作系统上的其他程序可能会受到影响。

有关 FileChannel 实现的文件锁定模型的一个重要注意项是:锁的对象是文件而不是通道或线 程,这意味着文件锁不适用于判优同一台 Java 虚拟机上的多个线程发起的访问。

如果一个线程在某个文件上获得了一个独占锁,然后第二个线程利用一个单独打开的通道来请 求该文件的独占锁,那么第二个线程的请求会被批准。但如果这两个线程运行在不同的 Java 虚拟 机上,那么第二个线程会阻塞,因为锁最终是由操作系统或文件系统来判优的并且几乎总是在进程 级而非线程级上判优。锁都是与一个文件关联的,而不是与单个的文件句柄或通道关联。

文件锁旨在在进程级别上判优文件访问,比如在主要的程序组件之间或者在集成其他供应商的 组件时。如果您需要控制多个 Java 线程的并发访问,您可能需要实施您自己的、轻量级的锁定方 案。那种情形下,内存映射文件(本章后面会进行详述)可能是一个合适的选择。

现在让我们来看下与文件锁定有关的 FileChannel API 方法:

package java.nio.channels;import java.io.IOException;
import java.nio.channels.ByteChannel;
import java.nio.channels.FileLock;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel {public abstract FileLock lock(long position, long size, boolean shared)throws IOException;public final FileLock lock() throws IOExceptionpublic abstract FileLock tryLock(long position, long size, boolean shared)throws IOException;public final FileLock tryLock() throws IOException
}

这次我们先看带参数形式的 lock( )方法。锁是在文件内部区域上获得的。调用带参数的 Lock( ) 方法会指定文件内部锁定区域的开始 position 以及锁定区域的 size。第三个参数 shared 表 示您想获取的锁是共享的(参数值为 true)还是独占的(参数值为 false)。要获得一个共享 锁,您必须先以只读权限打开文件,而请求独占锁时则需要写权限。另外,您提供的 position 和 size 参数的值不能是负数。

锁定区域的范围不一定要限制在文件的 size 值以内,锁可以扩展从而超出文件尾。因此,我们 可以提前把待写入数据的区域锁定,我们也可以锁定一个不包含任何文件内容的区域,比如文件最 后一个字节以外的区域。如果之后文件增长到达那块区域,那么您的文件锁就可以保护该区域的文 件内容了。相反地,如果您锁定了文件的某一块区域,然后文件增长超出了那块区域,那么新增加 的文件内容将不会受到您的文件锁的保护。

不带参数的简单形式的 lock( )方法是一种在整个文件上请求独占锁的便捷方法,锁定区域等于 它能达到的最大范围。该方法等价于:

fileChannel.lock (0L, Long.MAX_VALUE, false);

如果您正请求的锁定范围是有效的,那么 lock( )方法会阻塞,它必须等待前面的锁被释放。假 如您的线程在此情形下被暂停,该线程的行为受中断语义(类似我们在 3.1.3 节中所讨论的)控 制。如果通道被另外一个线程关闭,该暂停线程将恢复并产生一个 AsynchronousCloseException 异 常。假如该暂停线程被直接中断(通过调用它的 interrupt( )方法),它将醒来并产生一个 FileLockInterruptionException 异常。如果在调用 lock( )方法时线程的 interrupt status 已经被设置,也 会产生 FileLockInterruptionException 异常。

在上面的 API 列表中有两个名为 tryLock( )的方法,它们是 lock( )方法的非阻塞变体。这两个 tryLock( )和 lock( )方法起相同的作用,不过如果请求的锁不能立即获取到则会返回一个 null。

您可以看到,lock( )和 tryLock( )方法均返回一个 FileLock 对象。以下是完整的 FileLock API:

package java.nio.channels;import java.io.IOException;public abstract class FileLock implements AutoCloseable {public final FileChannel channel()public final long position()public final long size()public final boolean isShared()public final boolean overlaps(long position, long size)public abstract boolean isValid();public abstract void release() throws IOException;public final void close() throws IOException
}

FileLock 类封装一个锁定的文件区域。FileLock 对象由 FileChannel 创建并且总是关联到那个特定的通道实例。您可以通过调用 channel( )方法来查询一个 lock 对象以判断它是由哪个通道创建 的。

一个 FileLock 对象创建之后即有效,直到它的 release( )方法被调用或它所关联的通道被关闭或 Java 虚拟机关闭时才会失效。我们可以通过调用 isValid( )布尔方法来测试一个锁的有效性。一个锁 的有效性可能会随着时间而改变,不过它的其他属性——位置(position)、范围大小(size)和独 占性(exclusivity)——在创建时即被确定,不会随着时间而改变。

您可以通过调用 isShared( )方法来测试一个锁以判断它是共享的还是独占的。如果底层的操作 系统或文件系统不支持共享锁,那么该方法将总是返回 false 值,即使您申请锁时传递的参数值 是 true。假如您的程序依赖共享锁定行为,请测试返回的锁以确保您得到了您申请的锁类型。 FileLock 对象是线程安全的,多个线程可以并发访问一个锁对象。

最后,您可以通过调用 overlaps( )方法来查询一个 FileLock 对象是否与一个指定的文件区域重 叠。这将使您可以迅速判断您拥有的锁是否与一个感兴趣的区域(region of interest)有交叉。不过 即使返回值是 false 也不能保证您就一定能在期望的区域上获得一个锁,因为 Java 虚拟机上的其 他地方或者外部进程可能已经在该期望区域上有一个或多个锁了。您最好使用 tryLock( )方法确认 一下。

尽管一个 FileLock 对象是与某个特定的 FileChannel 实例关联的,它所代表的锁却是与一个底 层文件关联的,而不是与通道关联。因此,如果您在使用完一个锁后而不释放它的话,可能会导致 冲突或者死锁。请小心管理文件锁以避免出现此问题。一旦您成功地获取了一个文件锁,如果随后 在通道上出现错误的话,请务必释放这个锁。推荐使用类似下面的代码形式:

FileLock lock = fileChannel.lock()
try {<perform read/write/whatever on channel>
} catch (IOException) {<handle unexpected exception>
} finally { lock.release()
}

例 3-3 中的代码使用共享锁实现了 reader 进程,使用独占锁实现了 writer 进程,图 1-7 和图 1-8 对此有诠释。由于锁是与进程而不是 Java 线程关联的,您将需要运行该程序的多个拷贝。先从一 个 writer 和两个或更多的 readers 开始,我们来看下不同类型的锁是如何交互的。

例 3-3 共享锁同独占锁交互

package org.example;import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Random;/*** Test locking with FileChannel.* <p>* Run one copy of this code with arguments "-w /tmp/locktest.dat"* and one or more copies with "-r /tmp/locktest.dat" to see the* interactions of exclusive and shared locks. Note how too many* readers can starve out the writer.* <p>* Note: The filename you provide will be overwritten. Substitute* an appropriate temp filename for your favorite OS.* <p>* Created April, 2002** @author Ron Hitchens (ron@ronsoft.com)*/
public class LockTest {private static final int SIZEOF_INT = 4;private static final int INDEX_START = 0;private static final int INDEX_COUNT = 10;private static final int INDEX_SIZE = INDEX_COUNT * SIZEOF_INT;private ByteBuffer buffer = ByteBuffer.allocate(INDEX_SIZE);private IntBuffer indexBuffer = buffer.asIntBuffer();private Random rand = new Random();public static void main(String[] argv)throws Exception {boolean writer = false;String filename;if (argv.length != 2) {System.out.println("Usage: [ -r | -w ] filename");return;}writer = argv[0].equals("-w");filename = argv[1];RandomAccessFile raf = new RandomAccessFile(filename, (writer) ? "rw" : "r");FileChannel fc = raf.getChannel();LockTest lockTest = new LockTest();if (writer) {lockTest.doUpdates(fc);} else {lockTest.doQueries(fc);}}// ---------------------------------------------------------------// Simulate a series of read-only queries while// holding a shared lock on the index areavoid doQueries(FileChannel fc)throws Exception {while (true) {println("trying for shared lock...");FileLock lock = fc.lock(INDEX_START, INDEX_SIZE, true);int reps = rand.nextInt(60) + 20;for (int i = 0; i < reps; i++) {int n = rand.nextInt(INDEX_COUNT);int position = INDEX_START + (n * SIZEOF_INT);buffer.clear();fc.read(buffer, position);int value = indexBuffer.get(n);println("Index entry " + n + "=" + value);// Pretend to be doing some work Thread.sleep (100);}lock.release();println("<sleeping>");Thread.sleep(rand.nextInt(3000) + 500);}}// Simulate a series of updates to the index area// while holding an exclusive lockvoid doUpdates(FileChannel fc) throws Exception {while (true) {println("trying for exclusive lock...");FileLock lock = fc.lock(INDEX_START, INDEX_SIZE, false);updateIndex(fc);lock.release();println("<sleeping>");Thread.sleep(rand.nextInt(2000) + 500);}}// Write new values to the index slotsprivate int idxval = 1;private void updateIndex(FileChannel fc) throws Exception {// "indexBuffer" is an int view of "buffer"indexBuffer.clear();for (int i = 0; i < INDEX_COUNT; i++) {idxval++;println("Updating index " + i + "=" + idxval);indexBuffer.put(idxval);// Pretend that this is really hard workThread.sleep(500);}// leaves position and limit correct for whole bufferbuffer.clear();fc.write(buffer, INDEX_START);}// ---------------------------------------------------------------private int lastLineLen = 0;// Specialized println that repaints the current lineprivate void println(String msg) {System.out.print("\r ");System.out.print(msg);for (int i = msg.length();i < lastLineLen; i++) {System.out.print(" ");}System.out.print("\r");System.out.flush();lastLineLen = msg.length();}
}

以上代码直接忽略了我之前说给的用 try/catch/finally 来释放锁的建议,在您自己所写 的实际代码中请不要这么懒。

4.内存映射文件

新的 FileChannel 类提供了一个名为 map( )的方法,该方法可以在一个打开的文件和一个特殊 类型的 ByteBuffer 之间建立一个虚拟内存映射(第一章中已经归纳了什么是内存映射文件以及它们 如何同虚拟内存交互)。在 FileChannel 上调用 map( )方法会创建一个由磁盘文件支持的虚拟内存 映射(virtual memory mapping)并在那块虚拟内存空间外部封装一个 MappedByteBuffer 对象(参见 图 1-6)。

由 map( )方法返回的 MappedByteBuffer 对象的行为在多数方面类似一个基于内存的缓冲区,只 不过该对象的数据元素存储在磁盘上的一个文件中。调用 get( )方法会从磁盘文件中获取数据,此 数据反映该文件的当前内容,即使在映射建立之后文件已经被一个外部进程做了修改。通过文件映 射看到的数据同您用常规方法读取文件看到的内容是完全一样的。相似地,对映射的缓冲区实现一 个 put( )会更新磁盘上的那个文件(假设对该文件您有写的权限),并且您做的修改对于该文件的 其他阅读者也是可见的。

通过内存映射机制来访问一个文件会比使用常规方法读写高效得多,甚至比使用通道的效率都高。因为不需要做明确的系统调用,那会很消耗时间。更重要的是,操作系统的虚拟内存可以自动 缓存内存页(memory page)。这些页是用系统内存来缓存的,所以不会消耗 Java 虚拟机内存堆 (memory heap)。

一旦一个内存页已经生效(从磁盘上缓存进来),它就能以完全的硬件速度再次被访问而不需 要再次调用系统命令来获取数据。那些包含索引以及其他需频繁引用或更新的内容的巨大而结构化 文件能因内存映射机制受益非常多。如果同时结合文件锁定来保护关键区域和控制事务原子性,那 您将能了解到内存映射缓冲区如何可以被很好地利用。

下面让我们来看一下如何使用内存映射:

package java.nio.channels;import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;public abstract class FileChannelextends AbstractInterruptibleChannelimplements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel {public abstract MappedByteBuffer map(MapMode mode, long position, long size) throws IOException;public static class MapMode {public static final MapMode READ_ONLYpublic static final MapMode READ_WRITEpublic static final MapMode PRIVATE}
}

可以看到,只有一种 map( )方法来创建一个文件映射。它的参数有 mode,position 和 size。参 数 position 和 size 同 lock( )方法的这两个参数是一样的(在前面的章节中已有讨论)。我们可 以创建一个 MappedByteBuffer 来代表一个文件中字节的某个子范围。例如,要映射 100 到 299(包 含 299)位置的字节,可以使用下面的代码:

buffer = fileChannel.map (FileChannel.MapMode.READ_ONLY, 100, 200);

如果要映射整个文件则使用:

buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());

与文件锁的范围机制不一样,映射文件的范围不应超过文件的实际大小。如果您请求一个超出 文件大小的映射,文件会被增大以匹配映射的大小。假如您给 size 参数传递的值是Integer.MAX_VALUE,文件大小的值会膨胀到超过 2.1GB。即使您请求的是一个只读映射, map( )方法也会尝试这样做并且大多数情况下都会抛出一个 IOException 异常,因为底层的文件不 能被修改。该行为同之前讨论的文件“空洞”的行为是一致的。详情请参考 3.3.1 节。

FileChannel 类定义了代表映射模式的常量,且是使用一个类型安全的枚举而非数字值来定义 这些常量。这些常量是 FileChannel 内部定义的一个内部类(inner class)的静态字段,它们可以在 编译时被检查类型,不过您可以像使用一个数值型常量那样使用它们。

同常规的文件句柄类似,文件映射可以是可写的或只读的。前两种映射模式 MapMode.READ_ONLY 和 MapMode.READ_WRITE 意义是很明显的,它们表示您希望获取的映射 只读还是允许修改映射的文件。请求的映射模式将受被调用 map( )方法的 FileChannel 对象的访问 权限所限制。如果通道是以只读的权限打开的而您却请求 MapMode.READ_WRITE 模式,那么 map( )方法会抛出一个 NonWritableChannelException 异常;如果您在一个没有读权限的通道上请求 MapMode.READ_ONLY 映射模式,那么将产生 NonReadableChannelException 异常。不过在以 read/write 权限打开的通道上请求一个 MapMode.READ_ONLY 映射却是允许的。MappedByteBuffer 对象的可变性可以通过对它调用 isReadOnly( )方法来检查。

第三种模式 MapMode.PRIVATE 表示您想要一个写时拷贝(copy-on-write)的映射。这意味着 您通过 put( )方法所做的任何修改都会导致产生一个私有的数据拷贝并且该拷贝中的数据只有 MappedByteBuffer 实例可以看到。该过程不会对底层文件做任何修改,而且一旦缓冲区被施以垃圾 收集动作(garbage collected),那些修改都会丢失。尽管写时拷贝的映射可以防止底层文件被修 改,您也必须以 read/write 权限来打开文件以建立 MapMode.PRIVATE 映射。只有这样,返回的 MappedByteBuffer 对象才能允许使用 put( )方法。

写时拷贝这一技术经常被操作系统使用,以在一个进程生成另一个进程时管理虚拟地址空间 (virtual address spaces)。使用写时拷贝可以允许父进程和子进程共享内存页直到它们中的一方实 际发生修改行为。在处理同一文件的多个映射时也有相同的优势(当然,这需要底层操作系统的支 持)。假设一个文件被多个 MappedByteBuffer 对象映射并且每个映射都是 MapMode.PRIVATE 模 式,那么这份文件的大部分内容都可以被所有映射共享。

选择使用 MapMode.PRIVATE 模式并不会导致您的缓冲区看不到通过其他方式对文件所做的 修改。对文件某个区域的修改在使用 MapMode.PRIVATE 模式的缓冲区中都能反映出来,除非该 缓冲区已经修改了文件上的同一个区域。正如第一章中所描述的,内存和文件系统都被划分成了 页。当在一个写时拷贝的缓冲区上调用 put( )方法时,受影响的页会被拷贝,然后更改就会应用到 该拷贝中。具体的页面大小取决于具体实现,不过通常都是和底层文件系统的页面大小时一样的。 如果缓冲区还没对某个页做出修改,那么这个页就会反映被映射文件的相应位置上的内容。一旦某 个页因为写操作而被拷贝,之后就将使用该拷贝页,并且不能被其他缓冲区或文件更新所修改。例 3-5 的代码诠释了这一行为。

您应该注意到了没有 unmap( )方法。也就是说,一个映射一旦建立之后将保持有效,直到 MappedByteBuffer 对象被施以垃圾收集动作为止。同锁不一样的是,映射缓冲区没有绑定到创建它 们的通道上。关闭相关联的 FileChannel 不会破坏映射,只有丢弃缓冲区对象本身才会破坏该映 射。NIO 设计师们之所以做这样的决定是因为当关闭通道时破坏映射会引起安全问题,而解决该安 全问题又会导致性能问题。如果您确实需要知道一个映射是什么时候被破坏的,他们建议使用虚引 用(phantom references,参见 java.lang.ref.PhantomReference)和一个 cleanup 线程。不过有此需要 的概率是微乎其微的。

MemoryMappedBuffer 直接反映它所关联的磁盘文件。如果映射有效时文件被在结构上修改, 就会产生奇怪的行为(当然具体的行为是取决于操作系统和文件系统的)。MemoryMappedBuffer 有固定的大小,不过它所映射的文件却是弹性的。具体来说,如果映射有效时文件大小变化了,那 么缓冲区的部分或全部内容都可能无法访问,并将返回未定义的数据或者抛出未检查的异常。关于 被内存映射的文件如何受其他线程或外部进程控制这一点,请务必小心对待。

所有的 MappedByteBuffer 对象都是直接的,这意味着它们占用的内存空间位于 Java 虚拟机内 存堆之外(并且可能不会算作 Java 虚拟机的内存占用,不过这取决于操作系统的虚拟内存模 型)。

因为 MappedByteBuffers 也是 ByteBuffers,所以能够被传递 SocketChannel 之类通道的 read( )或 write( )以有效传输数据给被映射的文件或从被映射的文件读取数据。如能再结合 scatter/gather,那 么从内存缓冲区和被映射文件内容中组织数据就变得很容易了。例 3-4 就是以此方式写 HTTP 回应 的。3.4.1 节中将描述一个传输数据给通道或从其他通道读取数据的更加有效的方式。

到现在为止,我们已经讨论完了映射缓冲区同其他缓冲区相同的特性,这些也是您会用得最多 的。不过 MappedByteBuffer 还定义了几个它独有的方法:

package java.nio;public abstract class MappedByteBufferextends ByteBuffer {public final boolean isLoaded()public final MappedByteBuffer load()public final MappedByteBuffer force()
}

当我们为一个文件建立虚拟内存映射之后,文件数据通常不会因此被从磁盘读取到内存(这取 决于操作系统)。该过程类似打开一个文件:文件先被定位,然后一个文件句柄会被创建,当您准 备好之后就可以通过这个句柄来访问文件数据。对于映射缓冲区,虚拟内存系统将根据您的需要来把文件中相应区块的数据读进来。这个页验证或防错过程需要一定的时间,因为将文件数据读取到 内存需要一次或多次的磁盘访问。某些场景下,您可能想先把所有的页都读进内存以实现最小的缓 冲区访问延迟。如果文件的所有页都是常驻内存的,那么它的访问速度就和访问一个基于内存的缓 冲区一样了。

load( )方法会加载整个文件以使它常驻内存。正如我们在第一章所讨论的,一个内存映射缓冲 区会建立与某个文件的虚拟内存映射。此映射使得操作系统的底层虚拟内存子系统可以根据需要将 文件中相应区块的数据读进内存。已经在内存中或通过验证的页会占用实际内存空间,并且在它们 被读进 RAM 时会挤出最近较少使用的其他内存页。

在一个映射缓冲区上调用 load( )方法会是一个代价高的操作,因为它会导致大量的页调入 (page-in),具体数量取决于文件中被映射区域的实际大小。然而,load( )方法返回并不能保证文 件就会完全常驻内存,这是由于请求页面调入(demand paging)是动态的。具体结果会因某些因素 而有所差异,这些因素包括:操作系统、文件系统,可用 Java 虚拟机内存,最大 Java 虚拟机内 存,垃圾收集器实现过程等等。请小心使用 load( )方法,它可能会导致您不希望出现的结果。该方 法的主要作用是为提前加载文件埋单,以便后续的访问速度可以尽可能的快。

对于那些要求近乎实时访问(near-realtime access)的程序,解决方案就是预加载。但是请记 住,不能保证全部页都会常驻内存,不管怎样,之后可能还会有页调入发生。内存页什么时候以及 怎样消失受多个因素影响,这些因素中的许多都是不受 Java 虚拟机控制的。JDK 1.4 的 NIO 并没有 提供一个可以把页面固定到物理内存上的 API,尽管一些操作系统是支持这样做的。

对于大多数程序,特别是交互性的或其他事件驱动(event-driven)的程序而言,为提前加载文 件消耗资源是不划算的。在实际访问时分摊页调入开销才是更好的选择。让操作系统根据需要来调 入页意味着不访问的页永远不需要被加载。同预加载整个被映射的文件相比,这很容易减少 I/O 活 动总次数。操作系统已经有一个复杂的内存管理系统了,就让它来替您完成此工作吧!

我们可以通过调用 isLoaded( )方法来判断一个被映射的文件是否完全常驻内存了。如果该方法 返回 true 值,那么很大概率是映射缓冲区的访问延迟很少或者根本没有延迟。不过,这也是不能 保证的。同样地,返回 false 值并不一定意味着访问缓冲区将很慢或者该文件并未完全常驻内 存。isLoaded( )方法的返回值只是一个暗示,由于垃圾收集的异步性质、底层操作系统以及运行系 统的动态性等因素,想要在任意时刻准确判断全部映射页的状态是不可能的。

上面代码中列出的最后一个方法 force( )同 FileChannel 类中的同名方法相似(参见 3.3.1 节) 该方法会强制将映射缓冲区上的更改应用到永久磁盘存储器上。当用 MappedByteBuffer 对象来更新 一个文件,您应该总是使用 MappedByteBuffer.force( )而非 FileChannel.force( ),因为通道对象可能 不清楚通过映射缓冲区做出的文件的全部更改。MappedByteBuffer 没有不更新文件元数据的选项—元数据总是会同时被更新的。请注意,非本地文件系统也同样影响 MappedByteBuffer.force( )方 法,正如它会对 FileChannel.force( )方法有影响,在这里(参见 3.3.1 节)。

如果映射是以 MapMode.READ_ONLY 或 MAP_MODE.PRIVATE 模式建立的,那么调用 force( ) 方法将不起任何作用,因为永远不会有更改需要应用到磁盘上(但是这样做也是没有害处的)。

例 3-4 诠释了内存映射缓冲区如何同 scatter/gather 结合使用。

例 3-4 使用映射文件和 gathering 写操作来编写 HTTP 回复

package org.example;import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;/*** Dummy HTTP server using MappedByteBuffers.* <p>* Given a filename on the command line, pretend to be* a web server and generate an HTTP response containing* the file content preceded by appropriate headers. The* data is sent with a gathering write.* <p>* * @author Ron Hitchens (ron@ronsoft.com)*/
public class MappedHttp {private static final String OUTPUT_FILE = "MappedHttp.out";private static final String LINE_SEP = "\r\n";private static final String SERVER_ID = "Server: Ronsoft Dummy Server";private static final String HTTP_HDR ="HTTP/1.0 200 OK" + LINE_SEP + SERVER_ID + LINE_SEP;private static final String HTTP_404_HDR ="HTTP/1.0 404 Not Found" + LINE_SEP + SERVER_ID + LINE_SEP;private static final String MSG_404 = "Could not open file: ";public static void main(String[] argv)throws Exception {if (argv.length < 1) {System.err.println("Usage: filename");return;}String file = argv[0];ByteBuffer header = ByteBuffer.wrap(bytes(HTTP_HDR));ByteBuffer dynhdrs = ByteBuffer.allocate(128);ByteBuffer[] gather = {header, dynhdrs, null};String contentType = "unknown/unknown";long contentLength = -1;try {FileInputStream fis = new FileInputStream(file);FileChannel fc = fis.getChannel();MappedByteBuffer filedata = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());gather[2] = filedata;contentLength = fc.size();contentType = URLConnection.guessContentTypeFromName(file);} catch (IOException e) {// file could not be opened; report problemByteBuffer buf = ByteBuffer.allocate(128);String msg = MSG_404 + e + LINE_SEP;buf.put(bytes(msg));buf.flip();// Use the HTTP error responsegather[0] = ByteBuffer.wrap(bytes(HTTP_404_HDR));gather[2] = buf;contentLength = msg.length();contentType = "text/plain";}StringBuffer sb = new StringBuffer();sb.append("Content-Length: " + contentLength);sb.append(LINE_SEP);sb.append("Content-Type: ").append(contentType);sb.append(LINE_SEP).append(LINE_SEP);dynhdrs.put(bytes(sb.toString()));dynhdrs.flip();FileOutputStream fos = new FileOutputStream(OUTPUT_FILE);FileChannel out = fos.getChannel();// All the buffers have been prepared; write 'em outwhile (out.write(gather) > 0) {// Empty body; loop until all buffers are empty}out.close();System.out.println("output written to " + OUTPUT_FILE);}// Convert a string to its constituent bytes// from the ASCII character set private static byte[] bytes(String string)throws Exception {return (string.getBytes("US-ASCII"));}
}

例 3-5 诠释了各种模式的内存映射如何交互。具体来说,例中代码诠释了写时拷贝是如何页导 向(page-oriented)的。当在使用 MAP_MODE.PRIVATE 模式创建的 MappedByteBuffer 对象上 调用 put( )方法而引发更改时,就会生成一个受影响页的拷贝。这份私有的拷贝不仅反映本地更 改,而且使缓冲区免受来自外部对原来页更改的影响。然而,对于被映射文件其他区域的更改还是 可以看到的。

例 3-5 三种类型的内存映射缓冲区

package org.example;import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;/*** Test behavior of Memory mapped buffer types. Create a file, write* some data to it, then create three different types of mappings* to it. Observe the effects of changes through the buffer APIs* and updating the file directly. The data spans page boundaries* to illustrate the page-oriented nature of Copy-On-Write mappings.** @author Ron Hitchens (ron@ronsoft.com)*/
public class MapFile {public static void main(String[] argv) throws Exception {// Create a temp file and get a channel connected to itFile tempFile = File.createTempFile("mmaptest", null);RandomAccessFile file = new RandomAccessFile(tempFile, "rw");FileChannel channel = file.getChannel();ByteBuffer temp = ByteBuffer.allocate(100);// Put something in the file, starting at location 0temp.put("This is the file content".getBytes());temp.flip();channel.write(temp, 0);// Put something else in the file, starting at location 8192.// 8192 is 8 KB, almost certainly a different memory/FS page.// This may cause a file hole, depending on the// filesystem page size.temp.clear();temp.put("This is more file content".getBytes());temp.flip();channel.write(temp, 8192);// Create three types of mappings to the same fileMappedByteBuffer ro = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());MappedByteBuffer rw = channel.map(FileChannel.MapMode.READ_WRITE, 0, channel.size());MappedByteBuffer cow = channel.map(FileChannel.MapMode.PRIVATE, 0, channel.size());// the buffer states before any modificationsSystem.out.println("Begin");showBuffers(ro, rw, cow);// Modify the copy-on-write buffercow.position(8);cow.put("COW".getBytes());System.out.println("Change to COW buffer");showBuffers(ro, rw, cow);// Modify the read/write bufferrw.position(9);rw.put(" R/W ".getBytes());rw.position(8194);rw.put(" R/W ".getBytes());rw.force();System.out.println("Change to R/W buffer");showBuffers(ro, rw, cow);// Write to the file through the channel; hit both pagestemp.clear();temp.put("Channel write ".getBytes());temp.flip();channel.write(temp, 0);temp.rewind();channel.write(temp, 8202);System.out.println("Write on channel");showBuffers(ro, rw, cow);// Modify the copy-on-write buffer againcow.position(8207);cow.put(" COW2 ".getBytes());System.out.println("Second change to COW buffer");showBuffers(ro, rw, cow);// Modify the read/write bufferrw.position(0);rw.put(" R/W2 ".getBytes());rw.position(8210);rw.put(" R/W2 ".getBytes());rw.force();System.out.println("Second change to R/W buffer");showBuffers(ro, rw, cow);// cleanupchannel.close();file.close();tempFile.delete();}// Show the current content of the three bufferspublic static void showBuffers(ByteBuffer ro, ByteBuffer rw, ByteBuffer cow)throws Exception {dumpBuffer("R/O", ro);dumpBuffer("R/W", rw);dumpBuffer("COW", cow);System.out.println("");}// Dump buffer content, counting and skipping nullspublic static void dumpBuffer(String prefix, ByteBuffer buffer) throws Exception {System.out.print(prefix + ": '");int nulls = 0;int limit = buffer.limit();for (int i = 0; i < limit; i++) {char c = (char) buffer.get(i);if (c == '\u0000') {nulls++;continue;}if (nulls != 0) {System.out.print("|[" + nulls+ " nulls]|");nulls = 0;}System.out.print(c);}System.out.println("'");}
}

以下是运行上面程序的输出:

Begin
R/O: 'This is the file content|[8168 nulls]|This is more file content'
R/W: 'This is the file content|[8168 nulls]|This is more file content'
COW: 'This is the file content|[8168 nulls]|This is more file content'Change to COW buffer
R/O: 'This is the file content|[8168 nulls]|This is more file content'
R/W: 'This is the file content|[8168 nulls]|This is more file content'
COW: 'This is COW file content|[8168 nulls]|This is more file content'Change to R/W buffer
R/O: 'This is t R/W le content|[8168 nulls]|Th R/W more file content'
R/W: 'This is t R/W le content|[8168 nulls]|Th R/W more file content'
COW: 'This is COW file content|[8168 nulls]|Th R/W more file content'Write on channel
R/O: 'Channel write le content|[8168 nulls]|Th R/W moChannel write t'
R/W: 'Channel write le content|[8168 nulls]|Th R/W moChannel write t'
COW: 'This is COW file content|[8168 nulls]|Th R/W moChannel write t'Second change to COW buffer
R/O: 'Channel write le content|[8168 nulls]|Th R/W moChannel write t'
R/W: 'Channel write le content|[8168 nulls]|Th R/W moChannel write t'
COW: 'This is COW file content|[8168 nulls]|Th R/W moChann COW2 te t'Second change to R/W buffer
R/O: ' R/W2 l write le content|[8168 nulls]|Th R/W moChannel R/W2 t'
R/W: ' R/W2 l write le content|[8168 nulls]|Th R/W moChannel R/W2 t'
COW: 'This is COW file content|[8168 nulls]|Th R/W moChann COW2 te t'

1)Channel-to-Channel 传输

由于经常需要从一个位置将文件数据批量传输到另一个位置,FileChannel 类添加了一些优化 方法来提高该传输过程的效率:

package java.nio.channels;import java.io.IOException;
import java.nio.channels.*;
import java.nio.channels.spi.AbstractInterruptibleChannel;public abstract class FileChannelextends AbstractInterruptibleChannelimplements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel {public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;public abstract long transferFrom(ReadableByteChannel src, long position, long count) throws IOException;
}

transferTo( )和 transferFrom( )方法允许将一个通道交叉连接到另一个通道,而不需要通过一个 中间缓冲区来传递数据。只有 FileChannel 类有这两个方法,因此 channel-to-channel 传输中通道之 一必须是 FileChannel。您不能在 socket 通道之间直接传输数据,不过 socket 通道实现 WritableByteChannel 和 ReadableByteChannel 接口,因此文件的内容可以用 transferTo( )方法传输给 一个 socket 通道,或者也可以用 transferFrom( )方法将数据从一个 socket 通道直接读取到一个文件 中。

直接的通道传输不会更新与某个 FileChannel 关联的 position 值。请求的数据传输将从 position 参数指定的位置开始,传输的字节数不超过 count 参数的值。实际传输的字节数会由 方法返回,可能少于您请求的字节数。

对于传输数据来源是一个文件的 transferTo( )方法,如果 position + count 的值大于文件 的 size 值,传输会在文件尾的位置终止。假如传输的目的地是一个非阻塞模式的 socket 通道,那么 当发送队列(send queue)满了之后传输就可能终止,并且如果输出队列(output queue)已满的话 可能不会发送任何数据。类似地,对于 transferFrom( )方法:如果来源 src 是另外一个 FileChannel 并且已经到达文件尾,那么传输将提早终止;如果来源 src 是一个非阻塞 socket 通道,只有当前 处于队列中的数据才会被传输(可能没有数据)。由于网络数据传输的非确定性,阻塞模式的 socket 也可能会执行部分传输,这取决于操作系统。许多通道实现都是提供它们当前队列中已有的 数据而不是等待您请求的全部数据都准备好。

此外,请记住:如果传输过程中出现问题,这些方法也可能抛出 java.io.IOException 异常。

Channel-to-channel 传输是可以极其快速的,特别是在底层操作系统提供本地支持的时候。某些 操作系统可以不必通过用户空间传递数据而进行直接的数据传输。对于大量的数据传输,这会是一 个巨大的帮助(参见例 3-6)。

例 3-6 使用通道传输进行文件连结

package org.example;import java.io.FileInputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;/*** Test channel transfer. This is a very simplistic concatenation* program. It takes a list of file names as arguments, opens each* in turn and transfers (copies) their content to the given* WritableByteChannel (in this case, stdout).* <p>* Created April 2002** @author Ron Hitchens (ron@ronsoft.com)*/
public class ChannelTransfer {public static void main(String[] argv)throws Exception {if (argv.length == 0) {System.err.println("Usage: filename ...");return;}catFiles(Channels.newChannel(System.out), argv);}// Concatenate the content of each of the named files to // the given channel. A very dumb version of 'cat'.private static void catFiles(WritableByteChannel target, String[] files) throws Exception {for (int i = 0; i < files.length; i++) {FileInputStream fis = new FileInputStream(files[i]);FileChannel channel = fis.getChannel();channel.transferTo(0, channel.size(), target);channel.close();fis.close();}}
}

5.Socket 通道

现在让我们来学习模拟网络套接字的通道类。Socket 通道有与文件通道不同的特征。

新的 socket 通道类可以运行非阻塞模式并且是可选择的。这两个性能可以激活大程序(如网络 服务器和中间件组件)巨大的可伸缩性和灵活性。本节中我们会看到,再也没有为每个 socket 连接使用一个线程的必要了,也避免了管理大量线程所需的上下文交换总开销。借助新的 NIO 类,一 个或几个线程就可以管理成百上千的活动 socket 连接了并且只有很少甚至可能没有性能损失。

从图 3-9 可知,全部 socket 通道类(DatagramChannel、SocketChannel 和 ServerSocketChannel)都是由位于 java.nio.channels.spi 包中的 AbstractSelectableChannel 引 申而来。这意味着我们可以用一个 Selector 对象来执行 socket 通道的有条件的选择(readiness selection)。选择和多路复用 I/O 会在第四章中讨论。

请注意 DatagramChannel 和 SocketChannel 实现定义读和写功能的接口而 ServerSocketChannel 不实现。ServerSocketChannel 负责监听传入的连接和创建新的 SocketChannel 对象,它本身从不传 输数据。

在我们具体讨论每一种 socket 通道前,您应该了解 socket 和 socket 通道之间的关系。之前的章 节中有写道,通道是一个连接 I/O 服务导管并提供与该服务交互的方法。就某个 socket 而言,它不 会再次实现与之对应的 socket 通道类中的 socket 协议 API,而 java.net 中已经存在的 socket 通 道都可以被大多数协议操作重复使用。

全部 socket 通道类(DatagramChannel、SocketChannel 和 ServerSocketChannel)在被实例化时 都会创建一个对等 socket 对象。这些是我们所熟悉的来自 java.net 的类(Socket、ServerSocket 和 DatagramSocket),它们已经被更新以识别通道。对等 socket 可以通过调用 socket( )方法从一个 通道上获取。此外,这三个 java.net 类现在都有 getChannel( )方法。

虽然每个 socket 通道(在 java.nio.channels 包中)都有一个关联的 java.net socket 对 象,却并非所有的 socket 都有一个关联的通道。如果您用传统方式(直接实例化)创建了一个 Socket 对象,它就不会有关联的 SocketChannel 并且它的 getChannel( )方法将总是返回 null。

Socket 通道委派协议操作给对等 socket 对象。如果在通道类中存在似乎重复的 socket 方法,那 么将有某个新的或者不同的行为同通道类上的这个方法相关联。

1)非阻塞模式

Socket 通道可以在非阻塞模式下运行。这个陈述虽然简单却有着深远的含义。传统 Java socket 的阻塞性质曾经是 Java 程序可伸缩性的最重要制约之一。非阻塞 I/O 是许多复杂的、高性能的程序 构建的基础。

要把一个 socket 通道置于非阻塞模式,我们要依靠所有 socket 通道类的公有超级类: SelectableChannel。下面的方法就是关于通道的阻塞模式的:

package java.nio.channels;import java.io.IOException;
import java.nio.channels.spi.AbstractInterruptibleChannel;public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {public abstract SelectableChannel configureBlocking(boolean block) throws IOException;public abstract boolean isBlocking();public abstract Object blockingLock();
}

有条件的选择(readiness selection)是一种可以用来查询通道的机制,该查询可以判断通道是 否准备好执行一个目标操作,如读或写。非阻塞 I/O 和可选择性是紧密相连的,那也正是管理阻塞 模式的 API 代码要在 SelectableChannel 超级类中定义的原因。SelectableChannel 的剩余 API 将在第 四章中讨论。

设置或重新设置一个通道的阻塞模式是很简单的,只要调用 configureBlocking( )方法即可,传 递参数值为 true 则设为阻塞模式,参数值为 false 值设为非阻塞模式。真的,就这么简单!您 可以通过调用 isBlocking( )方法来判断某个 socket 通道当前处于哪种模式:

SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false); // nonblocking
...
if (!sc.isBlocking()) { doSomething (cs);
}

服务器端的使用经常会考虑到非阻塞 socket 通道,因为它们使同时管理很多 socket 通道变得更 容易。但是,在客户端使用一个或几个非阻塞模式的 socket 通道也是有益处的,例如,借助非阻塞 socket 通道,GUI 程序可以专注于用户请求并且同时维护与一个或多个服务器的会话。在很多程序 上,非阻塞模式都是有用的。

偶尔地,我们也会需要防止 socket 通道的阻塞模式被更改。API 中有一个 blockingLock( )方 法,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。只 有拥有此对象的锁的线程才能更改通道的阻塞模式(对象的锁是用同步的 Java 密码获取的,它不 同于我们在 3.3 节中介绍的 lock( )方法)。对于确保在执行代码的关键部分时 socket 通道的阻塞模 式不会改变以及在不影响其他线程的前提下暂时改变阻塞模式来说,这个方法都是非常方便的。

        Socket socket = null;Object lockObj = serverChannel.blockingLock();// have a handle to the lock object, but haven't locked it yet// may block here until lock is acquiredsynchronized (lockObj) {// This thread now owns the lock; mode can't be changedboolean prevState = serverChannel.isBlocking();serverChannel.configureBlocking(false);socket = serverChannel.accept();serverChannel.configureBlocking(prevState);}// lock is now released, mode is allowed to changeif (socket != null) {doSomethingWithTheSocket(socket);}

2)ServerSocketChannel

让我们从最简单的 ServerSocketChannel 来开始对 socket 通道类的讨论。以下是 ServerSocketChannel 的完整 API:

package java.nio.channels;import java.io.IOException;
import java.net.ServerSocket;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelectableChannel;public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel {public static ServerSocketChannel open() throws IOExceptionpublic abstract ServerSocket socket();public abstract SocketChannel accept() throws IOException;public final int validOps()
}

ServerSocketChannel 是一个基于通道的 socket 监听器。它同我们所熟悉的 java.net.ServerSocket 执行相同的基本任务,不过它增加了通道语义,因此能够在非阻塞模式下运行。

用静态的 open( )工厂方法创建一个新的 ServerSocketChannel 对象,将会返回同一个未绑定的 java.net.ServerSocket 关联的通道。该对等 ServerSocket 可以通过在返回的 ServerSocketChannel 上调 用 socket( )方法来获取。作为 ServerSocketChannel 的对等体被创建的 ServerSocket 对象依赖通道实 现。这些 socket 关联的 SocketImpl 能识别通道。通道不能被封装在随意的 socket 对象外面。

由于 ServerSocketChannel 没有 bind( )方法,因此有必要取出对等的 socket 并使用它来绑定到一 个端口以开始监听连接。我们也是使用对等 ServerSocket 的 API 来根据需要设置其他的 socket 选 项。

        ServerSocketChannel ssc = ServerSocketChannel.open();ServerSocket serverSocket = ssc.socket();// Listen on port 1234serverSocket.bind(new InetSocketAddress(1234));

同它的对等体 java.net.ServerSocket 一样,ServerSocketChannel 也有 accept( )方法。一旦您创建 了一个 ServerSocketChannel 并用对等 socket 绑定了它,然后您就可以在其中一个上调用 accept( )。 如果您选择在 ServerSocket 上调用 accept( )方法,那么它会同任何其他的 ServerSocket 表现一样的 行为:总是阻塞并返回一个 java.net.Socket 对象。如果您选择在 ServerSocketChannel 上调用 accept( ) 方法则会返回 SocketChannel 类型的对象,返回的对象能够在非阻塞模式下运行。假设系统已经有 一个安全管理器(security manager),两种形式的方法调用都执行相同的安全检查。

如果以非阻塞模式被调用,当没有传入连接在等待时,ServerSocketChannel.accept( )会立即返 回 null。正是这种检查连接而不阻塞的能力实现了可伸缩性并降低了复杂性。可选择性也因此得 到实现。我们可以使用一个选择器实例来注册一个 ServerSocketChannel 对象以实现新连接到达时自 动通知的功能。例 3-7 演示了如何使用一个非阻塞的 accept( )方法:

例 3-7 使用 ServerSocketChannel 的非阻塞 accept( )方法

package org.example;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;/*** Test nonblocking accept( ) using ServerSocketChannel.* Start this program, then "telnet localhost 1234" to* connect to it.* <p>** @author Ron Hitchens (ron@ronsoft.com)*/
public class ChannelAccept {public static final String GREETING = "Hello I must be going.\r\n";public static void main(String[] argv)throws Exception {int port = 1234; // defaultif (argv.length > 0) {port = Integer.parseInt(argv[0]);}ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());ServerSocketChannel ssc = ServerSocketChannel.open();ssc.socket().bind(new InetSocketAddress(port));ssc.configureBlocking(false);while (true) {System.out.println("Waiting for connections");SocketChannel sc = ssc.accept();if (sc == null) {// no connections, snooze a whileThread.sleep(2000);} else {System.out.println("Incoming connection from: " + sc.socket().getRemoteSocketAddress());buffer.rewind();sc.write(buffer);sc.close();}}}
}

前面列出的最后一个方法 validOps( )是同选择器一起使用的。关于选择器,我们将在第四章中 予以详细讨论并且会介绍到 validOps( )方法。

3)SocketChannel

下面开始学习 SocketChannel,它是使用最多的 socket 通道类:

package org.example;import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.NetworkChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.spi.AbstractSelectableChannel;public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel {public static SocketChannel open() throws IOExceptionpublic static SocketChannel open(SocketAddress remote) throws IOExceptionpublic abstract Socket socket();public abstract boolean connect(SocketAddress remote) throws IOException;public abstract boolean isConnectionPending();public abstract boolean finishConnect() throws IOException;public abstract boolean isConnected();public final int validOps()
}

Socket 和 SocketChannel 类封装点对点、有序的网络连接,类似于我们所熟知并喜爱的 TCP/IP 网络连接。SocketChannel 扮演客户端发起同一个监听服务器的连接。直到连接成功,它才能收到 数据并且只会从连接到的地址接收。(对于 ServerSocketChannel,由于涉及到 validOps( )方法,我 们将在第四章检查选择器时进行讨论。通用的 read/write 方法也未在此列出,详情请参考 3.1.2 节。)

每个 SocketChannel 对象创建时都是同一个对等的 java.net.Socket 对象串联的。静态的 open( )方 法可以创建一个新的 SocketChannel 对象,而在新创建的 SocketChannel 上调用 socket( )方法能返回 它对等的 Socket 对象;在该 Socket 上调用 getChannel( )方法则能返回最初的那个 SocketChannel。

新创建的 SocketChannel 虽已打开却是未连接的。在一个未连接的 SocketChannel 对象上尝试一 个 I/O 操作会导致 NotYetConnectedException 异常。我们可以通过在通道上直接调用 connect( )方法 或在通道关联的 Socket 对象上调用 connect( )来将该 socket 通道连接。一旦一个 socket 通道被连 接,它将保持连接状态直到被关闭。您可以通过调用布尔型的 isConnected( )方法来测试某个 SocketChannel 当前是否已连接。

第二种带 InetSocketAddress 参数形式的 open( )是在返回之前进行连接的便捷方法。这段代码:

SocketChannel socketChannel = SocketChannel.open (new InetSocketAddress ("somehost", somePort));

等价于下面这段代码:

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect (new InetSocketAddress ("somehost", somePort));

如果您选择使用传统方式进行连接——通过在对等 Socket 对象上调用 connect( )方法,那么传 统的连接语义将适用于此。线程在连接建立好或超时过期之前都将保持阻塞。如果您选择通过在通道上直接调用 connect( )方法来建立连接并且通道处于阻塞模式(默认模式),那么连接过程实际 上是一样的。

在 SocketChannel 上并没有一种 connect( )方法可以让您指定超时(timeout)值,当 connect( )方 法在非阻塞模式下被调用时 SocketChannel 提供并发连接:它发起对请求地址的连接并且立即返回 值。如果返回值是 true,说明连接立即建立了(这可能是本地环回连接);如果连接不能立即建 立,connect( )方法会返回 false 且并发地继续连接建立过程。

面向流的的 socket 建立连接状态需要一定的时间,因为两个待连接系统之间必须进行包对话以 建立维护流 socket 所需的状态信息。跨越开放互联网连接到远程系统会特别耗时。假如某个 SocketChannel 上当前正由一个并发连接,isConnectPending( )方法就会返回 true 值。

  • 调用 finishConnect( )方法来完成连接过程,该方法任何时候都可以安全地进行调用。假如在一 个非阻塞模式的 SocketChannel 对象上调用 finishConnect( )方法,将可能出现下列情形之一:

    • connect( )方法尚未被调用。那么将产生 NoConnectionPendingException 异常。

    • 连接建立过程正在进行,尚未完成。那么什么都不会发生,finishConnect( )方法会立即返回false 值。

    • 在非阻塞模式下调用 connect( )方法之后,SocketChannel 又被切换回了阻塞模式。那么如果有必要的话,调用线程会阻塞直到连接建立完成,finishConnect( )方法接着就会返回 true值。

    • 在初次调用 connect( )或最后一次调用 finishConnect( )之后,连接建立过程已经完成。那么SocketChannel 对象的内部状态将被更新到已连接状态,finishConnect( )方法会返回 true值,然后 SocketChannel 对象就可以被用来传输数据了。

    • 连接已经建立。那么什么都不会发生,finishConnect( )方法会返回 true 值。

当通道处于中间的连接等待(connection-pending)状态时,您只可以调用 finishConnect( )、 isConnectPending( )或 isConnected( )方法。一旦连接建立过程成功完成,isConnected( )将返回 true值。

        InetSocketAddress addr = new InetSocketAddress(host, port);SocketChannel sc = SocketChannel.open();sc.configureBlocking(false);sc.connect(addr);while (!sc.finishConnect()) {doSomethingElse();}doSomethingWithChannel(sc);sc.close();

例 3-8 是一段用来管理异步连接的可用代码。

例 3-8 建立并发连接

package org.example;import java.net.InetSocketAddress;/*** Demonstrate asynchronous connection of a SocketChannel.** @author Ron Hitchens (ron@ronsoft.com)*/
public class ConnectAsync {public static void main(String[] argv) throws Exception {String host = "localhost";int port = 80;if (argv.length == 2) {host = argv[0];port = Integer.parseInt(argv[1]);}InetSocketAddress addr = new InetSocketAddress(host, port);SocketChannel sc = SocketChannel.open();sc.configureBlocking(false);System.out.println("initiating connection");sc.connect(addr);while (!sc.finishConnect()) {doSomethingUseful();}System.out.println("connection established");// Do something with the connected socket// The SocketChannel is still nonblocking sc.close();}private static void doSomethingUseful() {System.out.println("doing something useless");}
}

如果尝试异步连接失败,那么下次调用 finishConnect( )方法会产生一个适当的经检查的异常以 指出问题的性质。通道然后就会被关闭并将不能被连接或再次使用。

与连接相关的方法使得我们可以对一个通道进行轮询并在连接进行过程中判断通道所处的状 态。第四章中,我们将了解到如何使用选择器来避免进行轮询并在异步连接建立之后收到通知。

Socket 通道是线程安全的。并发访问时无需特别措施来保护发起访问的多个线程,不过任何时 候都只有一个读操作和一个写操作在进行中。请记住,sockets 是面向流的而非包导向的。它们可 以保证发送的字节会按照顺序到达但无法承诺维持字节分组。某个发送器可能给一个 socket 写入了 20 个字节而接收器调用 read( )方法时却只收到了其中的 3 个字节。剩下的 17 个字节还是传输中。 由于这个原因,让多个不配合的线程共享某个流 socket 的同一侧绝非一个好的设计选择。

connect( )和 finishConnect( )方法是互相同步的,并且只要其中一个操作正在进行,任何读或写 的方法调用都会阻塞,即使是在非阻塞模式下。如果此情形下您有疑问或不能承受一个读或写操作 在某个通道上阻塞,请用 isConnected( )方法测试一下连接状态。

4)DatagramChannel

最后一个 socket 通道是 DatagramChannel。正如 SocketChannel 对应 Socket, ServerSocketChannel 对应 ServerSocket,每一个 DatagramChannel 对象也有一个关联的 DatagramSocket 对象。不过原命名模式在此并未适用:“DatagramSocketChannel”显得有点笨拙, 因此采用了简洁的“DatagramChannel”名称。

正如 SocketChannel 模拟连接导向的流协议(如 TCP/IP),DatagramChannel 则模拟包导向的 无连接协议(如 UDP/IP):

package java.nio.channels;import java.io.IOException;
import java.net.DatagramSocket;
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.MulticastChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.spi.AbstractSelectableChannel;public abstract class DatagramChannelextends AbstractSelectableChannelimplements ByteChannel, ScatteringByteChannel, GatheringByteChannel, MulticastChannel {public static DatagramChannel open() throws IOExceptionpublic static DatagramChannel open(ProtocolFamily family) throws IOExceptionpublic final int validOps()public abstract DatagramChannel bind(SocketAddress local) throws IOException;public abstract <T> DatagramChannel setOption(SocketOption<T> name, T value) throws IOException;public abstract DatagramSocket socket();public abstract boolean isConnected();public abstract DatagramChannel connect(SocketAddress remote) throws IOException;public abstract DatagramChannel disconnect() throws IOException;public abstract SocketAddress getRemoteAddress() throws IOException;public abstract SocketAddress receive(ByteBuffer dst) throws IOException;public abstract int send(ByteBuffer src, SocketAddress target) throws IOException;public abstract int read(ByteBuffer dst) throws IOException;public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;public final long read(ByteBuffer[] dsts) throws IOExceptionpublic abstract int write(ByteBuffer src) throws IOException;public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;public final long write(ByteBuffer[] srcs) throws IOException
}

创建 DatagramChannel 的模式和创建其他 socket 通道是一样的:调用静态的 open( )方法来创建 一个新实例。新 DatagramChannel 会有一个可以通过调用 socket( )方法获取的对等 DatagramSocket 对象。DatagramChannel 对象既可以充当服务器(监听者)也可以充当客户端(发送者)。如果您 希望新创建的通道负责监听,那么通道必须首先被绑定到一个端口或地址/端口组合上。绑定 DatagramChannel 同绑定一个常规的 DatagramSocket 没什么区别,都是委托对等 socket 对象上的 API 实现的:

        DatagramChannel channel = DatagramChannel.open();DatagramSocket socket = channel.socket();socket.bind(new InetSocketAddress(portNumber));

DatagramChannel 是无连接的。每个数据报(datagram)都是一个自包含的实体,拥有它自己 的目的地址及不依赖其他数据报的数据净荷。与面向流的的 socket 不同,DatagramChannel 可以发 送单独的数据报给不同的目的地址。同样,DatagramChannel 对象也可以接收来自任意地址的数据 包。每个到达的数据报都含有关于它来自何处的信息(源地址)。

一个未绑定的 DatagramChannel 仍能接收数据包。当一个底层 socket 被创建时,一个动态生成 的端口号就会分配给它。绑定行为要求通道关联的端口被设置为一个特定的值(此过程可能涉及安 全检查或其他验证)。不论通道是否绑定,所有发送的包都含有 DatagramChannel 的源地址 (带端口号)。未绑定的 DatagramChannel 可以接收发送给它的端口的包,通常是来回应该通道之 前发出的一个包。已绑定的通道接收发送给它们所绑定的熟知端口(wellknown port)的包。数据 的实际发送或接收是通过 send( )和 receive( )方法来实现的:

package java.nio.channels;import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.MulticastChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.spi.AbstractSelectableChannel;public abstract class DatagramChannelextends AbstractSelectableChannelimplements ByteChannel, ScatteringByteChannel, GatheringByteChannel, MulticastChannel {// This is a partial API listingpublic abstract SocketAddress receive(ByteBuffer dst) throws IOException;public abstract int send(ByteBuffer src, SocketAddress target) throws IOException;
}

receive( )方法将下次将传入的数据报的数据净荷复制到预备好的 ByteBuffer 中并返回一个 SocketAddress 对象以指出数据来源。如果通道处于阻塞模式,receive( )可能无限期地休眠直到有包 到达。如果是非阻塞模式,当没有可接收的包时则会返回 null。如果包内的数据超出缓冲区能承 受的范围,多出的数据都会被悄悄地丢弃。

调用 send( )会发送给定 ByteBuffer 对象的内容到给定 SocketAddress 对象所描述的目的地址和端 口,内容范围为从当前 position 开始到末尾处结束。如果 DatagramChannel 对象处于阻塞模式,调 用线程可能会休眠直到数据报被加入传输队列。如果通道是非阻塞的,返回值要么是字节缓冲区的 字节数,要么是“0”。发送数据报是一个全有或全无(all-or-nothing)的行为。如果传输队列没有 足够空间来承载整个数据报,那么什么内容都不会被发送。

如果安装了安全管理器,那么每次调用 send( )或 receive( )时安全管理器的 checkConnect( )方法 都会被调用以验证目的地址,除非通道处于已连接的状态(本节后面会讨论到)。

请注意,数据报协议的不可靠性是固有的,它们不对数据传输做保证。send( )方法返回的非零 值并不表示数据报到达了目的地,仅代表数据报被成功加到本地网络层的传输队列。此外,传输过 程中的协议可能将数据报分解成碎片。例如,以太网不能传输超过 1,500 个字节左右的包。如果您 的数据报比较大,那么就会存在被分解成碎片的风险,成倍地增加了传输过程中包丢失的几率。被 分解的数据报在目的地会被重新组合起来,接收者将看不到碎片。但是,如果有一个碎片不能按时 到达,那么整个数据报将被丢弃。

DatagramChannel 有一个 connect( )方法:

package java.nio.channels;import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.MulticastChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.spi.AbstractSelectableChannel;public abstract class DatagramChannelextends AbstractSelectableChannelimplements ByteChannel, ScatteringByteChannel, GatheringByteChannel, MulticastChannel {// This is a partial API listingpublic abstract DatagramChannel connect(SocketAddress remote) throws IOException;public abstract boolean isConnected();public abstract DatagramChannel disconnect() throws IOException;
}

DatagramChannel 对数据报 socket 的连接语义不同于对流 socket 的连接语义。有时候,将 数据报对话限制为两方是很可取的。将 DatagramChannel 置于已连接的状态可以使除了它所“连 接”到的地址之外的任何其他源地址的数据报被忽略。这是很有帮助的,因为不想要的包都已经被 网络层丢弃了,从而避免了使用代码来接收、检查然后丢弃包的麻烦。

当 DatagramChannel 已连接时,使用同样的令牌,您不可以发送包到除了指定给 connect( )方 法的目的地址以外的任何其他地址。试图一定要这样做的话会导致一个 SecurityException 异常。

我们可以通过调用带 SocketAddress 对象的 connect( )方法来连接一个 DatagramChannel,该 SocketAddress 对象描述了 DatagramChannel 远程对等体的地址。如果已经安装了一个安全管理器, 那么它会进行权限检查。之后,每次 send/receive 时就不会再有安全检查了,因为来自或去到任何 其他地址的包都是不允许的。

已连接通道会发挥作用的使用场景之一是一个客户端/服务器模式、使用 UDP 通讯协议的实时 游戏。每个客户端都只和同一台服务器进行会话而希望忽视任何其他来源地数据包。将客户端的 DatagramChannel 实例置于已连接状态可以减少按包计算的总开销(因为不需要对每个包进行安全 检查)和剔除来自欺骗玩家的假包。服务器可能也想要这样做,不过需要每个客户端都有一个 DatagramChannel 对象。

不同于流 socket,数据报 socket 的无状态性质不需要同远程系统进行对话来建立连接状态。没 有实际的连接,只有用来指定允许的远程地址的本地状态信息。由于此原因,DatagramChannel 上 也就没有单独的 finishConnect( )方法。我们可以使用 isConnected( )方法来测试一个数据报通道的连 接状态。

不同于 SocketChannel(必须连接了才有用并且只能连接一次),DatagramChannel 对象可以任 意次数地进行连接或断开连接。每次连接都可以到一个不同的远程地址。调用 disconnect( )方法可 以配置通道,以便它能再次接收来自安全管理器(如果已安装)所允许的任意远程地址的数据或发 送数据到这些地址上。

当一个 DatagramChannel 处于已连接状态时,发送数据将不用提供目的地址而且接收时的源地 址也是已知的。这意味着 DatagramChannel 已连接时可以使用常规的 read( )和 write( )方法,包括scatter/gather 形式的读写来组合或分拆包的数据:

package java.nio.channels;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.MulticastChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.spi.AbstractSelectableChannel;public abstract class DatagramChannelextends AbstractSelectableChannelimplements ByteChannel, ScatteringByteChannel, GatheringByteChannel, MulticastChannel {// This is a partial API listingpublic abstract int read(ByteBuffer dst) throws IOException;public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;public final long read(ByteBuffer[] dsts) throws IOExceptionpublic abstract int write(ByteBuffer src) throws IOException;public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;public final long write(ByteBuffer[] srcs) throws IOException
}

read( )方法返回读取字节的数量,如果通道处于非阻塞模式的话这个返回值可能是“0”。 write( )方法的返回值同 send( )方法一致:要么返回您的缓冲区中的字节数量,要么返回“0”(如 果由于通道处于非阻塞模式而导致数据报不能被发送)。当通道不是已连接状态时调用 read( )或 write( )方法,都将产生 NotYetConnectedException 异常。

数据报通道不同于流 socket。由于它们的有序而可靠的数据传输特性,流 socket 非常得有用。 大多数网络连接都是流 socket(TCP/IP 就是一个显著的例子)。但是,像 TCP/IP 这样面向流的的 协议为了在包导向的互联网基础设施上维护流语义必然会产生巨大的开销,并且流隐喻不能适用所 有的情形。数据报的吞吐量要比流协议高很多,并且数据报可以做很多流无法完成的事情。

  • 下面列出了一些选择数据报 socket 而非流 socket 的理由:

    • 您的程序可以承受数据丢失或无序的数据。

    • 您希望“发射后不管”(fire and forget)而不需要知道您发送的包是否已接收。

    • 数据吞吐量比可靠性更重要。

    • 您需要同时发送数据给多个接受者(多播或者广播)。

    • 包隐喻比流隐喻更适合手边的任务。

如果以上特征中的一个或多个适用于您的程序,那么数据报设计对您来说就是合适的。

例 3-9 显示了如何使用 DatagramChannel 发送请求到多个地址上的时间服务器。 DatagramChannel 接着会等待回复(reply)的到达。对于每个返回的回复,远程时间会同本地时间 进行比较。由于数据报传输不保证一定成功,有些回复可能永远不会到达。大多数 Linux 和 Unix 系统都默认提供时间服务。互联网上也有一个公共时间服务器,如 time.nist.gov。防火墙或者您的 ISP 可能会干扰数据报传输,这是因人而异的。

例 3-10 DatagramChannel 时间服务器

package org.example;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.DatagramChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;/*** Request time service, per RFC 868. RFC 868* (http://www.ietf.org/rfc/rfc0868.txt) is a very simple time protocol* whereby one system can request the current time from another system.* Most Linux, BSD and Solaris systems provide RFC 868 time service* on port 37. This simple program will inter-operate with those.* <p>* The National Institute of Standards and Technology (NIST) operates* a public time server at time.nist.gov.* <p>* The RFC 868 protocol specifies a 32 bit unsigned value be sent,* representing the number of seconds since Jan 1, 1900. The Java* epoch begins on Jan 1, 1970 (same as unix) so an adjustment is* made by adding or subtracting 2,208,988,800 as appropriate. To* avoid shifting and masking, a four-byte slice of an* eight-byte buffer is used to send/recieve. But getLong( )* is done on the full eight bytes to get a long value.* <p>* When run, this program will issue time requests to each hostname* given on the command line, then enter a loop to receive packets.* Note that some requests or replies may be lost, which means* this code could block forever.* <p>* * @author Ron Hitchens (ron@ronsoft.com)*/
public class TimeClient {private static final int DEFAULT_TIME_PORT = 37;private static final long DIFF_1900 = 2208988800L;protected int port = DEFAULT_TIME_PORT;protected List remoteHosts;protected DatagramChannel channel;public TimeClient(String[] argv) throws Exception {if (argv.length == 0) {throw new Exception("Usage: [ -p port ] host ...");}parseArgs(argv);this.channel = DatagramChannel.open();}protected InetSocketAddress receivePacket(DatagramChannel channel, ByteBuffer buffer) throws Exception {buffer.clear();// Receive an unsigned 32-bit, big-endian valuereturn ((InetSocketAddress) channel.receive(buffer));}// Send time requests to all the supplied hostsprotected void sendRequests() throws Exception {ByteBuffer buffer = ByteBuffer.allocate(1);Iterator it = remoteHosts.iterator();while (it.hasNext()) {InetSocketAddress sa = (InetSocketAddress) it.next();System.out.println("Requesting time from " + sa.getHostName() + ":" + sa.getPort());// Make it empty (see RFC868)buffer.clear().flip();// Fire and forgetchannel.send(buffer, sa);}}// Receive any replies that arrivepublic void getReplies() throws Exception {// Allocate a buffer to hold a long valueByteBuffer longBuffer = ByteBuffer.allocate(8);// Assure big-endian (network) byte orderlongBuffer.order(ByteOrder.BIG_ENDIAN);// Zero the whole buffer to be surelongBuffer.putLong(0, 0);// Position to first byte of the low-order 32 bitslongBuffer.position(4);// Slice the buffer; gives view of the low-order 32 bitsByteBuffer buffer = longBuffer.slice();int expect = remoteHosts.size();int replies = 0;System.out.println("");System.out.println("Waiting for replies...");while (true) {InetSocketAddress sa;sa =receivePacket(channel, buffer);buffer.flip();replies++;printTime(longBuffer.getLong(0), sa);if (replies == expect) {System.out.println("All packets answered");break;}// Some replies haven't shown up yetSystem.out.println("Received " + replies + " of " + expect + " replies");}}// Print info about a received time replyprotected void printTime(long remote1900, InetSocketAddress sa) {// local time as seconds since Jan 1, 1970long local = System.currentTimeMillis() / 1000;// remote time as seconds since Jan 1, 1970long remote = remote1900 - DIFF_1900;Date remoteDate = new Date(remote * 1000);Date localDate = new Date(local * 1000);long skew = remote - local;System.out.println("Reply from "+ sa.getHostName() + ":" + sa.getPort());System.out.println(" there: " + remoteDate);System.out.println(" here: " + localDate);System.out.print(" skew: ");if (skew == 0) {System.out.println("none");} else if (skew > 0) {System.out.println(skew + " seconds ahead");} else {System.out.println((-skew) + " seconds behind");}}protected void parseArgs(String[] argv) {remoteHosts = new LinkedList();for (int i = 0; i < argv.length; i++) {String arg = argv[i];// Send client requests to the given portif (arg.equals("-p")) {i++;this.port = Integer.parseInt(argv[i]);continue;}// Create an address object for the hostnameInetSocketAddress sa = new InetSocketAddress(arg, port);// Validate that it has an addressif (sa.getAddress() == null) {System.out.println("Cannot resolve address: " + arg);continue;}remoteHosts.add(sa);}}// -------------------------------------------------------------public static void main(String[] argv)throws Exception {TimeClient client = new TimeClient(argv);client.sendRequests();client.getReplies();}
}

6.管道

java.nio.channels 包中含有一个名为 Pipe(管道)的类。广义上讲,管道就是一个用来 在两个实体之间单向传输数据的导管。管道的概念对于 Unix(和类 Unix)操作系统的用户来说早 就很熟悉了。Unix 系统中,管道被用来连接一个进程的输出和另一个进程的输入。Pipe 类实现一 个管道范例,不过它所创建的管道是进程内(在 Java 虚拟机进程内部)而非进程间使用的。参见 图 3-10。

Pipe 类创建一对提供环回机制的 Channel 对象。这两个通道的远端是连接起来的,以便任何写 在 SinkChannel 对象上的数据都能出现在 SourceChannel 对象上。图 3-11 显示了 Pipe 的类层级。

package java.nio.channels;import java.io.IOException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.spi.AbstractSelectableChannel;public abstract class Pipe {public static abstract class SourceChannel extends AbstractSelectableChannel implements ReadableByteChannel, ScatteringByteChannelpublic static abstract class SinkChannel extends AbstractSelectableChannel implements WritableByteChannel, GatheringByteChannelpublic abstract SourceChannel source();public abstract SinkChannel sink();public static Pipe open() throws IOException
}

Pipe 实例是通过调用不带参数的 Pipe.open( )工厂方法来创建的。Pipe 类定义了两个嵌套的通 道类来实现管路。这两个类是 Pipe.SourceChannel(管道负责读的一端)和 Pipe.SinkChannel(管道 负责写的一端)。这两个通道实例是在 Pipe 对象创建的同时被创建的,可以通过在 Pipe 对象上分 别调用 source( )和 sink( )方法来取回。

此时,您可能在想管道到底有什么作用。您不能使用 Pipe 在操作系统级的进程间建立一个类 Unix 管道(您可以使用 SocketChannel 来建立)。Pipe 的 source 通道和 sink 通道提供类似 java.io.PipedInputStream 和 java.io.PipedOutputStream 所提供的功能,不过它们可以执行全部的通道 语义。请注意,SinkChannel 和 SourceChannel 都由 AbstractSelectableChannel 引申而来(所以也是 从 SelectableChannel 引申而来),这意味着 pipe 通道可以同选择器一起使用(参见第四章)。

管道可以被用来仅在同一个 Java 虚拟机内部传输数据。虽然有更加有效率的方式来在线程之 间传输数据,但是使用管道的好处在于封装性。生产者线程和用户线程都能被写道通用的 Channel API 中。根据给定的通道类型,相同的代码可以被用来写数据到一个文件、socket 或管道。选择器 可以被用来检查管道上的数据可用性,如同在 socket 通道上使用那样地简单。这样就可以允许单个 用户线程使用一个 Selector 来从多个通道有效地收集数据,并可任意结合网络连接或本地工作线程 使用。因此,这些对于可伸缩性、冗余度以及可复用性来说无疑都是意义重大的。

Pipes 的另一个有用之处是可以用来辅助测试。一个单元测试框架可以将某个待测试的类连接 到管道的“写”端并检查管道的“读”端出来的数据。它也可以将被测试的类置于通道的“读”端 并将受控的测试数据写进其中。两种场景对于回归测试都是很有帮助的。

管路所能承载的数据量是依赖实现的(implementation-dependent)。唯一可保证的是写到 SinkChannel 中的字节都能按照同样的顺序在 SourceChannel 上重现。例 3-11 诠释了如何使用管 道。

例 3-11 工作线程对一个管道进行写操作

package org.example;import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Random;/*** Test Pipe objects using a worker thread.* <p>* Created April, 2002** @author Ron Hitchens (ron@ronsoft.com)*/
public class PipeTest {public static void main(String[] argv)throws Exception {// Wrap a channel around stdoutWritableByteChannel out = Channels.newChannel(System.out);// Start worker and get read end of channelReadableByteChannel workerChannel = startWorker(10);ByteBuffer buffer = ByteBuffer.allocate(100);while (workerChannel.read(buffer) >= 0) {buffer.flip();out.write(buffer);buffer.clear();}}// This method could return a SocketChannel or// FileChannel instance just as easilyprivate static ReadableByteChannel startWorker(int reps)throws Exception {Pipe pipe = Pipe.open();Worker worker = new Worker(pipe.sink(), reps);worker.start();return (pipe.source());}// ----------------------------------------------------------------/*** A worker thread object which writes data down a channel.* Note:this object knows  nothing about Pipe, uses only a* generic WritableByteChannel.*/private static class Worker extends Thread {WritableByteChannel channel;private int reps;Worker(WritableByteChannel channel, int reps) {this.channel = channel;this.reps = reps;}// Thread execution begins herepublic void run() {ByteBuffer buffer = ByteBuffer.allocate(100);try {for (int i = 0; i < this.reps; i++) {doSomeWork(buffer);// channel may not take it all at oncewhile (channel.write(buffer) > 0) {// empty}}this.channel.close();} catch (Exception e) {// easy way out; this is demo codee.printStackTrace();}}private String[] products = {"No good deed goes unpunished","To be, or what?","No matter where you go, there you are","Just say \"Yo\"","My karma ran over my dogma"};private Random rand = new Random();private void doSomeWork(ByteBuffer buffer) {int product = rand.nextInt(products.length);buffer.clear();buffer.put(products[product].getBytes());buffer.put("\r\n".getBytes());buffer.flip();}}
}

7.通道工具类

回忆一下,常规的流仅传输字节,readers 和 writers 则作用于字符数据。表 3-2 的前四行描述 了用于连接流、通道的方法。因为流和通道都是运行在字节流基础上的,所以这四个方法直接将流 封装在通道上,反之亦然。

Readers 和 Writers 运行在字符的基础上,在 Java 的世界里字符同字节是完全不同的。将一个 通道(仅了解字节)连接到一个 reader 或 writer 需要一个中间对话来处理字节/字符 (byte/char)阻抗失配。为此,表 3-2 的后半部分描述的工厂方法使用了字符集编码器和解码 器。字符集以及字符集转码将在第六章中详细讨论。

这些方法返回的包封 Channel 对象可能会也可能不会实现 InterruptibleChannel 接口,它们也可 能不是从 SelectableChannel 引申而来。因此,可能无法将这些包封通道同 java.nio.channels 包中定义的其他通道类型交换使用。细节是依赖实现的。如果您的程序依赖这些语义,那么请使用 操作器实例测试一下返回的通道对象。

总结

本章中我们讨论了通道的很多方面的内容。通道组成了基础设施或者说管道设施,该设施在操 作系统(或通道连接到的任意东西)的 ByteBuffers 和 I/O 服务之间传输数据。本章中讨论到的关键 概念有:

1.基本的通道操作

在 3.1 节中我们学习了通道的基本操作,具体包括:怎样使用所有通道都通用的 API 方法调用 来打开一个通道以及完成操作时如何关闭通道。

2.Scatter/Gather 通道

在 3.2 节中我们介绍了如何使用通道来 scatter/gather I/O。矢量化的 I/O 使您可以在多个缓冲区 上自动执行一个 I/O 操作。

3.文件通道

在 3.3 节中我们讨论了多层面的 FileChannel 类。这个强大的新通道提供了对高级文件操作的 访问,以前这是不对 Java 编程开放的。新的功能特性包括:文件锁定、内存映射文件以及 channel-to-channel 传输。

4.Socket 通道

在 3.5 节中我们覆盖了几种类型的 socket 通道。同时,我们也讨论了 socket 通道所支持的一个 重要新特性——非阻塞模式。

5.管道

在 3.6 节中我们看了一下 Pipe 类,这是一个使用专门的通道实现的新循环机制,非常有用。

6.通道工具类

通道类中包含了工具方法,这些方法用于交叉连接通道和常规的字节流以及字符读写器对象。 参见 3.7 节。

摘自JAVA NIO(中文版)

3.JAVA NIO通道相关推荐

  1. 2 Java NIO Channel-翻译

    Java NIO通道跟流类似,但有以下不同之处. 通道可以同时读写,但流只是单向的. 通道可以异步地进行读写. 通道都是从缓存中进行读或写操作的. 正如以上所提到的,你可以将数据从Channel写入到 ...

  2. Java NIO Channel

    Java NIO 通道(Channel) Java NIO的Channel和流(Streams)很相似,但是也有一些区别: Channel是双向的,你可以从Channel中读取,也可以向Channel ...

  3. 【java】关于Java NIO的一切

    1.概述 概述:关于Java NIO的『一切』 建议看原文 太长了 本文译自Jakob Jenkov的Java NIO.注意,并非逐字翻译,删除了原文中碎碎念的部分,有些地方也加入了自己的理解. Ja ...

  4. 详述 Java NIO 以及 Socket 处理粘包和断包方法

    文章目录 Java NIO 通道 缓冲区 代码示例 第一部分 第二部分 选择器 Socket 处理粘包 & 断包问题 第一个问题:对于粘包问题的解决 第二个问题:对于断包问题的解决 示例代码 ...

  5. 海纳百川而来的一篇相当全面的Java NIO教程

    目录 零.NIO包 一.Java NIO Channel通道 Channel的实现(Channel Implementations) Channel的基础示例(Basic Channel Exampl ...

  6. java nio channel原理_Java NIO 选择器(Selector)与通道(Channel) 原理 | 学步园

    NIO底层实现poll, epoll(jdk1.5update 9  和jdk1.6  仅限于 linux 2.6以上 ) Java NIO 选择器(Selector) 知识预备 (linux epo ...

  7. Java NIO之Channel(通道)

    **Java高级特性增强-NIO 本部分网络上有大量的资源可以参考,在这里做了部分整理并做了部分勘误,感谢前辈的付出,每节文章末尾有引用列表~ 写在所有文字的前面:作者在此特别推荐Google排名第一 ...

  8. Java NIO之套接字通道

    1.简介 前面一篇文章讲了文件通道,本文继续来说说另一种类型的通道 – 套接字通道.在展开说明之前,咱们先来聊聊套接字的由来.套接字即 socket,最早由伯克利大学的研究人员开发,所以经常被称为Be ...

  9. JAVA NIO之文件通道

    1.简介 通道是 Java NIO 的核心内容之一,在使用上,通道需和缓存类(ByteBuffer)配合完成读写等操作.与传统的流式 IO 中数据单向流动不同,通道中的数据可以双向流动.通道既可以读, ...

最新文章

  1. hdu 1879 继续畅通工程
  2. go语言实现图片合成
  3. use stacks能够把很多相似的文件叠加在macos的桌面上
  4. python之装饰器详解
  5. SharePoint 2013的100个新功能之搜索(一)
  6. 快收下这份“数据库运作实践”秘制口诀
  7. [2018.07.21 T2] 离家出走
  8. centos os u盘启动盘_怎么制作linux系统安装盘,U盘启动盘
  9. matlab 图像噪声去除,基于Matlab的图像去除噪声的研究
  10. AWVS12 防止反复注册
  11. 下载yaml appium
  12. SAS数据步与过程步,数据步语句
  13. mysql4.0做主从时主库的备份脚本
  14. AKSHARE 上获取股票数据用于盘后分析以及自己的交易模型的测试。
  15. PHP 将xml文件解析为数组
  16. java求30度的正弦值_获取Java中给定值的双曲正弦值
  17. 微软发布补丁封杀允许Surface RT安装Linux的“漏洞”
  18. 使用Android Studio创建简易版“答案之书”~~~
  19. 使用 chatgpt 给英文视频添加字幕
  20. C++ 内连接与外连接

热门文章

  1. 各种花茶的种类及其功效及副作用
  2. 台式电脑显示屏显示html,台式机电脑屏幕突然出现彩色条纹原因及解决方法
  3. 物业养老,智慧养老新方式
  4. yolov3网络(DarkNet53)结构详解以及Pytorch代码实现
  5. PHP实现水仙花算法
  6. 无验证码不扫码拼多多微信批量登陆思路
  7. 量子计算机科学家,再破纪录!中国科学家实现18个量子比特纠缠 为量子计算机奠基...
  8. 《TCP/IP详解》卷一:The-internet-protocol-IP
  9. 做一次完美的数据迁移
  10. 竞价推广方案怎么写,这些点你get到了吗?