1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.FileRegion;
27 import io.netty.channel.RecvByteBufAllocator;
28 import io.netty.channel.internal.ChannelUtils;
29 import io.netty.channel.socket.ChannelInputShutdownEvent;
30 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31 import io.netty.channel.socket.SocketChannelConfig;
32 import io.netty.util.internal.StringUtil;
33 import io.netty.util.internal.ThrowableUtil;
34
35 import java.io.IOException;
36 import java.nio.channels.SelectableChannel;
37 import java.nio.channels.SelectionKey;
38
39 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
40
41
42
43
44 public abstract class AbstractNioByteChannel extends AbstractNioChannel {
45 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
46 private static final String EXPECTED_TYPES =
47 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
48 StringUtil.simpleClassName(FileRegion.class) + ')';
49
50 private final Runnable flushTask = new Runnable() {
51 @Override
52 public void run() {
53
54
55 ((AbstractNioUnsafe) unsafe()).flush0();
56 }
57 };
58 private boolean inputClosedSeenErrorOnRead;
59
60
61
62
63
64
65
/**
 * Creates a new instance, forwarding both arguments to the superclass together with
 * {@link SelectionKey#OP_READ} as the third constructor argument.
 * NOTE(review): the third argument is presumably the interest op used for reading - confirm against
 * AbstractNioChannel.
 *
 * @param parent the parent {@link Channel} passed through to the superclass
 * @param ch     the underlying {@link SelectableChannel} passed through to the superclass
 */
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}
69
70
71
72
/**
 * Shuts down the input side of the channel.
 * Invoked by {@code NioByteUnsafe.closeOnRead} when a read signalled that the channel should be closed, the input
 * is not yet shut down and half-closure is allowed; the returned future is ignored at that call site.
 *
 * @return a {@link ChannelFuture} for the shutdown operation
 */
protected abstract ChannelFuture shutdownInput();
74
75 protected boolean isInputShutdown0() {
76 return false;
77 }
78
79 @Override
80 protected AbstractNioUnsafe newUnsafe() {
81 return new NioByteUnsafe();
82 }
83
84 @Override
85 public ChannelMetadata metadata() {
86 return METADATA;
87 }
88
89 final boolean shouldBreakReadReady(ChannelConfig config) {
90 return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
91 }
92
93 private static boolean isAllowHalfClosure(ChannelConfig config) {
94 return config instanceof SocketChannelConfig &&
95 ((SocketChannelConfig) config).isAllowHalfClosure();
96 }
97
98 protected class NioByteUnsafe extends AbstractNioUnsafe {
99
100 private void closeOnRead(ChannelPipeline pipeline) {
101 if (!isInputShutdown0()) {
102 if (isAllowHalfClosure(config())) {
103 shutdownInput();
104 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
105 } else {
106 close(voidPromise());
107 }
108 } else if (!inputClosedSeenErrorOnRead) {
109 inputClosedSeenErrorOnRead = true;
110 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
111 }
112 }
113
114 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
115 RecvByteBufAllocator.Handle allocHandle) {
116 if (byteBuf != null) {
117 if (byteBuf.isReadable()) {
118 readPending = false;
119 try {
120 pipeline.fireChannelRead(byteBuf);
121 } catch (Exception e) {
122 ThrowableUtil.addSuppressed(cause, e);
123 }
124 } else {
125 byteBuf.release();
126 }
127 }
128 allocHandle.readComplete();
129 pipeline.fireChannelReadComplete();
130 pipeline.fireExceptionCaught(cause);
131
132
133
134 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
135 closeOnRead(pipeline);
136 }
137 }
138
139 @Override
140 public final void read() {
141 final ChannelConfig config = config();
142 if (shouldBreakReadReady(config)) {
143 clearReadPending();
144 return;
145 }
146 final ChannelPipeline pipeline = pipeline();
147 final ByteBufAllocator allocator = config.getAllocator();
148 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
149 allocHandle.reset(config);
150
151 ByteBuf byteBuf = null;
152 boolean close = false;
153 try {
154 do {
155 byteBuf = allocHandle.allocate(allocator);
156 allocHandle.lastBytesRead(doReadBytes(byteBuf));
157 if (allocHandle.lastBytesRead() <= 0) {
158
159 byteBuf.release();
160 byteBuf = null;
161 close = allocHandle.lastBytesRead() < 0;
162 if (close) {
163
164 readPending = false;
165 }
166 break;
167 }
168
169 allocHandle.incMessagesRead(1);
170 readPending = false;
171 pipeline.fireChannelRead(byteBuf);
172 byteBuf = null;
173 } while (allocHandle.continueReading());
174
175 allocHandle.readComplete();
176 pipeline.fireChannelReadComplete();
177
178 if (close) {
179 closeOnRead(pipeline);
180 }
181 } catch (Throwable t) {
182 handleReadException(pipeline, byteBuf, t, close, allocHandle);
183 } finally {
184
185
186
187
188
189
190 if (!readPending && !config.isAutoRead()) {
191 removeReadOp();
192 }
193 }
194 }
195 }
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211 protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
212 Object msg = in.current();
213 if (msg == null) {
214
215 return 0;
216 }
217 return doWriteInternal(in, in.current());
218 }
219
220 private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
221 if (msg instanceof ByteBuf) {
222 ByteBuf buf = (ByteBuf) msg;
223 if (!buf.isReadable()) {
224 in.remove();
225 return 0;
226 }
227
228 final int localFlushedAmount = doWriteBytes(buf);
229 if (localFlushedAmount > 0) {
230 in.progress(localFlushedAmount);
231 if (!buf.isReadable()) {
232 in.remove();
233 }
234 return 1;
235 }
236 } else if (msg instanceof FileRegion) {
237 FileRegion region = (FileRegion) msg;
238 if (region.transferred() >= region.count()) {
239 in.remove();
240 return 0;
241 }
242
243 long localFlushedAmount = doWriteFileRegion(region);
244 if (localFlushedAmount > 0) {
245 in.progress(localFlushedAmount);
246 if (region.transferred() >= region.count()) {
247 in.remove();
248 }
249 return 1;
250 }
251 } else {
252
253 throw new Error();
254 }
255 return WRITE_STATUS_SNDBUF_FULL;
256 }
257
258 @Override
259 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
260 int writeSpinCount = config().getWriteSpinCount();
261 do {
262 Object msg = in.current();
263 if (msg == null) {
264
265 clearOpWrite();
266
267 return;
268 }
269 writeSpinCount -= doWriteInternal(in, msg);
270 } while (writeSpinCount > 0);
271
272 incompleteWrite(writeSpinCount < 0);
273 }
274
275 @Override
276 protected final Object filterOutboundMessage(Object msg) {
277 if (msg instanceof ByteBuf) {
278 ByteBuf buf = (ByteBuf) msg;
279 if (buf.isDirect()) {
280 return msg;
281 }
282
283 return newDirectBuffer(buf);
284 }
285
286 if (msg instanceof FileRegion) {
287 return msg;
288 }
289
290 throw new UnsupportedOperationException(
291 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
292 }
293
294 protected final void incompleteWrite(boolean setOpWrite) {
295
296 if (setOpWrite) {
297 setOpWrite();
298 } else {
299
300
301
302
303 clearOpWrite();
304
305
306 eventLoop().execute(flushTask);
307 }
308 }
309
310
311
312
313
314
315
/**
 * Writes (part of) the given {@link FileRegion}.
 * {@code doWriteInternal} reports a result greater than zero as write progress and treats any other result as
 * "no progress made".
 *
 * @param region the {@link FileRegion} to write from
 * @return the amount written by this call
 * @throws Exception if the write fails
 */
protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
317
318
319
320
/**
 * Reads bytes into the given {@link ByteBuf} and returns the amount.
 * {@code NioByteUnsafe.read()} records the result on the allocation handle, treats a result {@code <= 0} as
 * "nothing was read" and a negative result as the signal to close the read side.
 *
 * @param buf the {@link ByteBuf} to read into
 * @return the amount read by this call
 * @throws Exception if the read fails
 */
protected abstract int doReadBytes(ByteBuf buf) throws Exception;
322
323
324
325
326
327
/**
 * Writes bytes from the given {@link ByteBuf}.
 * {@code doWriteInternal} reports a result greater than zero as write progress and treats any other result as
 * "no progress made".
 *
 * @param buf the {@link ByteBuf} to write from
 * @return the amount written by this call
 * @throws Exception if the write fails
 */
protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
329
330 protected final void setOpWrite() {
331 final SelectionKey key = selectionKey();
332
333
334
335 if (!key.isValid()) {
336 return;
337 }
338 final int interestOps = key.interestOps();
339 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
340 key.interestOps(interestOps | SelectionKey.OP_WRITE);
341 }
342 }
343
344 protected final void clearOpWrite() {
345 final SelectionKey key = selectionKey();
346
347
348
349 if (!key.isValid()) {
350 return;
351 }
352 final int interestOps = key.interestOps();
353 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
354 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
355 }
356 }
357 }