1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.FileRegion;
27 import io.netty.channel.IoRegistration;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.internal.ChannelUtils;
30 import io.netty.channel.socket.ChannelInputShutdownEvent;
31 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
32 import io.netty.channel.socket.SocketChannelConfig;
33 import io.netty.util.LeakPresenceDetector;
34 import io.netty.util.internal.StringUtil;
35
36 import java.io.IOException;
37 import java.nio.channels.SelectableChannel;
38 import java.nio.channels.SelectionKey;
39
40 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
41 import static io.netty.util.internal.StringUtil.className;
42
43
44
45
46 public abstract class AbstractNioByteChannel extends AbstractNioChannel {
47 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
48 private static final String EXPECTED_TYPES =
49 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
50 StringUtil.simpleClassName(FileRegion.class) + ')';
51
52 private final Runnable flushTask = new Runnable() {
53 @Override
54 public void run() {
55
56
57 ((AbstractNioUnsafe) unsafe()).flush0();
58 }
59 };
60 private boolean inputClosedSeenErrorOnRead;
61
62
63
64
65
66
67
68 protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
69 super(parent, ch, SelectionKey.OP_READ);
70 }
71
72
73
74
75 protected abstract ChannelFuture shutdownInput();
76
77 protected boolean isInputShutdown0() {
78 return false;
79 }
80
81 @Override
82 protected AbstractNioUnsafe newUnsafe() {
83 return new NioByteUnsafe();
84 }
85
86 @Override
87 public ChannelMetadata metadata() {
88 return METADATA;
89 }
90
91 final boolean shouldBreakReadReady(ChannelConfig config) {
92 return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
93 }
94
95 private static boolean isAllowHalfClosure(ChannelConfig config) {
96 return config instanceof SocketChannelConfig &&
97 ((SocketChannelConfig) config).isAllowHalfClosure();
98 }
99
100 protected class NioByteUnsafe extends AbstractNioUnsafe {
101
102 private void closeOnRead(ChannelPipeline pipeline) {
103 if (!isInputShutdown0()) {
104 if (isAllowHalfClosure(config())) {
105 shutdownInput();
106 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
107 } else {
108 close(voidPromise());
109 }
110 } else if (!inputClosedSeenErrorOnRead) {
111 inputClosedSeenErrorOnRead = true;
112 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
113 }
114 }
115
116 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
117 RecvByteBufAllocator.Handle allocHandle) {
118 if (byteBuf != null) {
119 if (byteBuf.isReadable()) {
120 readPending = false;
121 try {
122 pipeline.fireChannelRead(byteBuf);
123 } catch (Exception e) {
124 cause.addSuppressed(e);
125 }
126 } else {
127 byteBuf.release();
128 }
129 }
130 allocHandle.readComplete();
131 pipeline.fireChannelReadComplete();
132 pipeline.fireExceptionCaught(cause);
133
134
135
136 if (close ||
137 cause instanceof OutOfMemoryError ||
138 cause instanceof LeakPresenceDetector.AllocationProhibitedException ||
139 cause instanceof IOException) {
140 closeOnRead(pipeline);
141 }
142 }
143
144 @Override
145 public final void read() {
146 final ChannelConfig config = config();
147 if (shouldBreakReadReady(config)) {
148 clearReadPending();
149 return;
150 }
151 final ChannelPipeline pipeline = pipeline();
152 final ByteBufAllocator allocator = config.getAllocator();
153 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
154 allocHandle.reset(config);
155
156 ByteBuf byteBuf = null;
157 boolean close = false;
158 try {
159 do {
160 byteBuf = allocHandle.allocate(allocator);
161 allocHandle.lastBytesRead(doReadBytes(byteBuf));
162 if (allocHandle.lastBytesRead() <= 0) {
163
164 byteBuf.release();
165 byteBuf = null;
166 close = allocHandle.lastBytesRead() < 0;
167 if (close) {
168
169 readPending = false;
170 }
171 break;
172 }
173
174 allocHandle.incMessagesRead(1);
175 readPending = false;
176 pipeline.fireChannelRead(byteBuf);
177 byteBuf = null;
178 } while (allocHandle.continueReading());
179
180 allocHandle.readComplete();
181 pipeline.fireChannelReadComplete();
182
183 if (close) {
184 closeOnRead(pipeline);
185 }
186 } catch (Throwable t) {
187 handleReadException(pipeline, byteBuf, t, close, allocHandle);
188 } finally {
189
190
191
192
193
194
195 if (!readPending && !config.isAutoRead()) {
196 removeReadOp();
197 }
198 }
199 }
200 }
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216 protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
217 Object msg = in.current();
218 if (msg == null) {
219
220 return 0;
221 }
222 return doWriteInternal(in, in.current());
223 }
224
225 private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
226 if (msg instanceof ByteBuf) {
227 ByteBuf buf = (ByteBuf) msg;
228 if (!buf.isReadable()) {
229 in.remove();
230 return 0;
231 }
232
233 final int localFlushedAmount = doWriteBytes(buf);
234 if (localFlushedAmount > 0) {
235 in.progress(localFlushedAmount);
236 if (!buf.isReadable()) {
237 in.remove();
238 }
239 return 1;
240 }
241 } else if (msg instanceof FileRegion) {
242 FileRegion region = (FileRegion) msg;
243 if (region.transferred() >= region.count()) {
244 in.remove();
245 return 0;
246 }
247
248 long localFlushedAmount = doWriteFileRegion(region);
249 if (localFlushedAmount > 0) {
250 in.progress(localFlushedAmount);
251 if (region.transferred() >= region.count()) {
252 in.remove();
253 }
254 return 1;
255 }
256 } else {
257
258 throw new Error("Unexpected message type: " + className(msg));
259 }
260 return WRITE_STATUS_SNDBUF_FULL;
261 }
262
263 @Override
264 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
265 int writeSpinCount = config().getWriteSpinCount();
266 do {
267 Object msg = in.current();
268 if (msg == null) {
269
270 clearOpWrite();
271
272 return;
273 }
274 writeSpinCount -= doWriteInternal(in, msg);
275 } while (writeSpinCount > 0);
276
277 incompleteWrite(writeSpinCount < 0);
278 }
279
280 @Override
281 protected final Object filterOutboundMessage(Object msg) {
282 if (msg instanceof ByteBuf) {
283 ByteBuf buf = (ByteBuf) msg;
284 if (buf.isDirect()) {
285 return msg;
286 }
287
288 return newDirectBuffer(buf);
289 }
290
291 if (msg instanceof FileRegion) {
292 return msg;
293 }
294
295 throw new UnsupportedOperationException(
296 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
297 }
298
299 protected final void incompleteWrite(boolean setOpWrite) {
300
301 if (setOpWrite) {
302 setOpWrite();
303 } else {
304
305
306
307
308 clearOpWrite();
309
310
311 eventLoop().execute(flushTask);
312 }
313 }
314
315
316
317
318
319
320
321 protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
322
323
324
325
326 protected abstract int doReadBytes(ByteBuf buf) throws Exception;
327
328
329
330
331
332
333 protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
334
335 protected final void setOpWrite() {
336 final IoRegistration registration = registration();
337
338
339
340 if (!registration.isValid()) {
341 return;
342 }
343
344 addAndSubmit(NioIoOps.WRITE);
345 }
346
347 protected final void clearOpWrite() {
348 final IoRegistration registration = registration();
349
350
351
352 if (!registration.isValid()) {
353 return;
354 }
355 removeAndSubmit(NioIoOps.WRITE);
356 }
357 }