1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.oio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelPipeline;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25
26
27
28
29 public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
30
31 private final List<Object> readBuf = new ArrayList<Object>();
32
33 protected AbstractOioMessageChannel(Channel parent) {
34 super(parent);
35 }
36
37 @Override
38 protected void doRead() {
39 final ChannelConfig config = config();
40 final ChannelPipeline pipeline = pipeline();
41 boolean closed = false;
42 final int maxMessagesPerRead = config.getMaxMessagesPerRead();
43
44 Throwable exception = null;
45 int localRead = 0;
46 int totalRead = 0;
47 try {
48 for (;;) {
49
50 localRead = doReadMessages(readBuf);
51 if (localRead == 0) {
52 break;
53 }
54 if (localRead < 0) {
55 closed = true;
56 break;
57 }
58
59
60 int size = readBuf.size();
61 for (int i = 0; i < size; i ++) {
62 pipeline.fireChannelRead(readBuf.get(i));
63 }
64 readBuf.clear();
65
66
67
68 totalRead += localRead;
69 if (totalRead >= maxMessagesPerRead || !config.isAutoRead()) {
70 break;
71 }
72 }
73 } catch (Throwable t) {
74 exception = t;
75 }
76
77 pipeline.fireChannelReadComplete();
78
79 if (exception != null) {
80 if (exception instanceof IOException) {
81 closed = true;
82 }
83
84 pipeline().fireExceptionCaught(exception);
85 }
86
87 if (closed) {
88 if (isOpen()) {
89 unsafe().close(unsafe().voidPromise());
90 }
91 } else if (localRead == 0 && isActive()) {
92
93
94
95
96
97
98 read();
99 }
100 }
101
102
103
104
105 protected abstract int doReadMessages(List<Object> msgs) throws Exception;
106 }