1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.FileRegion;
27 import io.netty.channel.IoRegistration;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.internal.ChannelUtils;
30 import io.netty.channel.socket.ChannelInputShutdownEvent;
31 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
32 import io.netty.channel.socket.SocketChannelConfig;
33 import io.netty.util.internal.StringUtil;
34
35 import java.io.IOException;
36 import java.nio.channels.SelectableChannel;
37 import java.nio.channels.SelectionKey;
38
39 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
40 import static io.netty.util.internal.StringUtil.className;
41
42
43
44
45 public abstract class AbstractNioByteChannel extends AbstractNioChannel {
// Shared metadata instance handed out by metadata().
// NOTE(review): the arguments are presumably (hasDisconnect = false, defaultMaxMessagesPerRead = 16);
// confirm against the ChannelMetadata constructor.
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
// Suffix appended to the "unsupported message type" error built in filterOutboundMessage(Object);
// it names the two message types that method accepts (ByteBuf and FileRegion).
private static final String EXPECTED_TYPES =
        " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
        StringUtil.simpleClassName(FileRegion.class) + ')';
50
// Task submitted to the event loop by incompleteWrite(false) so that flushing is resumed later.
private final Runnable flushTask = new Runnable() {
    @Override
    public void run() {
        // Invokes flush0() on the unsafe directly.
        // NOTE(review): presumably done to avoid also flushing messages that were written
        // in the meantime -- confirm against AbstractNioUnsafe.
        ((AbstractNioUnsafe) unsafe()).flush0();
    }
};
// Set to true by NioByteUnsafe.closeOnRead(...) the first time it runs while the input is already
// reported as shut down (right before ChannelInputShutdownReadComplete is fired).
// Read by shouldBreakReadReady(...) and by closeOnRead(...) itself.
private boolean inputClosedSeenErrorOnRead;
60
61
62
63
64
65
66
/**
 * Creates a new instance by delegating to the superclass constructor with
 * {@link SelectionKey#OP_READ} as the third argument.
 * NOTE(review): that argument is presumably the interest op used for reading -- confirm against
 * {@code AbstractNioChannel}.
 *
 * @param parent passed unchanged to the superclass constructor (first argument)
 * @param ch     passed unchanged to the superclass constructor (second argument)
 */
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}
70
71
72
73
/**
 * Hook called by {@code NioByteUnsafe.closeOnRead(...)} when {@link #isInputShutdown0()} is
 * {@code false} and half-closure is allowed by the channel configuration. That caller ignores the
 * returned future.
 * NOTE(review): implementations are presumably expected to shut down the input side of the
 * channel -- confirm against the concrete subclasses.
 *
 * @return a future for the shutdown operation
 */
protected abstract ChannelFuture shutdownInput();
75
/**
 * Tells whether the input side is considered shut down. This base implementation always returns
 * {@code false}; the method is overridable, and the result drives both
 * {@code shouldBreakReadReady(...)} and {@code NioByteUnsafe.closeOnRead(...)}.
 *
 * @return {@code false} in this base implementation
 */
protected boolean isInputShutdown0() {
    return false;
}
79
/**
 * Creates the unsafe used by this channel.
 *
 * @return a new {@link NioByteUnsafe} bound to this channel instance
 */
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioByteUnsafe();
}
84
/**
 * Returns the metadata shared by all instances of this class.
 *
 * @return the static {@code METADATA} constant
 */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
89
90 final boolean shouldBreakReadReady(ChannelConfig config) {
91 return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
92 }
93
94 private static boolean isAllowHalfClosure(ChannelConfig config) {
95 return config instanceof SocketChannelConfig &&
96 ((SocketChannelConfig) config).isAllowHalfClosure();
97 }
98
99 protected class NioByteUnsafe extends AbstractNioUnsafe {
100
101 private void closeOnRead(ChannelPipeline pipeline) {
102 if (!isInputShutdown0()) {
103 if (isAllowHalfClosure(config())) {
104 shutdownInput();
105 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
106 } else {
107 close(voidPromise());
108 }
109 } else if (!inputClosedSeenErrorOnRead) {
110 inputClosedSeenErrorOnRead = true;
111 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
112 }
113 }
114
115 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
116 RecvByteBufAllocator.Handle allocHandle) {
117 if (byteBuf != null) {
118 if (byteBuf.isReadable()) {
119 readPending = false;
120 pipeline.fireChannelRead(byteBuf);
121 } else {
122 byteBuf.release();
123 }
124 }
125 allocHandle.readComplete();
126 pipeline.fireChannelReadComplete();
127 pipeline.fireExceptionCaught(cause);
128
129
130
131 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
132 closeOnRead(pipeline);
133 }
134 }
135
136 @Override
137 public final void read() {
138 final ChannelConfig config = config();
139 if (shouldBreakReadReady(config)) {
140 clearReadPending();
141 return;
142 }
143 final ChannelPipeline pipeline = pipeline();
144 final ByteBufAllocator allocator = config.getAllocator();
145 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
146 allocHandle.reset(config);
147
148 ByteBuf byteBuf = null;
149 boolean close = false;
150 try {
151 do {
152 byteBuf = allocHandle.allocate(allocator);
153 allocHandle.lastBytesRead(doReadBytes(byteBuf));
154 if (allocHandle.lastBytesRead() <= 0) {
155
156 byteBuf.release();
157 byteBuf = null;
158 close = allocHandle.lastBytesRead() < 0;
159 if (close) {
160
161 readPending = false;
162 }
163 break;
164 }
165
166 allocHandle.incMessagesRead(1);
167 readPending = false;
168 pipeline.fireChannelRead(byteBuf);
169 byteBuf = null;
170 } while (allocHandle.continueReading());
171
172 allocHandle.readComplete();
173 pipeline.fireChannelReadComplete();
174
175 if (close) {
176 closeOnRead(pipeline);
177 }
178 } catch (Throwable t) {
179 handleReadException(pipeline, byteBuf, t, close, allocHandle);
180 } finally {
181
182
183
184
185
186
187 if (!readPending && !config.isAutoRead()) {
188 removeReadOp();
189 }
190 }
191 }
192 }
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208 protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
209 Object msg = in.current();
210 if (msg == null) {
211
212 return 0;
213 }
214 return doWriteInternal(in, in.current());
215 }
216
217 private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
218 if (msg instanceof ByteBuf) {
219 ByteBuf buf = (ByteBuf) msg;
220 if (!buf.isReadable()) {
221 in.remove();
222 return 0;
223 }
224
225 final int localFlushedAmount = doWriteBytes(buf);
226 if (localFlushedAmount > 0) {
227 in.progress(localFlushedAmount);
228 if (!buf.isReadable()) {
229 in.remove();
230 }
231 return 1;
232 }
233 } else if (msg instanceof FileRegion) {
234 FileRegion region = (FileRegion) msg;
235 if (region.transferred() >= region.count()) {
236 in.remove();
237 return 0;
238 }
239
240 long localFlushedAmount = doWriteFileRegion(region);
241 if (localFlushedAmount > 0) {
242 in.progress(localFlushedAmount);
243 if (region.transferred() >= region.count()) {
244 in.remove();
245 }
246 return 1;
247 }
248 } else {
249
250 throw new Error("Unexpected message type: " + className(msg));
251 }
252 return WRITE_STATUS_SNDBUF_FULL;
253 }
254
255 @Override
256 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
257 int writeSpinCount = config().getWriteSpinCount();
258 do {
259 Object msg = in.current();
260 if (msg == null) {
261
262 clearOpWrite();
263
264 return;
265 }
266 writeSpinCount -= doWriteInternal(in, msg);
267 } while (writeSpinCount > 0);
268
269 incompleteWrite(writeSpinCount < 0);
270 }
271
272 @Override
273 protected final Object filterOutboundMessage(Object msg) {
274 if (msg instanceof ByteBuf) {
275 ByteBuf buf = (ByteBuf) msg;
276 if (buf.isDirect()) {
277 return msg;
278 }
279
280 return newDirectBuffer(buf);
281 }
282
283 if (msg instanceof FileRegion) {
284 return msg;
285 }
286
287 throw new UnsupportedOperationException(
288 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
289 }
290
291 protected final void incompleteWrite(boolean setOpWrite) {
292
293 if (setOpWrite) {
294 setOpWrite();
295 } else {
296
297
298
299
300 clearOpWrite();
301
302
303 eventLoop().execute(flushTask);
304 }
305 }
306
307
308
309
310
311
312
/**
 * Writes (part of) the given {@link FileRegion} to the underlying transport.
 * <p>
 * The caller, {@code doWriteInternal(...)}, reports a return value greater than zero as progress
 * on the outbound buffer and treats any other value as "no progress"
 * ({@code WRITE_STATUS_SNDBUF_FULL}).
 *
 * @param region the region to write
 * @return the amount written by this call
 * @throws Exception if the write fails
 */
protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
314
315
316
317
/**
 * Reads from the underlying transport into the given {@link ByteBuf}.
 * <p>
 * {@code NioByteUnsafe.read()} records the return value on the allocation handle. A value of zero
 * or less makes it release the buffer and end the read loop; a negative value additionally
 * triggers the close-on-read handling.
 *
 * @param buf the buffer to fill
 * @return the amount read by this call
 * @throws Exception if the read fails
 */
protected abstract int doReadBytes(ByteBuf buf) throws Exception;
319
320
321
322
323
324
/**
 * Writes (part of) the given {@link ByteBuf} to the underlying transport.
 * <p>
 * The caller, {@code doWriteInternal(...)}, reports a return value greater than zero as progress
 * on the outbound buffer and treats any other value as "no progress"
 * ({@code WRITE_STATUS_SNDBUF_FULL}).
 *
 * @param buf the buffer to write from
 * @return the amount written by this call
 * @throws Exception if the write fails
 */
protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
326
327 protected final void setOpWrite() {
328 final IoRegistration registration = registration();
329
330
331
332 if (!registration.isValid()) {
333 return;
334 }
335
336 addAndSubmit(NioIoOps.WRITE);
337 }
338
339 protected final void clearOpWrite() {
340 final IoRegistration registration = registration();
341
342
343
344 if (!registration.isValid()) {
345 return;
346 }
347 removeAndSubmit(NioIoOps.WRITE);
348 }
349 }