okio 的使用及源码分析

简介

okio 补充了 java.io 和 java.nio 的内容,使得数据访问、存储和处理更加便捷。本文将简单介绍一下 okio 的使用以及基于 okio-1.17.0 版本,对 okio 的源码进行分析,最后总结一下 okio 的优点。

一些重要的类

ByteString 和 Buffer

ByteString 和 Buffer 是 okio 的两个核心基础类

  • ByteString 是一个不可变的字节序列。String 提供了对于字符数据的各种操作,但对于二进制数据,却没有这样的存在。ByteString 的出现填补了这个空缺,它提供了对二进制数据的各种操作,例如提取子串、判等、获取位置等,也能将数据编解码为十六进制、base64 和 UTF-8 格式。

  • Buffer 是一个可变的字节序列,就像 ArrayList 一样,不需要考虑它的容量。在写入和读取元素的时候,就像队列一样,从它的头部读取数据,尾部写入数据。Buffer 实现了 BufferSource 和 BufferSink,提供了访问数据缓冲区所需的一切 API。

其他:

  • 把一个 String 编码为 utf8 时,会引用原 String,后面解码时就可以直接返回了

Source 和 Sink

Source 和 Sink 类似于 InputStream 和 OutputStream,都是 IO 操作的顶级接口。其中 Source 为输入流、Sink 为输出流。它们还有一些新特性:

  1. 提供超时机制
  2. API 更加简洁,易于实现,例如 Source 仅仅声明了 read, close, timeout 方法
  3. 为了更方便地处理数据,还提供了 BufferedSource 和 BufferedSink 接口
  4. 不再区分字节流和字符流,它们都是数据,可以按照各种类型进行读写
  5. 便于测试,Buffer 同时实现了 BufferedSource 和 BufferedSink,便于编写测试代码

RealBufferedSource 和 RealBufferedSink

RealBufferedSource 和 RealBufferedSink 分别实现了 BufferedSource 和 BufferedSink 接口。

它们都有一个 Buffer 成员变量,在执行方法时,它们并没有做什么,实际调用的是 Buffer 的该方法。这体现了装饰模式。所以这也说明了 BufferedSource 和 BufferedSink 接口的真正实现都在 Buffer 中

为什么不各自实现呢?

因为 BufferedSource 和 BufferedSink 的高效实现有很大部分是共通的,为了避免同样的逻辑重复两遍,就把读写操作都在 Buffer 中实现,这样逻辑更加紧凑,也方便修改。

另一方面,Buffer 的实现可以满足“两用数据缓冲区”的需求,即可以从头部读取数据,也可以向尾部添加数据。对于单独的读和写,也提供了两个委托类:RealBufferedSource 和 RealBufferedSink。

简单使用

读取文本

以官方 demo 为例,这里读取的是一个 File 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void readLines(File file) throws IOException {
try (Source fileSource = Okio.source(file);
BufferedSource bufferedSource = Okio.buffer(fileSource)) {

while (true) {
String line = bufferedSource.readUtf8Line();
if (line == null) break;

if (line.contains("square")) {
System.out.println(line);
}
}

}
}

可以看到,主要步骤如下:

1
2
3
4
5
6
7
// 1. 构建 Source
Source fileSource = Okio.source(file);
// 2. 构建 BufferedSource(RealBufferedSource)
BufferedSource bufferedSource = Okio.buffer(fileSource);
// 3. 从 BufferedSource 中读取文本
// 在 dmeo 中是按 utf8 格式逐行读取
String line = bufferedSource.readUtf8Line();

  1. 根据 InputStream、File、Path 或 Socket(最终都转化为 InputStream)构建 Source
  2. 根据 Source 构建 BufferedSource(RealBufferedSource)
  3. 从 BufferedSource 中读取文本,demo 中是按 utf8 格式逐行读取。如此之外,还可以按字节读取。如果文件是自动定义的特殊结构,还可以使用 readInt、readLong 等方法。

写入文本

继续看官方 demo:

1
2
3
4
5
6
7
8
9
10
11
12
public void writeEnv(File file) throws IOException {
try (Sink fileSink = Okio.sink(file);
BufferedSink bufferedSink = Okio.buffer(fileSink)) {

for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
bufferedSink.writeUtf8(entry.getKey());
bufferedSink.writeUtf8("=");
bufferedSink.writeUtf8(entry.getValue());
bufferedSink.writeUtf8("\n");
}

}

可以看到,主要步骤和读取文本时类似:

1
2
3
4
5
6
// 1. 构建 Sink
Sink fileSink = Okio.sink(file);
// 2. 构建 BufferedSink
BufferedSink bufferedSink = Okio.buffer(fileSink);
// 3. 向 BufferedSink 写入文本
bufferedSink.writeUtf8("要写入的文本");

  1. 根据 OutputStream、File、Path 或 Socket(最终都转化为 OutputStream)构建 Sink
  2. 根据 Sink 构建 BufferedSink(RealBufferedSink)
  3. 向 BufferedSink 写入文本,除了写入 Stirng,还可以写入 byte[]、int、long。

源码分析

Segment

Segment 的字面意思是片段。okio 将 Buffer 分割成一个个 Segment,Segment 内部维护着固定长度的 byte 数组,数据存储在 byte 数组中,同时 Segment 拥有前面节点和后面节点的引用,是一个双向链表。

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 每个 Segment 所含的字节数量
static final int SIZE = 8192;

// 要进行分割时,如果要分割的字节数达到该值,才进行 Segment 共享
static final int SHARE_MINIMUM = 1024;

// 存储数据
final byte[] data;

// 下一可读位置
int pos;

// 第一个可写位置
int limit;

// 当前 Segment 存储的数据是否被其他 Segment 或 ByteString 使用
boolean shared;

// 是否为当前数据的拥有者并可以继续写入数据
boolean owner;

// 指向下一个 Segment
Segment next;

// 指向前一个 Segment
Segment prev;

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}

Segment(byte[] data, int pos, int limit, boolean shared, boolean owner) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.shared = shared;
this.owner = owner;
}

有两个构造方法,其中无参构造方法 owner 为 true、shared 为 false,说明数据的拥有者只有自己,没有被共享。

另一构造方法,则可以根据需要,返回相应的 Segment:

1
2
3
4
5
6
7
8
9
/**
* Returns a new segment that shares the underlying byte array with this. Adjusting pos and limit
* are safe but writes are forbidden. This also marks the current segment as shared, which
* prevents it from being pooled.
*/
final Segment sharedCopy() {
shared = true;
return new Segment(data, pos, limit, true, false);
}
1
2
3
4
/** Returns a new segment that its own private copy of the underlying byte array. */
final Segment unsharedCopy() {
return new Segment(data.clone(), pos, limit, false, true);
}

下面来看下 Segment 的几个方法:

pop

1
2
3
4
5
6
7
8
public final @Nullable Segment pop() {
Segment result = next != this ? next : null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
return result;
}

pop 方法将自己移除出链表,并将自己的前后两个节点连接起来,最后返回下一个 Segment。

既然有 pop 方法,那当然还有 push 方法:

push

1
2
3
4
5
6
7
public final Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}

push 方法传入一个 Segment,将该 Segment 插入到自己后面并返回插入的 Segment。

writeTo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 public final void writeTo(Segment sink, int byteCount) {
// 不能对无法写入数据的 Segment 操作
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
// 正在共享的 Segment
if (sink.shared) throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
// 可能由于前面使用了 read 方法取出数据时导致 pos 后移(pos > 0)
// 这里先将从 pos 开始的数据移回到开头,即索引为 0 处,并更新 pos 和 limit
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
// 本 Segment 从 pos 开始取 byteCount 个字节写入到 sink
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
// 更新 sink 的可写位置和自己的下一读取位置
sink.limit += byteCount;
pos += byteCount;
}

该方法从自己的 pos 位置开始,读取一定数量的字节并写入到另一 Segment 中。这是一种数据的转移,主要用于 Segment 的压缩。

compact(压缩机制)

因为每个 Segment 的存储的数据长度是固定的,如果经过一段时间后,每个 Segment 的数据长度不一,可能有些 Segment 只有很小的数据。这时就可以通过 Segment 的压缩机制进行优化,该机制通过 compact 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 public final void compact() {
// 上一节点就是自己,说明只有一个节点,无法压缩
if (prev == this) throw new IllegalStateException();
// 如果上一节点的数据不只自己拥有,那么不能压缩
if (!prev.owner) return; // Cannot compact: prev isn't writable.
// 当前自己拥有的数据量
int byteCount = limit - pos;
// 计算上一节点的可写入数据量
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
// 上一节点不足以容纳自己的数据,无法压缩
if (byteCount > availableByteCount) return;
// 将自己的数据写入到上一节点
writeTo(prev, byteCount);
// 将自己从链表中删除
pop();
// 回收
SegmentPool.recycle(this);
}

在进行压缩时,如果上一个 Segment 可以写入数据并足以容纳自己的数据,就将自己的数据写入到上一节点,然后将自己从链表中删除并回收。

split(共享机制)

split 方法可以从某个 Segment 中分割出一个新的 Segment。其中新 Segment 包含 [pos, pos + byteCount) 的数据,而原 Segment 只剩下 [pos + byteCount, limit) 的数据。最后新 Segment 会插入到原 Segment 的前面。具体看下该方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 public final Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();

Segment prefix;

// 只有当数据量比较大时,才共享当前 Segment
if (byteCount >= SHARE_MINIMUM) {
prefix = sharedCopy();
}
// 数据量较少时不共享,将数据复制到新的 Segment
else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}

// 更新新 Segment 的可写位置
prefix.limit = prefix.pos + byteCount;
// 更新本 Segment 的下一读取位置
pos += byteCount;
// 将新 Segment 插入到自己前面
prev.push(prefix);
// 返回新的 Segment
return prefix;
}

在进行分割时,如果要分割的数据量比较巨大,那么将进行数据共享而不是数据复制,也就是说新 Segment 和原 Segment 引用同一个 byte 数组。而数据量比较小时,不会进行共享,因为太多的共享会导致 Segment 链变长,这时会进行复制。

如果是进行复制的话,新 Segment 的实例是通过 SegmentPool.take() 获得的。下面就分析一些 SegmentPool:

SegmentPool

SegmentPool 主要复制 Segment 的回收和闲置 Segment 的管理。Buffer 使用的 Segment 是从 SegmentPool 中获取的,这样可以避免频繁地创建和销毁 Segment。

这个类比较简单,只有三个成员变量,分别如下:

1
2
3
4
5
6
7
8
9
// SegmentPool 的最大存储字节数
// 一个 Segment 存储 8 * 1024 个字节,所以 SegmentPool 只能存 8 个 Segment
static final long MAX_SIZE = 64 * 1024; // 64 KiB.

// 一个存储回收 Segment 的单向链表
static @Nullable Segment next;

// 当前所有 Segment 存储的总字节数
static long byteCount;

SegmentPool 只有两个方法,一个是回收 Segment,一个是获取 Segment。

take

take 方法用于获取 Segment:

1
2
3
4
5
6
7
8
9
10
11
12
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment();
}

如果当前池里没有 Segment 就新建一个 Segment 返回,否则就从单向链表中取一个 Segment 返回。

recycle

recycle 方法用于回收 Segment:

1
2
3
4
5
6
7
8
9
10
11
12
 static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
// 不能回收共享的 Segment
if (segment.shared) return;
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}

共享状态的 Segment 不能回收,如果池没满就将回收该 Segment,将其加入单向链表中。

Buffer

Buffer 存储的是可变字节序列,内部使用 Segment 来存储元素,Segment 是一个双向循环链表,它的内部有一个 byte 数组用于存储元素。

Buffer 使用 Segment 的好处是:当元素要从一个 Buffer 移到另一个 Buffer 的时候,并不用进行数组元素的拷贝,只要改变 Segment 的所有者即可。

Buffer 既可以读,也可以写,下面分别调两个方法看一下读写的过程。

首先看 readByte 方法,该方法读取一个字节:

readByte

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 @Override 
public byte readByte() {
if (size == 0) throw new IllegalStateException("size == 0");

// 从头开始读
Segment segment = head;
int pos = segment.pos;
int limit = segment.limit;

byte[] data = segment.data;
byte b = data[pos++];
size -= 1;

// 读取元素后,若头 Segment 没有元素了,就从链表中删除并回收该 Segment
if (pos == limit) {
head = segment.pop();
SegmentPool.recycle(segment);
} else {
segment.pos = pos;
}

return b;
}

可以看到,读取元素的时候是从头开始读的。如果读取元素后,头 Segment 没有元素了,就从链表中删除并回收该 Segment。

继续看一个写的方法,这里看 writeByte:

writeByte

1
2
3
4
5
6
7
8
9
 @Override 
public Buffer writeByte(int b) {
// 返回一个满足所需容量的尾 Segment
Segment tail = writableSegment(1);
// 写入字节
tail.data[tail.limit++] = (byte) b;
size += 1;
return this;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

// 头 Segment 为空,则先创建一个双向循环链表
if (head == null) {
head = SegmentPool.take();
return head.next = head.prev = head;
}

// 找到尾 Segment,从尾部写入
Segment tail = head.prev;
// 若不满足容量,或者不能写入数据,就在尾部添加一个 Segment
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}

可以看到,写入时是从尾部写入的,如果此时头结点为空,就会新建一个双向循环链表。如果尾 Segment 容量不足或不能写入数据,就会在尾部添加一个 Segment。

超时机制

Timeout

以输出流 Sink 为例,当我们用下面的方法包装输出流的时候:

1
2
3
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}

真正调用的是带两个参数的 sink 方法,第二个参数就是 TimeOut:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");

return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {

while (byteCount > 0) {
// 进行了超时判断
timeout.throwIfReached();

// ...
}
}

@Override public Timeout timeout() {
return timeout;
}

// ...

};
}

在写入数据的时候,通过调用 Timeout 的 throwIfReached 方法进行了超时判断,该方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
 public void throwIfReached() throws IOException {
// 线程被 interrupted
if (Thread.interrupted()) {
Thread.currentThread().interrupt(); // Retain interrupted status.
throw new InterruptedIOException("interrupted");
}

// 到达超时时间
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}

除了 Timeout 外,还有一个 AsyncTimeout。

AsyncTimeout

AsyncTimeout 继承于 Timeout,TimeOut 用于同步计时,即在同一个线程中执行 IO 操作和计时,而 AsyncTimeout 提供了异步计时的功能。

首先看一下 AsyncTimeOut 的成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 一次最多可以写入 64K 数据,超过该容量可能会导致在慢连接中超时,故做出限制
private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;

// AsyncTimeout 单链表的头结点
static @Nullable AsyncTimeout head;
// 下一节点
private @Nullable AsyncTimeout next;

// 判断当前节点是否已经入队
private boolean inQueue;

// 超时时间
private long timeoutAt;

可以看到,AsyncTimeout 是一个单链表。

什么时候会使用到 AsyncTimeout 呢?

当我们对 Socket 进行包装时就引入了异步计时机制,之所以在对 Socket 写操作时采取异步超时,是由 Socket 自身的性质决定的,Socket 经常会阻塞自己,无法同步计时,只能采取异步方式。

该过程从 Okio 的sink方法看起:

1
2
3
4
5
6
7
8
9
10
public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
if (socket.getOutputStream() == null) throw new IOException("socket's output stream == null");
// 把 Socket 封装到 AsyncTimeout 中
AsyncTimeout timeout = timeout(socket);
// 得到 Sink
Sink sink = sink(socket.getOutputStream(), timeout);
// AsyncTimeout 对 Sink 进行包装
return timeout.sink(sink);
}

该方法中,首先把 Socket 封装到 AsyncTimeout 中,得到 Sink 后,AsyncTimeout 会对 Sink 进行包装。

AsyncTimeout#sink

包装过程调用的是 AsyncTimeout 的 sink 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final Sink sink(final Sink sink) {
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.write(source, toWrite);
byteCount -= toWrite;
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
}

// ...

};
}

这里只分析 write 过程,首先看 enter 方法:

AsyncTimeout#enter

1
2
3
4
5
6
7
8
9
10
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}

先判断了一下入队状态,真正的调用是在 scheduleTimeout 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
 private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {

// 还未开启 Watchdog 线程,先开启 Watchdog 线程
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}

// 设置超时时间
long now = System.nanoTime();
if (timeoutNanos != 0 && hasDeadline) {
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime();
} else {
throw new AssertionError();
}

// 当前 AsyncTimeout 的剩余时间
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
// 找到插入位置(如果某节点的下一个节点为 null或下一节点的剩余时间更长,就插入到该节点后面)
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
// 如果插入到了最前面
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}

该方法中,如果还未开启 Watchdog 线程,先开启 Watchdog 线程。然后设定超时时间以及将当前 AsyncTimeout 插入到链表的合适位置。

先看一下 Watchdog 线程:

AsyncTimeout#Watchdog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final class Watchdog extends Thread {

public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
// 得到超时的节点
timedOut = awaitTimeout();

// 未找到超时节点,但链表还有节点,再次尝试
if (timedOut == null) continue;

// 链表没有节点了,将 head 置空,并退出线程
if (timedOut == head) {
head = null;
return;
}
}

// 回调,外部重新改方法进行超时处理,例如关闭 Socket
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}

可以看到,真正进行超时判断的是 awaitTimeout 方法:

AsyncTimeout#awaitTimeout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {

// 拿到下一节点
AsyncTimeout node = head.next;

if (node == null) {
long startNanos = System.nanoTime();
// 等待一段时间
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
// 经过一段时间后还没有节点,就会返回 head,否则返回 null
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}

// 该节点剩余的时间
long waitNanos = node.remainingNanos(System.nanoTime());

// 如果该节点还未超时,继续等待,等待时间结束后返回 null
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;
}

// 移除并返回超时的节点
head.next = node.next;
node.next = null;
return node;
}

该方法主要是为了得到超时的节点,并将超时的节点移除出链表。

下面回到 sink,继续看 write 方法,在调用完 enter 方法后,会调用 Buffer 的 write 方法进行写入,无论是写入过程抛出了异常,还是最后正常结束,都会在 finally 块中调用 exit(boolean) 方法:

AsyncTimeout#exit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final void exit(boolean throwOnTimeout) throws IOException {
// 是否超时
boolean timedOut = exit();
if (timedOut && throwOnTimeout) throw newTimeoutException(null);
}

public final boolean exit() {
if (!inQueue) return false;
inQueue = false;
return cancelScheduledTimeout(this);
}

/** Returns true if the timeout occurred. */
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
// 从链表找到该节点并移除,能找到说明未超时
for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
if (prev.next == node) {
prev.next = node.next;
node.next = null;
return false;
}
}

// 未找到该节点,说明发生了超时,在 Watchdog 线程中删除了该节点
return true;
}

在该方法中,如果发现超时的话,就会抛出异常,没有超时的话,就将 AsyncTimeout 移除出单链表。

小结

AsyncTimeout 用于异步计时,采取异步是为了应对某些读写时会发生阻塞的情况,例如对 Socket 进行写操作时,Socket 经常会阻塞自己,无法同步计时,只能采取异步方式。

在开始读写操作前,会给 AsyncTimeout 设置超时时间,并将其加入到一个单链表中,单链表中的节点按照剩余时间由短到长排列。在另一个线程里,会对这条链表进行监控,找到超时的节点,找到后回调给外界,外界就可以进行相应的回收处理,例如关闭 Socket。在读写操作完成后,如果没有发生超时,就将该 AsyncTimeout 从单链表中删除;如果发生超时,就会抛出异常。

okio 的优点

  1. 使用方便
  • 不再区分字节流和字符流,只有 Source 和 Sink,分别对应输入流和输出流。
  • Buffer 实现了 BufferedSource 和 BufferedSink,同时具有读和写的功能,提供了读写所需的一切 API。
  • 提供了 ByteString,可以很方便地对二进制数据进行各种操作和进行各种值的转换。
  1. 速度快
  • okio 对 Buffer 存储的数据进行了分块处理,每个 Segment 用一个双向循环链表连接起来,这样在进行 IO 操作时就可以以块为单位进行操作,提高吞吐量。
  • 由于 Buffer 是以 Segment 链表来管理元素,所以在 Buffer 间要转移元素的时候,并不用进行数组元素的拷贝,只要改变链表头指针的所有者即可。
  • ByteString 会保留一份对原来 String 的引用,这样当你把 UTF-8 的 String 转换为 ByteString 后,下次再要解码出 String 时就直接返回之前的 String,速度非常快。
  1. 提供超时机制
  • 内部会根据不同情况进行同步计时或异步计时,超时后会抛出异常,不会阻塞线程。而且不仅在 IO 操作有超时判断,在 flush、close 等方法也有超时判断。
  1. 内存复用
  • okio 有一个 SegmentPool,可以用来回收和获取空闲 Segment。在高频率通信时,可以有效避免频繁的 GC。

参考

-------------    本文到此结束  感谢您的阅读    -------------
0%