1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.oio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelPipeline;
21 import io.netty.channel.RecvByteBufAllocator;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27
28
29
30
31
32 @Deprecated
33 public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
34
35 private final List<Object> readBuf = new ArrayList<Object>();
36
37 protected AbstractOioMessageChannel(Channel parent) {
38 super(parent);
39 }
40
41 @Override
42 protected void doRead() {
43 if (!readPending) {
44
45
46 return;
47 }
48
49
50 readPending = false;
51
52 final ChannelConfig config = config();
53 final ChannelPipeline pipeline = pipeline();
54 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
55 allocHandle.reset(config);
56
57 boolean closed = false;
58 Throwable exception = null;
59 try {
60 do {
61
62 int localRead = doReadMessages(readBuf);
63 if (localRead == 0) {
64 break;
65 }
66 if (localRead < 0) {
67 closed = true;
68 break;
69 }
70
71 allocHandle.incMessagesRead(localRead);
72 } while (allocHandle.continueReading());
73 } catch (Throwable t) {
74 exception = t;
75 }
76
77 boolean readData = false;
78 int size = readBuf.size();
79 if (size > 0) {
80 readData = true;
81 for (int i = 0; i < size; i++) {
82 readPending = false;
83 pipeline.fireChannelRead(readBuf.get(i));
84 }
85 readBuf.clear();
86 allocHandle.readComplete();
87 pipeline.fireChannelReadComplete();
88 }
89
90 if (exception != null) {
91 if (exception instanceof IOException) {
92 closed = true;
93 }
94
95 pipeline.fireExceptionCaught(exception);
96 }
97
98 if (closed) {
99 if (isOpen()) {
100 unsafe().close(unsafe().voidPromise());
101 }
102 } else if (readPending || config.isAutoRead() || !readData && isActive()) {
103
104
105 read();
106 }
107 }
108
109
110
111
112 protected abstract int doReadMessages(List<Object> msgs) throws Exception;
113 }