简介
okio 补充了 java.io 和 java.nio 的内容,使得数据访问、存储和处理更加便捷。本文将简单介绍一下 okio 的使用以及基于 okio-1.17.0 版本,对 okio 的源码进行分析,最后总结一下 okio 的优点。
一些重要的类
ByteString 和 Buffer
ByteString 和 Buffer 是 okio 的两个核心基础类
ByteString 是一个不可变的字节序列。String 提供了对于字符数据的各种操作,但对于二进制数据,却没有这样的存在。ByteString 的出现填补了这个空缺,它提供了对二进制数据的各种操作,例如提取子串、判等、获取位置等,也能将数据编解码为十六进制、base64 和 UTF-8 格式。
Buffer 是一个可变的字节序列,就像 ArrayList 一样,不需要考虑它的容量。在写入和读取元素的时候,就像队列一样,从它的头部读取数据,尾部写入数据。Buffer 实现了 BufferSource 和 BufferSink,提供了访问数据缓冲区所需的一切 API。
其他:
- 把一个 String 编码为 utf8 时,会引用原 String,后面解码时就可以直接返回了
Source 和 Sink
Source 和 Sink 类似于 InputStream 和 OutputStream,都是 IO 操作的顶级接口。其中 Source 为输入流、Sink 为输出流。它们还有一些新特性:
- 提供超时机制
- API 更加简洁,易于实现,例如 Source 仅仅声明了 read, close, timeout 方法
- 为了更方便地处理数据,还提供了 BufferedSource 和 BufferedSink 接口
- 不再区分字节流和字符流,它们都是数据,可以按照各种类型进行读写
- 便于测试,Buffer 同时实现了 BufferedSource 和 BufferedSink,便于编写测试代码
RealBufferedSource 和 RealBufferedSink
RealBufferedSource 和 RealBufferedSink 分别实现了 BufferedSource 和 BufferedSink 接口。
它们都有一个 Buffer 成员变量,在执行方法时,它们并没有做什么,实际调用的是 Buffer 的该方法。这体现了装饰模式。所以这也说明了 BufferedSource 和 BufferedSink 接口的真正实现都在 Buffer 中。
为什么不各自实现呢?
因为 BufferedSource 和 BufferedSink 的高效实现有很大部分是共通的,为了避免同样的逻辑重复两遍,就把读写操作都在 Buffer 中实现,这样逻辑更加紧凑,也方便修改。
另一方面,Buffer 的实现可以满足“两用数据缓冲区”的需求,即可以从头部读取数据,也可以向尾部添加数据。对于单独的读和写,也提供了两个委托类:RealBufferedSource 和 RealBufferedSink。
简单使用
读取文本
以官方 demo 为例,这里读取的是一个 File 文件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public void readLines(File file) throws IOException {
try (Source fileSource = Okio.source(file);
BufferedSource bufferedSource = Okio.buffer(fileSource)) {
while (true) {
String line = bufferedSource.readUtf8Line();
if (line == null) break;
if (line.contains("square")) {
System.out.println(line);
}
}
}
}
可以看到,主要步骤如下:1
2
3
4
5
6
7// 1. 构建 Source
Source fileSource = Okio.source(file);
// 2. 构建 BufferedSource(RealBufferedSource)
BufferedSource bufferedSource = Okio.buffer(fileSource);
// 3. 从 BufferedSource 中读取文本
// 在 dmeo 中是按 utf8 格式逐行读取
String line = bufferedSource.readUtf8Line();
- 根据 InputStream、File、Path 或 Socket(最终都转化为 InputStream)构建 Source
- 根据 Source 构建 BufferedSource(RealBufferedSource)
- 从 BufferedSource 中读取文本,demo 中是按 utf8 格式逐行读取。如此之外,还可以按字节读取。如果文件是自动定义的特殊结构,还可以使用 readInt、readLong 等方法。
写入文本
继续看官方 demo:1
2
3
4
5
6
7
8
9
10
11
12public void writeEnv(File file) throws IOException {
try (Sink fileSink = Okio.sink(file);
BufferedSink bufferedSink = Okio.buffer(fileSink)) {
for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
bufferedSink.writeUtf8(entry.getKey());
bufferedSink.writeUtf8("=");
bufferedSink.writeUtf8(entry.getValue());
bufferedSink.writeUtf8("\n");
}
}
可以看到,主要步骤和读取文本时类似:1
2
3
4
5
6// 1. 构建 Sink
Sink fileSink = Okio.sink(file);
// 2. 构建 BufferedSink
BufferedSink bufferedSink = Okio.buffer(fileSink);
// 3. 向 BufferedSink 写入文本
bufferedSink.writeUtf8("要写入的文本");
- 根据 OutputStream、File、Path 或 Socket(最终都转化为 OutputStream)构建 Sink
- 根据 Sink 构建 BufferedSink(RealBufferedSink)
- 向 BufferedSink 写入文本,除了写入 Stirng,还可以写入 byte[]、int、long。
源码分析
Segment
Segment 的字面意思是片段。okio 将 Buffer 分割成一个个 Segment,Segment 内部维护着固定长度的 byte 数组,数据存储在 byte 数组中,同时 Segment 拥有前面节点和后面节点的引用,是一个双向链表。
成员变量
1 | // 每个 Segment 所含的字节数量 |
构造方法
1 | Segment() { |
有两个构造方法,其中无参构造方法 owner 为 true、shared 为 false,说明数据的拥有者只有自己,没有被共享。
另一构造方法,则可以根据需要,返回相应的 Segment:
1 | /** |
1 | /** Returns a new segment that its own private copy of the underlying byte array. */ |
下面来看下 Segment 的几个方法:
pop
1 | public final Segment pop() { |
pop 方法将自己移除出链表,并将自己的前后两个节点连接起来,最后返回下一个 Segment。
既然有 pop 方法,那当然还有 push 方法:
push
1 | public final Segment push(Segment segment) { |
push 方法传入一个 Segment,将该 Segment 插入到自己后面并返回插入的 Segment。
writeTo
1 | public final void writeTo(Segment sink, int byteCount) { |
该方法从自己的 pos 位置开始,读取一定数量的字节并写入到另一 Segment 中。这是一种数据的转移,主要用于 Segment 的压缩。
compact(压缩机制)
因为每个 Segment 的存储的数据长度是固定的,如果经过一段时间后,每个 Segment 的数据长度不一,可能有些 Segment 只有很小的数据。这时就可以通过 Segment 的压缩机制进行优化,该机制通过 compact 方法实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 public final void compact() {
// 上一节点就是自己,说明只有一个节点,无法压缩
if (prev == this) throw new IllegalStateException();
// 如果上一节点的数据不只自己拥有,那么不能压缩
if (!prev.owner) return; // Cannot compact: prev isn't writable.
// 当前自己拥有的数据量
int byteCount = limit - pos;
// 计算上一节点的可写入数据量
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
// 上一节点不足以容纳自己的数据,无法压缩
if (byteCount > availableByteCount) return;
// 将自己的数据写入到上一节点
writeTo(prev, byteCount);
// 将自己从链表中删除
pop();
// 回收
SegmentPool.recycle(this);
}
在进行压缩时,如果上一个 Segment 可以写入数据并足以容纳自己的数据,就将自己的数据写入到上一节点,然后将自己从链表中删除并回收。
split(共享机制)
split 方法可以从某个 Segment 中分割出一个新的 Segment。其中新 Segment 包含 [pos, pos + byteCount) 的数据,而原 Segment 只剩下 [pos + byteCount, limit) 的数据。最后新 Segment 会插入到原 Segment 的前面。具体看下该方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 public final Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
// 只有当数据量比较大时,才共享当前 Segment
if (byteCount >= SHARE_MINIMUM) {
prefix = sharedCopy();
}
// 数据量较少时不共享,将数据复制到新的 Segment
else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
// 更新新 Segment 的可写位置
prefix.limit = prefix.pos + byteCount;
// 更新本 Segment 的下一读取位置
pos += byteCount;
// 将新 Segment 插入到自己前面
prev.push(prefix);
// 返回新的 Segment
return prefix;
}
在进行分割时,如果要分割的数据量比较巨大,那么将进行数据共享而不是数据复制,也就是说新 Segment 和原 Segment 引用同一个 byte 数组。而数据量比较小时,不会进行共享,因为太多的共享会导致 Segment 链变长,这时会进行复制。
如果是进行复制的话,新 Segment 的实例是通过 SegmentPool.take() 获得的。下面就分析一些 SegmentPool:
SegmentPool
SegmentPool 主要复制 Segment 的回收和闲置 Segment 的管理。Buffer 使用的 Segment 是从 SegmentPool 中获取的,这样可以避免频繁地创建和销毁 Segment。
这个类比较简单,只有三个成员变量,分别如下:1
2
3
4
5
6
7
8
9// SegmentPool 的最大存储字节数
// 一个 Segment 存储 8 * 1024 个字节,所以 SegmentPool 只能存 8 个 Segment
static final long MAX_SIZE = 64 * 1024; // 64 KiB.
// 一个存储回收 Segment 的单向链表
static Segment next;
// 当前所有 Segment 存储的总字节数
static long byteCount;
SegmentPool 只有两个方法,一个是回收 Segment,一个是获取 Segment。
take
take 方法用于获取 Segment:1
2
3
4
5
6
7
8
9
10
11
12static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment();
}
如果当前池里没有 Segment 就新建一个 Segment 返回,否则就从单向链表中取一个 Segment 返回。
recycle
recycle 方法用于回收 Segment:1
2
3
4
5
6
7
8
9
10
11
12 static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
// 不能回收共享的 Segment
if (segment.shared) return;
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
共享状态的 Segment 不能回收,如果池没满就将回收该 Segment,将其加入单向链表中。
Buffer
Buffer 存储的是可变字节序列,内部使用 Segment 来存储元素,Segment 是一个双向循环链表,它的内部有一个 byte 数组用于存储元素。
Buffer 使用 Segment 的好处是:当元素要从一个 Buffer 移到另一个 Buffer 的时候,并不用进行数组元素的拷贝,只要改变 Segment 的所有者即可。
Buffer 既可以读,也可以写,下面分别调两个方法看一下读写的过程。
首先看 readByte 方法,该方法读取一个字节:
readByte
1 |
|
可以看到,读取元素的时候是从头开始读的。如果读取元素后,头 Segment 没有元素了,就从链表中删除并回收该 Segment。
继续看一个写的方法,这里看 writeByte:
writeByte
1 |
|
1 | Segment writableSegment(int minimumCapacity) { |
可以看到,写入时是从尾部写入的,如果此时头结点为空,就会新建一个双向循环链表。如果尾 Segment 容量不足或不能写入数据,就会在尾部添加一个 Segment。
超时机制
Timeout
以输出流 Sink 为例,当我们用下面的方法包装输出流的时候:1
2
3public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
真正调用的是带两个参数的 sink 方法,第二个参数就是 TimeOut:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
public void write(Buffer source, long byteCount) throws IOException {
while (byteCount > 0) {
// 进行了超时判断
timeout.throwIfReached();
// ...
}
}
public Timeout timeout() {
return timeout;
}
// ...
};
}
在写入数据的时候,通过调用 Timeout 的 throwIfReached 方法进行了超时判断,该方法如下:1
2
3
4
5
6
7
8
9
10
11
12 public void throwIfReached() throws IOException {
// 线程被 interrupted
if (Thread.interrupted()) {
Thread.currentThread().interrupt(); // Retain interrupted status.
throw new InterruptedIOException("interrupted");
}
// 到达超时时间
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
除了 Timeout 外,还有一个 AsyncTimeout。
AsyncTimeout
AsyncTimeout 继承于 Timeout,TimeOut 用于同步计时,即在同一个线程中执行 IO 操作和计时,而 AsyncTimeout 提供了异步计时的功能。
首先看一下 AsyncTimeOut 的成员变量:1
2
3
4
5
6
7
8
9
10
11
12
13// 一次最多可以写入 64K 数据,超过该容量可能会导致在慢连接中超时,故做出限制
private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;
// AsyncTimeout 单链表的头结点
static AsyncTimeout head;
// 下一节点
private AsyncTimeout next;
// 判断当前节点是否已经入队
private boolean inQueue;
// 超时时间
private long timeoutAt;
可以看到,AsyncTimeout 是一个单链表。
什么时候会使用到 AsyncTimeout 呢?
当我们对 Socket 进行包装时就引入了异步计时机制,之所以在对 Socket 写操作时采取异步超时,是由 Socket 自身的性质决定的,Socket 经常会阻塞自己,无法同步计时,只能采取异步方式。
该过程从 Okio 的sink方法看起:1
2
3
4
5
6
7
8
9
10public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
if (socket.getOutputStream() == null) throw new IOException("socket's output stream == null");
// 把 Socket 封装到 AsyncTimeout 中
AsyncTimeout timeout = timeout(socket);
// 得到 Sink
Sink sink = sink(socket.getOutputStream(), timeout);
// AsyncTimeout 对 Sink 进行包装
return timeout.sink(sink);
}
该方法中,首先把 Socket 封装到 AsyncTimeout 中,得到 Sink 后,AsyncTimeout 会对 Sink 进行包装。
AsyncTimeout#sink
包装过程调用的是 AsyncTimeout 的 sink 方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public final Sink sink(final Sink sink) {
return new Sink() {
public void write(Buffer source, long byteCount) throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.write(source, toWrite);
byteCount -= toWrite;
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
}
// ...
};
}
这里只分析 write 过程,首先看 enter 方法:
AsyncTimeout#enter
1 | public final void enter() { |
先判断了一下入队状态,真正的调用是在 scheduleTimeout 方法中:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// 还未开启 Watchdog 线程,先开启 Watchdog 线程
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
// 设置超时时间
long now = System.nanoTime();
if (timeoutNanos != 0 && hasDeadline) {
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime();
} else {
throw new AssertionError();
}
// 当前 AsyncTimeout 的剩余时间
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
// 找到插入位置(如果某节点的下一个节点为 null或下一节点的剩余时间更长,就插入到该节点后面)
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
// 如果插入到了最前面
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}
该方法中,如果还未开启 Watchdog 线程,先开启 Watchdog 线程。然后设定超时时间以及将当前 AsyncTimeout 插入到链表的合适位置。
先看一下 Watchdog 线程:
AsyncTimeout#Watchdog
1 | private static final class Watchdog extends Thread { |
可以看到,真正进行超时判断的是 awaitTimeout 方法:
AsyncTimeout#awaitTimeout
1 | static AsyncTimeout awaitTimeout() throws InterruptedException { |
该方法主要是为了得到超时的节点,并将超时的节点移除出链表。
下面回到 sink,继续看 write 方法,在调用完 enter 方法后,会调用 Buffer 的 write 方法进行写入,无论是写入过程抛出了异常,还是最后正常结束,都会在 finally 块中调用 exit(boolean) 方法:
AsyncTimeout#exit
1 | final void exit(boolean throwOnTimeout) throws IOException { |
在该方法中,如果发现超时的话,就会抛出异常,没有超时的话,就将 AsyncTimeout 移除出单链表。
小结
AsyncTimeout 用于异步计时,采取异步是为了应对某些读写时会发生阻塞的情况,例如对 Socket 进行写操作时,Socket 经常会阻塞自己,无法同步计时,只能采取异步方式。
在开始读写操作前,会给 AsyncTimeout 设置超时时间,并将其加入到一个单链表中,单链表中的节点按照剩余时间由短到长排列。在另一个线程里,会对这条链表进行监控,找到超时的节点,找到后回调给外界,外界就可以进行相应的回收处理,例如关闭 Socket。在读写操作完成后,如果没有发生超时,就将该 AsyncTimeout 从单链表中删除;如果发生超时,就会抛出异常。
okio 的优点
- 使用方便
- 不再区分字节流和字符流,只有 Source 和 Sink,分别对应输入流和输出流。
- Buffer 实现了 BufferedSource 和 BufferedSink,同时具有读和写的功能,提供了读写所需的一切 API。
- 提供了 ByteString,可以很方便地对二进制数据进行各种操作和进行各种值的转换。
- 速度快
- okio 对 Buffer 存储的数据进行了分块处理,每个 Segment 用一个双向循环链表连接起来,这样在进行 IO 操作时就可以以块为单位进行操作,提高吞吐量。
- 由于 Buffer 是以 Segment 链表来管理元素,所以在 Buffer 间要转移元素的时候,并不用进行数组元素的拷贝,只要改变链表头指针的所有者即可。
- ByteString 会保留一份对原来 String 的引用,这样当你把 UTF-8 的 String 转换为 ByteString 后,下次再要解码出 String 时就直接返回之前的 String,速度非常快。
- 提供超时机制
- 内部会根据不同情况进行同步计时或异步计时,超时后会抛出异常,不会阻塞线程。而且不仅在 IO 操作有超时判断,在 flush、close 等方法也有超时判断。
- 内存复用
- okio 有一个 SegmentPool,可以用来回收和获取空闲 Segment。在高频率通信时,可以有效避免频繁的 GC。