在之前的文章Okio—— 更加高效易用的IO库中参考官方的demo简单学习了一下Okio的使用方法,这篇就来简要分析学习一下Okio的源码实现。


  • SourceSink
  • Segment
  • Buffer
  • ByteString
  • Timeout


Source & Sink

这两个是Okio中最基本的两个接口,分别对应java的InputStreamOutputStream即输入流和输出流,Source 是输入流,Sink是输出流:

actual interface Source {actual fun read(sink: Buffer, byteCount: Long): Longactual fun timeout(): Timeoutactual fun close()
actual interface Sink {actual fun write(source: Buffer, byteCount: Long)actual fun flush()actual fun timeout(): Timeoutactual fun close()


Okio.source(File file);
Okio.source(InputStream in);
Okio.source(Socket socket);
Okio.source(Path path, OpenOption... options);


Okio.sink(File file);
Okio.appendingSink(File file);//内容可追加
Okio.sink(OutputStream out);
Okio.sink(Socket socket);
Okio.sink(Path path, OpenOption... options);


fun File.source(): Source = inputStream().source()
fun InputStream.source(): Source = InputStreamSource(this, Timeout())
fun Socket.source(): Source {val timeout = SocketAsyncTimeout(this)val source = InputStreamSource(getInputStream(), timeout)return timeout.source(source)


private class InputStreamSource(private val input: InputStream,private val timeout: Timeout
) : Source {override fun read(sink: Buffer, byteCount: Long): Long {if (byteCount == 0L) return 0require(byteCount >= 0) { "byteCount < 0: $byteCount" }try {timeout.throwIfReached()val tail = sink.writableSegment(1)val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit).toInt()//从输入流读到sink buffer的尾节点segment对象中val bytesRead = input.read(tail.data, tail.limit, maxToCopy)if (bytesRead == -1) return -1tail.limit += bytesReadsink.size += bytesReadreturn bytesRead.toLong()} catch (e: AssertionError) {if (e.isAndroidGetsocknameError) throw IOException(e)throw e}}override fun close() = input.close()override fun timeout() = timeoutoverride fun toString() = "source($input)"



fun Source.buffer(): BufferedSource = RealBufferedSource(this)
fun Sink.buffer(): BufferedSink = RealBufferedSink(this)





  override fun read(sink: ByteArray, offset: Int, byteCount: Int): Int {checkOffsetAndCount(sink.size.toLong(), offset.toLong(), byteCount.toLong())//如果buffer的数据为空,则先调用被装饰的对象(也就是InputStreamSource)将数据读到buffer当中if (buffer.size == 0L) {val read = source.read(buffer, Segment.SIZE.toLong())if (read == -1L) return -1}val toRead = minOf(byteCount, buffer.size).toInt()return buffer.read(sink, offset, toRead)}
  override fun write(source: ByteArray): BufferedSink {check(!closed) { "closed" }buffer.write(source)return emitCompleteSegments()}

此外,Sink和Source它门还各自有一个支持gzip压缩的实现类GzipSinkGzipSource;一个具有委托功能的抽象类ForwardingSinkForwardingSource;还有一个实现类便是InflaterSource和DeflaterSink,这两个类主要用于压缩,为GzipSink和GzipSource服务,这里就不详细看了。Sink和Source还有其他的类,如HashingSink, HashingSource, 也是装饰者,这里就不一一列举了,其实Okio的Source和Sink装饰者家族类似于java的InputStream和OutStream家族。



在Buffer中的每一个Segment都是双向循环链表中的一个节点,该节点分别拥有指向前驱节点的Segment对象引用以及指向后驱节点的Segment对象引用。而在Segment池中的Segment则是一个单向链表的节点,Segment池持有对下一个Segment节点对象的引用。如果Segment中的字节数据是在buffer和byte string间共享的,那么该Segment对象是不可以被回收的,也是不能修改其中的数据的,除非是它的持有者。


/*** A segment of a buffer.** <p>Each segment in a buffer is a circularly-linked list node referencing the following and* preceding segments in the buffer.** <p>Each segment in the pool is a singly-linked list node referencing the rest of segments in the* pool.** <p>The underlying byte arrays of segments may be shared between buffers and byte strings. When a* segment's byte array is shared the segment may not be recycled, nor may its byte data be changed.* The lone exception is that the owner segment is allowed to append to the segment, writing data at* {@code limit} and beyond. There is a single owning segment for each byte array. Positions,* limits, prev, and next references are not shared.*/
final class Segment {/** The size of all segments in bytes. */static final int SIZE = 8192;/** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */static final int SHARE_MINIMUM = 1024;final byte[] data;/** The next byte of application data byte to read in this segment. */int pos;/** The first byte of available data ready to be written to. */int limit;/** True if other segments or byte strings use the same byte array. */boolean shared;/** True if this segment owns the byte array and can append to it, extending {@code limit}. */boolean owner;/** Next segment in a linked or circularly-linked list. */Segment next;/** Previous segment in a circularly-linked list. */Segment prev;Segment() {this.data = new byte[SIZE];this.owner = true;this.shared = false;}Segment(Segment shareFrom) {this(shareFrom.data, shareFrom.pos, shareFrom.limit);shareFrom.shared = true;}Segment(byte[] data, int pos, int limit) {this.data = data;this.pos = pos;this.limit = limit;this.owner = false;this.shared = true;}/*** Removes this segment of a circularly-linked list and returns its successor.* Returns null if the list is now empty.*/public @Nullable Segment pop() {Segment result = next != this ? next : null;prev.next = next;next.prev = prev;next = null;prev = null;return result;}/*** Appends {@code segment} after this segment in the circularly-linked list.* Returns the pushed segment.*/public Segment push(Segment segment) {segment.prev = this;segment.next = next;next.prev = segment;next = segment;return segment;}/*** Splits this head of a circularly-linked list into two segments. The first* segment contains the data in {@code [pos..pos+byteCount)}. The second* segment contains the data in {@code [pos+byteCount..limit)}. This can be* useful when moving partial segments from one buffer to another.** <p>Returns the new head of the circularly-linked list.*/public Segment split(int byteCount) {if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();Segment prefix;// We have two competing performance goals://  - Avoid copying data. We accomplish this by sharing segments.//  - Avoid short shared segments. These are bad for performance because they are readonly and//    may lead to long chains of short segments.// To balance these goals we only share segments when the copy will be large.if (byteCount >= SHARE_MINIMUM) {prefix = new Segment(this);} else {prefix = SegmentPool.take();System.arraycopy(data, pos, prefix.data, 0, byteCount);}prefix.limit = prefix.pos + byteCount;pos += byteCount;prev.push(prefix);return prefix;}/*** Call this when the tail and its predecessor may both be less than half* full. This will copy data so that segments can be recycled.*/public void compact() {if (prev == this) throw new IllegalStateException();if (!prev.owner) return; // Cannot compact: prev isn't writable.int byteCount = limit - pos;int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.writeTo(prev, byteCount);pop();SegmentPool.recycle(this);}/** Moves {@code byteCount} bytes from this segment to {@code sink}. */public void writeTo(Segment sink, int byteCount) {if (!sink.owner) throw new IllegalArgumentException();if (sink.limit + byteCount > SIZE) {// We can't fit byteCount bytes at the sink's current position. Shift sink first.if (sink.shared) throw new IllegalArgumentException();if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);sink.limit -= sink.pos;sink.pos = 0;}System.arraycopy(data, pos, sink.data, sink.limit, byteCount);sink.limit += byteCount;pos += byteCount;}

首先,Segment中有几个成员变量:Segment.SIZE这个值是8192,也就是8kb, 是一个Segment对象能处理的数据的大小,byte[] data这个就是真正的存储数据的字节数组,pos这个是读取数据的起始位置,limit是写数据的起始位置,shared表示当前Segment的字节数组data是否可以共享的,owner表示当前Segment是否是data对象的持有者(只有data对象的持有者才能对data进行修改), 只有share为false即表示owner为true是当前的持有者。这里有个概念就是share “共享”,Segment中的data数组是可以在Buffer和ByteString对象之间共享的,怎么来确认这个共享呢,我们看到Segment对象有三个构造函数,其中有参的构造函数:

  Segment(Segment shareFrom) {this(shareFrom.data, shareFrom.pos, shareFrom.limit);shareFrom.shared = true;}Segment(byte[] data, int pos, int limit) {this.data = data;this.pos = pos;this.limit = limit;this.owner = false;this.shared = true;}


  Segment() {this.data = new byte[SIZE];this.owner = true;this.shared = false;}




/*** A collection of unused segments, necessary to avoid GC churn and zero-fill.* This pool is a thread-safe static singleton.*/
final class SegmentPool {/** The maximum number of bytes to pool. */// TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?static final long MAX_SIZE = 64 * 1024; // 64 KiB./** Singly-linked list of segments. */static @Nullable Segment next;/** Total bytes in this pool. */static long byteCount;private SegmentPool() {}static Segment take() {synchronized (SegmentPool.class) {if (next != null) {Segment result = next;next = result.next;result.next = null;byteCount -= Segment.SIZE;return result;}}return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.}static void recycle(Segment segment) {if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();if (segment.shared) return; // This segment cannot be recycled.synchronized (SegmentPool.class) {if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.byteCount += Segment.SIZE;segment.next = next;segment.pos = segment.limit = 0;next = segment;}}

SegmentPool可以理解为一个缓存Segment的池,它只有两个方法,一个take(),一个recycle(),在SegmentPool中维护的是一个Segment 的单链表,并且它的最大值为MAX_SIZE = 64 * 1024也就是64kb8个Segment的长度,next就是单链表中的头结点。

take()方法的作用是取出单链表的头结点Segment对象,然后将取出的对象与链表断开并将链表往后移动一个单位,如果是第一次调用take, next为null, 则会直接new一个Segment对象返回,并且这里创建的Segment是不共享的。

recycle()方法的作用则是回收一个Segment对象,被回收的Segment对象将会被插入到SegmentPool中的单链表的头部,以便后面继续复用,并且这里源码我们也可以看到如果是shared的对象是不处理的,如果是第一次调用recycle()方法则链表会由空变为拥有一个节点的链表, 每次回收就会插入一个到表头,直到超过最大容量。





    @Overridepublic Buffer write(byte[] source, int offset, int byteCount) {if (source == null) throw new IllegalArgumentException("source == null");// 检测参数的合法性checkOffsetAndCount(source.length, offset, byteCount);// 计算 source 要写入的最后一个字节的 index 值int limit = offset + byteCount;while (offset < limit) {// 获取循环链表尾部的一个 SegmentSegment tail = writableSegment(1);// 计算最多可写入的字节int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);// 把 source 复制到 data 中System.arraycopy(source, offset, tail.data, tail.limit, toCopy);// 调整写入的起始位置offset += toCopy;// 调整尾部Segment 的 limit 位置tail.limit += toCopy;}// 调整 Buffer 的 size 大小size += byteCount;return this;}


  /*** Returns a tail segment that we can write at least {@code minimumCapacity}* bytes to, creating it if necessary.*/Segment writableSegment(int minimumCapacity) {if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();// 如果链表的头指针为null,就会SegmentPool中取出一个if (head == null) {head = SegmentPool.take(); // Acquire a first segment.return head.next = head.prev = head;}// 获取前驱结点,也就是尾部结点Segment tail = head.prev;// 如果能写的字节数限制超过了8192,或者不是拥有者if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {// 从SegmentPool中获取一个Segment,插入到循环双链表当前结点的后面tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.}return tail;}

这里有个head对象,就是Segment链表的头结点的引用,这个方法中可以看到如果写的时候头结点head为空,则会调用 SegmentPool.take() 方法从Segment池中获取一个 Segment缓存对象,并以此形成一个双向链表的初始节点:

    if (head == null) {head = SegmentPool.take(); // Acquire a first segment.return head.next = head.prev = head;}


这时头结点和尾节点其实是同一个节点,然后取得head.prev也就是tail尾节点返回,但是如果此时tail能写的字节数限制超过了8k或者尾节点不是data的拥有者,就会调用tail.push(SegmentPool.take());也就是再调用一次SegmentPool.take()取到Segment池中下一个Segment. 通过tail. push() 方法插入到循环链表的尾部。这时Segment中的链表会变成下面这样:



  @Override public int read(byte[] sink, int offset, int byteCount) {checkOffsetAndCount(sink.length, offset, byteCount);//取到Segment循环链表的表头Segment s = head;if (s == null) return -1;// 计算最多可写入的字节int toCopy = Math.min(byteCount, s.limit - s.pos);//将数据拷贝到链头的data字节数组当中System.arraycopy(s.data, s.pos, sink, offset, toCopy);//调整链头的data数组的起始postion和Buffer的sizes.pos += toCopy;size -= toCopy;//pos等于limit的时候,从循环链表中移除该Segment并从SegmentPool中回收复用if (s.pos == s.limit) {head = s.pop();//移除的同时返回下一个Segment作为表头SegmentPool.recycle(s);}return toCopy;}

读操作内部也是调用System.arraycopy进行字节数组的复制,这里是直接对head头结点进行读取,也就是说Buffer在每次读数据的时候都是从链表的头部进行读取的,如果读取的头结点的pos等于limit, 这里就会调用s.pop()将头节点从链表中删除,并返回下一个节点作为新的头结点引用,然后将删除的节点通过SegmentPool.recycle(s)进行回收复用。这时链表中的变化如下:


Buffer除了读写基础数据以外,还有一个比较重要的功能就是Buffer之间的数据交换, 还记得在官方对Buffer的介绍中写到的:


这里说在Buffer缓冲区之间移动数据的时候,是重新分配片段也就是Segment的持有关系,而不是跨片段的复制数据,那么它说的这个比较牛逼的过程是如何实现的呢, 来看一下实现的方法:

public void write(Buffer source, long byteCount) {// Move bytes from the head of the source buffer to the tail of this buffer// while balancing two conflicting goals: don't waste CPU and don't waste// memory.////// Don't waste CPU (ie. don't copy data around).//// Copying large amounts of data is expensive. Instead, we prefer to// reassign entire segments from one buffer to the other.////// Don't waste memory.//// As an invariant, adjacent pairs of segments in a buffer should be at// least 50% full, except for the head segment and the tail segment.//// The head segment cannot maintain the invariant because the application is// consuming bytes from this segment, decreasing its level.//// The tail segment cannot maintain the invariant because the application is// producing bytes, which may require new nearly-empty tail segments to be// appended.////// Moving segments between buffers//// When writing one buffer to another, we prefer to reassign entire segments// over copying bytes into their most compact form. Suppose we have a buffer// with these segment levels [91%, 61%]. If we append a buffer with a// single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied.//// Or suppose we have a buffer with these segment levels: [100%, 2%], and we// want to append it to a buffer with these segment levels [99%, 3%]. This// operation will yield the following segments: [100%, 2%, 99%, 3%]. That// is, we do not spend time copying bytes around to achieve more efficient// memory use like [100%, 100%, 4%].//// When combining buffers, we will compact adjacent buffers when their// combined level doesn't exceed 100%. For example, when we start with// [100%, 40%] and append [30%, 80%], the result is [100%, 70%, 80%].////// Splitting segments//// Occasionally we write only part of a source buffer to a sink buffer. For// example, given a sink [51%, 91%], we may want to write the first 30% of// a source [92%, 82%] to it. To simplify, we first transform the source to// an equivalent buffer [30%, 62%, 82%] and then move the head segment,// yielding sink [51%, 91%, 30%] and source [62%, 82%].if (source == null) throw new IllegalArgumentException("source == null");if (source == this) throw new IllegalArgumentException("source == this");checkOffsetAndCount(source.size, 0, byteCount);while (byteCount > 0) {// Is a prefix of the source's head segment all that we need to move?// 如果 Source Buffer 的头结点可用字节数大于要写出的字节数if (byteCount < (source.head.limit - source.head.pos)) {//取到当前buffer的尾节点Segment tail = head != null ? head.prev : null;// 如果尾部结点有足够空间可以写数据,并且这个结点是底层数组的拥有者if (tail != null && tail.owner&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {// Our existing segments are sufficient. Move bytes from source's head to our tail.//source头结点的数据写入到当前尾节点中,然后就直接结束返回了source.head.writeTo(tail, (int) byteCount);source.size -= byteCount;size += byteCount;return;} else {// We're going to need another segment. Split the source's head// segment in two, then move the first of those two to this buffer.//如果尾节点空间不足或者不是持有者,这时就需要把 Source Buffer 的头结点分割为两个 Segment,//然后将source的头指针更新为分割后的第一个Segment, 如[92%, 82%]变成[30%, 62%, 82%]这样source.head = source.head.split((int) byteCount);}}// Remove the source's head segment and append it to our tail.//从 Source Buffer 的链表中移除头结点, 并加入到当前Buffer的链尾Segment segmentToMove = source.head;long movedByteCount = segmentToMove.limit - segmentToMove.pos;//移除操作,并移动更新source中的headsource.head = segmentToMove.pop();// 如果当前buffer的头结点为 null,则头结点直接指向source的头结点,初始化双向链表if (head == null) {head = segmentToMove;head.next = head.prev = head;} else {//否则就把Source Buffer的 head 加入到当前Buffer的链尾Segment tail = head.prev;tail = tail.push(segmentToMove);//压入链尾,并更新尾节点tail.compact();//尾节点尝试合并,如果合并成功,则尾节点会被SegmentPool回收掉}source.size -= movedByteCount;size += movedByteCount;byteCount -= movedByteCount;}}

主要就是在这个write(Buffer source, long byteCount)方法中实现的,这个方法前面有大段的英文注释,我从源码中直接复制过来的,我们可以翻译过来理解一下说的是啥:

将字节数据从source buffer的头节点复制到当前buffer的尾节点中,这里主要需要平衡两个相互冲突的目标:CPU内存




Segment作为一个不可变量,缓冲区中除了头节点和尾节点的片段以外,相邻的片段,至少应该保证50%以上的数据负载量(指的是Segment中的data数据, Okio认为data数据量在50%以上才算是被有效利用的)。由于头结点中需要读取消耗字节数据,而尾节点中需要写入产生字节数据,因此头结点和尾节点是不能保持不变性的。






有时我们只想将source buffer中的一部分写入到sink buffer当中,例如,给定一个sink为 [51%,91%],现在我们想要将一个source[92%,82%]的前30%写入到这个sink buffer当中。为了简化,我们首先将source buffer转换为等效缓冲区[30%,62%,82%](即拆分Segment),然后移动source的头结点Segment即可,最终生成sink[51%,91%,30%]和source[62%,82%]








现在要从第二个Buffer中取前30%的数据写入到第一个Buffer当中,那么首先会将第二个Buffer的头结点Segment进行分割,分割为两个负载为30%62%Segment, 接下来移动这个新的30%Segment节点到第一个Buffer的链表的尾部:




  final byte[] data;transient String utf8; // Lazily computed.

分别存储字节数据和utf-8形式的字符串数据,它有很多方法类似于java的String 如substring()startsWith()endsWith()indexOf()等,它拥有一个传递字节数组的构造函数:

  ByteString(byte[] data) {this.data = data; // Trusted internal constructor doesn't clone data.}


  /** Constructs a new {@code String} by decoding the bytes as {@code UTF-8}. */public String utf8() {String result = utf8;// We don't care if we double-allocate in racy code.return result != null ? result : (utf8 = new String(data, Util.UTF_8));}public String base64() {return Base64.encode(data);}/** Returns the 128-bit MD5 hash of this byte string. */public ByteString md5() {return digest("MD5");}/** Returns the 160-bit SHA-1 hash of this byte string. */public ByteString sha1() {return digest("SHA-1");}/** Returns the 256-bit SHA-256 hash of this byte string. */public ByteString sha256() {return digest("SHA-256");}/** Returns the 512-bit SHA-512 hash of this byte string. */public ByteString sha512() {return digest("SHA-512");}/** Returns this byte string encoded in hexadecimal. */public String hex() {char[] result = new char[data.length * 2];int c = 0;for (byte b : data) {result[c++] = HEX_DIGITS[(b >> 4) & 0xf];result[c++] = HEX_DIGITS[b & 0xf];}return new String(result);}





fun InputStream.source(): Source = InputStreamSource(this, Timeout())


override fun read(sink: Buffer, byteCount: Long): Long {if (byteCount == 0L) return 0require(byteCount >= 0) { "byteCount < 0: $byteCount" }try {timeout.throwIfReached()val tail = sink.writableSegment(1)val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit).toInt()val bytesRead = input.read(tail.data, tail.limit, maxToCopy)if (bytesRead == -1) return -1tail.limit += bytesReadsink.size += bytesReadreturn bytesRead.toLong()} catch (e: AssertionError) {if (e.isAndroidGetsocknameError) throw IOException(e)throw e}}


  public void throwIfReached() throws IOException {if (Thread.interrupted()) {throw new InterruptedIOException("thread interrupted");}if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {throw new InterruptedIOException("deadline reached");}}

这里在两种情况下都会抛出异常,一个是当前线程被中断,另一个是满足了设置的超时时间条件。这里的hasDeadline 以及 deadlineNanoTime都是Timeout类的成员,它总共有三个成员变量:

  private boolean hasDeadline;private long deadlineNanoTime;private long timeoutNanos;

其中timeoutNanos的含义是超时的时间,如10s, deadlineNanoTime的含义是截止时间,这个是一个确定的未来时间点,这两个单位都是微秒,当设置deadlineNanoTime的时候,hasDeadline的值会为true。在Okio的source()默认实现中直接new了一个空的Timeout对象,这三个都是默认值,因此默认的读写文件和stream流是不会超时的除非线程被中断。


/*** Returns a source that reads from `socket`. Prefer this over [source]* because this method honors timeouts. When the socket* read times out, the socket is asynchronously closed by a watchdog thread.*/
fun Socket.source(): Source {val timeout = SocketAsyncTimeout(this)val source = InputStreamSource(getInputStream(), timeout)return timeout.source(source)
}private class SocketAsyncTimeout(private val socket: Socket) : AsyncTimeout() {private val logger = Logger.getLogger("okio.Okio")override fun newTimeoutException(cause: IOException?): IOException {val ioe = SocketTimeoutException("timeout")if (cause != null) {ioe.initCause(cause)}return ioe}override fun timedOut() {try {socket.close()} catch (e: Exception) {logger.log(Level.WARNING, "Failed to close timed out socket $socket", e)} }


public final Source source(final Source source) {return new Source() {@Override public long read(Buffer sink, long byteCount) throws IOException {boolean throwOnTimeout = false;enter();try {long result = source.read(sink, byteCount);throwOnTimeout = true;return result;} catch (IOException e) {throw exit(e);} finally {exit(throwOnTimeout);}}//.....};}


  public final void enter() {if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");long timeoutNanos = timeoutNanos();boolean hasDeadline = hasDeadline();//如果没有设置超时时间也没有设置超时截止时间,这里就直接返回了if (timeoutNanos == 0 && !hasDeadline) {return; // No timeout and no deadline? Don't bother with the queue.}inQueue = true;scheduleTimeout(this, timeoutNanos, hasDeadline);}


  private static synchronized void scheduleTimeout(AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {// 第一次运行的时候会创建头结点并启动Watchdog线程if (head == null) {head = new AsyncTimeout();new Watchdog().start();}//....省略部分代码// 按顺序插入节点long remainingNanos = node.remainingNanos(now);for (AsyncTimeout prev = head; true; prev = prev.next) {//这里说明会形成一个超时时间递增的Timeout单链表if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {node.next = prev.next;prev.next = node;if (prev == head) {AsyncTimeout.class.notify(); // 当在头部插入的时候,唤醒 watchdog}break;}}}


private static final class Watchdog extends Thread {Watchdog() {super("Okio Watchdog");setDaemon(true);}public void run() {while (true) {try {AsyncTimeout timedOut;synchronized (AsyncTimeout.class) {timedOut = awaitTimeout();// Didn't find a node to interrupt. Try again.if (timedOut == null) continue;// The queue is completely empty. Let this thread exit and let another watchdog thread// get created on the next call to scheduleTimeout().if (timedOut == head) {head = null;return;}}// Close the timed out node.timedOut.timedOut();} catch (InterruptedException ignored) {}}}}

Watchdog线程中一直在跑一个while死循环,并且会锁住AsyncTimeout.class,在这个死循环中主要处理的就是一个由AsyncTimeout 组成的单链表,这个链表中的每个AsyncTimeout对象是按照超时时间递增的顺序排列的,越靠近链表的头部超时时间越短,在AsyncTimeout内部有三个成员变量:

  static @Nullable AsyncTimeout head;/** True if this node is currently in the queue. */private boolean inQueue;/** The next node in the linked list. */private @Nullable AsyncTimeout next;



 static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {// Get the next eligible node.AsyncTimeout node = head.next;// 如果链表为空,则一直等待新的超时节点的插入 或者一个idle timeout 的发生(60s).if (node == null) {long startNanos = System.nanoTime();AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS? head  // The idle timeout elapsed.: null; // The situation has changed.}long waitNanos = node.remainingNanos(System.nanoTime());// 如果结点没有超时完毕,则等待if (waitNanos > 0) {long waitMillis = waitNanos / 1000000L;waitNanos -= (waitMillis * 1000000L);AsyncTimeout.class.wait(waitMillis, (int) waitNanos);return null;}// 超时完毕了就移除这个结点head.next = node.next;node.next = null;return node;}


  /** Returns true if the timeout occurred. */public final boolean exit() {if (!inQueue) return false;inQueue = false;return cancelScheduledTimeout(this);}

exit()方法返回true表示超时发生了,如果当前AsyncTimeout 对象没有在链表中,则返回false不处理(这种情况是没有设置超时时间则inQueue默认为false),否则以cancelScheduledTimeout()作为最终的返回值,再看这个方法:

  /** Returns true if the timeout occurred. */private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {// Remove the node from the linked list.for (AsyncTimeout prev = head; prev != null; prev = prev.next) {//找到这个节点的同时会把它从链表移除if (prev.next == node) {prev.next = node.next;node.next = null;return false;}}// The node wasn't found in the linked list: it must have timed out!return true;}




以上,部分源码是截取的kotlin的,部分源码是截取的java版的,因为最新版的Okio源码是完全用kotlin写的,还是java的源码看着方便一些。Okio的源码短小精悍,尤其是链表的应用是非常值得学习的,而且它对于内存和CPU的精打细算让人称赞。正是这种接近强迫症的“斤斤计较”的态度使得Okio在效率上要高于原生的java IO库。



