1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.FileRegion;
27 import io.netty.channel.IoRegistration;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.internal.ChannelUtils;
30 import io.netty.channel.socket.ChannelInputShutdownEvent;
31 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
32 import io.netty.channel.socket.SocketChannelConfig;
33 import io.netty.util.internal.StringUtil;
34
35 import java.io.IOException;
36 import java.nio.channels.SelectableChannel;
37 import java.nio.channels.SelectionKey;
38
39 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
40
41
42
43
44 public abstract class AbstractNioByteChannel extends AbstractNioChannel {
45 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
46 private static final String EXPECTED_TYPES =
47 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
48 StringUtil.simpleClassName(FileRegion.class) + ')';
49
50 private final Runnable flushTask = new Runnable() {
51 @Override
52 public void run() {
53
54
55 ((AbstractNioUnsafe) unsafe()).flush0();
56 }
57 };
58 private boolean inputClosedSeenErrorOnRead;
59
60
61
62
63
64
65
66 protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
67 super(parent, ch, SelectionKey.OP_READ);
68 }
69
70
71
72
73 protected abstract ChannelFuture shutdownInput();
74
75 protected boolean isInputShutdown0() {
76 return false;
77 }
78
79 @Override
80 protected AbstractNioUnsafe newUnsafe() {
81 return new NioByteUnsafe();
82 }
83
84 @Override
85 public ChannelMetadata metadata() {
86 return METADATA;
87 }
88
89 final boolean shouldBreakReadReady(ChannelConfig config) {
90 return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
91 }
92
93 private static boolean isAllowHalfClosure(ChannelConfig config) {
94 return config instanceof SocketChannelConfig &&
95 ((SocketChannelConfig) config).isAllowHalfClosure();
96 }
97
98 protected class NioByteUnsafe extends AbstractNioUnsafe {
99
100 private void closeOnRead(ChannelPipeline pipeline) {
101 if (!isInputShutdown0()) {
102 if (isAllowHalfClosure(config())) {
103 shutdownInput();
104 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
105 } else {
106 close(voidPromise());
107 }
108 } else if (!inputClosedSeenErrorOnRead) {
109 inputClosedSeenErrorOnRead = true;
110 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
111 }
112 }
113
114 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
115 RecvByteBufAllocator.Handle allocHandle) {
116 if (byteBuf != null) {
117 if (byteBuf.isReadable()) {
118 readPending = false;
119 pipeline.fireChannelRead(byteBuf);
120 } else {
121 byteBuf.release();
122 }
123 }
124 allocHandle.readComplete();
125 pipeline.fireChannelReadComplete();
126 pipeline.fireExceptionCaught(cause);
127
128
129
130 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
131 closeOnRead(pipeline);
132 }
133 }
134
135 @Override
136 public final void read() {
137 final ChannelConfig config = config();
138 if (shouldBreakReadReady(config)) {
139 clearReadPending();
140 return;
141 }
142 final ChannelPipeline pipeline = pipeline();
143 final ByteBufAllocator allocator = config.getAllocator();
144 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
145 allocHandle.reset(config);
146
147 ByteBuf byteBuf = null;
148 boolean close = false;
149 try {
150 do {
151 byteBuf = allocHandle.allocate(allocator);
152 allocHandle.lastBytesRead(doReadBytes(byteBuf));
153 if (allocHandle.lastBytesRead() <= 0) {
154
155 byteBuf.release();
156 byteBuf = null;
157 close = allocHandle.lastBytesRead() < 0;
158 if (close) {
159
160 readPending = false;
161 }
162 break;
163 }
164
165 allocHandle.incMessagesRead(1);
166 readPending = false;
167 pipeline.fireChannelRead(byteBuf);
168 byteBuf = null;
169 } while (allocHandle.continueReading());
170
171 allocHandle.readComplete();
172 pipeline.fireChannelReadComplete();
173
174 if (close) {
175 closeOnRead(pipeline);
176 }
177 } catch (Throwable t) {
178 handleReadException(pipeline, byteBuf, t, close, allocHandle);
179 } finally {
180
181
182
183
184
185
186 if (!readPending && !config.isAutoRead()) {
187 removeReadOp();
188 }
189 }
190 }
191 }
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207 protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
208 Object msg = in.current();
209 if (msg == null) {
210
211 return 0;
212 }
213 return doWriteInternal(in, in.current());
214 }
215
216 private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
217 if (msg instanceof ByteBuf) {
218 ByteBuf buf = (ByteBuf) msg;
219 if (!buf.isReadable()) {
220 in.remove();
221 return 0;
222 }
223
224 final int localFlushedAmount = doWriteBytes(buf);
225 if (localFlushedAmount > 0) {
226 in.progress(localFlushedAmount);
227 if (!buf.isReadable()) {
228 in.remove();
229 }
230 return 1;
231 }
232 } else if (msg instanceof FileRegion) {
233 FileRegion region = (FileRegion) msg;
234 if (region.transferred() >= region.count()) {
235 in.remove();
236 return 0;
237 }
238
239 long localFlushedAmount = doWriteFileRegion(region);
240 if (localFlushedAmount > 0) {
241 in.progress(localFlushedAmount);
242 if (region.transferred() >= region.count()) {
243 in.remove();
244 }
245 return 1;
246 }
247 } else {
248
249 throw new Error();
250 }
251 return WRITE_STATUS_SNDBUF_FULL;
252 }
253
254 @Override
255 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
256 int writeSpinCount = config().getWriteSpinCount();
257 do {
258 Object msg = in.current();
259 if (msg == null) {
260
261 clearOpWrite();
262
263 return;
264 }
265 writeSpinCount -= doWriteInternal(in, msg);
266 } while (writeSpinCount > 0);
267
268 incompleteWrite(writeSpinCount < 0);
269 }
270
271 @Override
272 protected final Object filterOutboundMessage(Object msg) {
273 if (msg instanceof ByteBuf) {
274 ByteBuf buf = (ByteBuf) msg;
275 if (buf.isDirect()) {
276 return msg;
277 }
278
279 return newDirectBuffer(buf);
280 }
281
282 if (msg instanceof FileRegion) {
283 return msg;
284 }
285
286 throw new UnsupportedOperationException(
287 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
288 }
289
290 protected final void incompleteWrite(boolean setOpWrite) {
291
292 if (setOpWrite) {
293 setOpWrite();
294 } else {
295
296
297
298
299 clearOpWrite();
300
301
302 eventLoop().execute(flushTask);
303 }
304 }
305
306
307
308
309
310
311
312 protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
313
314
315
316
317 protected abstract int doReadBytes(ByteBuf buf) throws Exception;
318
319
320
321
322
323
324 protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
325
326 protected final void setOpWrite() {
327 final IoRegistration registration = registration();
328
329
330
331 if (!registration.isValid()) {
332 return;
333 }
334
335 addAndSubmit(NioIoOps.WRITE);
336 }
337
338 protected final void clearOpWrite() {
339 final IoRegistration registration = registration();
340
341
342
343 if (!registration.isValid()) {
344 return;
345 }
346 removeAndSubmit(NioIoOps.WRITE);
347 }
348 }