1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.RecvByteBufAllocator;
23 import io.netty.channel.ServerChannel;
24
25 import java.io.IOException;
26 import java.net.PortUnreachableException;
27 import java.nio.channels.SelectableChannel;
28 import java.util.ArrayList;
29 import java.util.List;
30
31
32
33
34 public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
35 boolean inputShutdown;
36
37
38
39
40 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
41 super(parent, ch, readInterestOp);
42 }
43
44 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, NioIoOps readOps) {
45 super(parent, ch, readOps);
46 }
47
48 @Override
49 protected AbstractNioUnsafe newUnsafe() {
50 return new NioMessageUnsafe();
51 }
52
53 @Override
54 protected void doBeginRead() throws Exception {
55 if (inputShutdown) {
56 return;
57 }
58 super.doBeginRead();
59 }
60
61 protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
62 return allocHandle.continueReading();
63 }
64
65 private final class NioMessageUnsafe extends AbstractNioUnsafe {
66
67 private final List<Object> readBuf = new ArrayList<Object>();
68
69 @Override
70 public void read() {
71 assert eventLoop().inEventLoop();
72 final ChannelConfig config = config();
73 final ChannelPipeline pipeline = pipeline();
74 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
75 allocHandle.reset(config);
76
77 boolean closed = false;
78 Throwable exception = null;
79 try {
80 try {
81 do {
82 int localRead = doReadMessages(readBuf);
83 if (localRead == 0) {
84 break;
85 }
86 if (localRead < 0) {
87 closed = true;
88 break;
89 }
90
91 allocHandle.incMessagesRead(localRead);
92 } while (continueReading(allocHandle));
93 } catch (Throwable t) {
94 exception = t;
95 }
96
97 int size = readBuf.size();
98 for (int i = 0; i < size; i ++) {
99 readPending = false;
100 pipeline.fireChannelRead(readBuf.get(i));
101 }
102 readBuf.clear();
103 allocHandle.readComplete();
104 pipeline.fireChannelReadComplete();
105
106 if (exception != null) {
107 closed = closeOnReadError(exception);
108
109 pipeline.fireExceptionCaught(exception);
110 }
111
112 if (closed) {
113 inputShutdown = true;
114 if (isOpen()) {
115 close(voidPromise());
116 }
117 }
118 } finally {
119
120
121
122
123
124
125 if (!readPending && !config.isAutoRead()) {
126 removeReadOp();
127 }
128 }
129 }
130 }
131
132 @Override
133 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
134 int maxMessagesPerWrite = maxMessagesPerWrite();
135 while (maxMessagesPerWrite > 0) {
136 Object msg = in.current();
137 if (msg == null) {
138 break;
139 }
140 try {
141 boolean done = false;
142 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
143 if (doWriteMessage(msg, in)) {
144 done = true;
145 break;
146 }
147 }
148
149 if (done) {
150 maxMessagesPerWrite--;
151 in.remove();
152 } else {
153 break;
154 }
155 } catch (Exception e) {
156 if (continueOnWriteError()) {
157 maxMessagesPerWrite--;
158 in.remove(e);
159 } else {
160 throw e;
161 }
162 }
163 }
164 if (in.isEmpty()) {
165
166 removeAndSubmit(NioIoOps.WRITE);
167 } else {
168
169 addAndSubmit(NioIoOps.WRITE);
170 }
171 }
172
173
174
175
176 protected boolean continueOnWriteError() {
177 return false;
178 }
179
180 protected boolean closeOnReadError(Throwable cause) {
181 if (!isActive()) {
182
183 return true;
184 }
185 if (cause instanceof PortUnreachableException) {
186 return false;
187 }
188 if (cause instanceof IOException) {
189
190
191 return !(this instanceof ServerChannel);
192 }
193 return true;
194 }
195
196
197
198
199 protected abstract int doReadMessages(List<Object> buf) throws Exception;
200
201
202
203
204
205
206 protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
207 }