/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.FileRegion;
27  import io.netty.channel.IoRegistration;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.internal.ChannelUtils;
30  import io.netty.channel.socket.ChannelInputShutdownEvent;
31  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
32  import io.netty.channel.socket.SocketChannelConfig;
33  import io.netty.util.internal.StringUtil;
34  
35  import java.io.IOException;
36  import java.nio.channels.SelectableChannel;
37  import java.nio.channels.SelectionKey;
38  
39  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
40  import static io.netty.util.internal.StringUtil.className;
41  
42  /**
43   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
44   */
45  public abstract class AbstractNioByteChannel extends AbstractNioChannel {
46      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
47      private static final String EXPECTED_TYPES =
48              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
49              StringUtil.simpleClassName(FileRegion.class) + ')';
50  
51      private final Runnable flushTask = new Runnable() {
52          @Override
53          public void run() {
54              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
55              // meantime.
56              ((AbstractNioUnsafe) unsafe()).flush0();
57          }
58      };
59      private boolean inputClosedSeenErrorOnRead;
60  
61      /**
62       * Create a new instance
63       *
64       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
65       * @param ch                the underlying {@link SelectableChannel} on which it operates
66       */
67      protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
68          super(parent, ch, SelectionKey.OP_READ);
69      }
70  
71      /**
72       * Shutdown the input side of the channel.
73       */
74      protected abstract ChannelFuture shutdownInput();
75  
76      protected boolean isInputShutdown0() {
77          return false;
78      }
79  
80      @Override
81      protected AbstractNioUnsafe newUnsafe() {
82          return new NioByteUnsafe();
83      }
84  
85      @Override
86      public ChannelMetadata metadata() {
87          return METADATA;
88      }
89  
90      final boolean shouldBreakReadReady(ChannelConfig config) {
91          return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
92      }
93  
94      private static boolean isAllowHalfClosure(ChannelConfig config) {
95          return config instanceof SocketChannelConfig &&
96                  ((SocketChannelConfig) config).isAllowHalfClosure();
97      }
98  
99      protected class NioByteUnsafe extends AbstractNioUnsafe {
100 
101         private void closeOnRead(ChannelPipeline pipeline) {
102             if (!isInputShutdown0()) {
103                 if (isAllowHalfClosure(config())) {
104                     shutdownInput();
105                     pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
106                 } else {
107                     close(voidPromise());
108                 }
109             } else if (!inputClosedSeenErrorOnRead) {
110                 inputClosedSeenErrorOnRead = true;
111                 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
112             }
113         }
114 
115         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
116                 RecvByteBufAllocator.Handle allocHandle) {
117             if (byteBuf != null) {
118                 if (byteBuf.isReadable()) {
119                     readPending = false;
120                     pipeline.fireChannelRead(byteBuf);
121                 } else {
122                     byteBuf.release();
123                 }
124             }
125             allocHandle.readComplete();
126             pipeline.fireChannelReadComplete();
127             pipeline.fireExceptionCaught(cause);
128 
129             // If oom will close the read event, release connection.
130             // See https://github.com/netty/netty/issues/10434
131             if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
132                 closeOnRead(pipeline);
133             }
134         }
135 
136         @Override
137         public final void read() {
138             final ChannelConfig config = config();
139             if (shouldBreakReadReady(config)) {
140                 clearReadPending();
141                 return;
142             }
143             final ChannelPipeline pipeline = pipeline();
144             final ByteBufAllocator allocator = config.getAllocator();
145             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
146             allocHandle.reset(config);
147 
148             ByteBuf byteBuf = null;
149             boolean close = false;
150             try {
151                 do {
152                     byteBuf = allocHandle.allocate(allocator);
153                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
154                     if (allocHandle.lastBytesRead() <= 0) {
155                         // nothing was read. release the buffer.
156                         byteBuf.release();
157                         byteBuf = null;
158                         close = allocHandle.lastBytesRead() < 0;
159                         if (close) {
160                             // There is nothing left to read as we received an EOF.
161                             readPending = false;
162                         }
163                         break;
164                     }
165 
166                     allocHandle.incMessagesRead(1);
167                     readPending = false;
168                     pipeline.fireChannelRead(byteBuf);
169                     byteBuf = null;
170                 } while (allocHandle.continueReading());
171 
172                 allocHandle.readComplete();
173                 pipeline.fireChannelReadComplete();
174 
175                 if (close) {
176                     closeOnRead(pipeline);
177                 }
178             } catch (Throwable t) {
179                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
180             } finally {
181                 // Check if there is a readPending which was not processed yet.
182                 // This could be for two reasons:
183                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
184                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
185                 //
186                 // See https://github.com/netty/netty/issues/2254
187                 if (!readPending && !config.isAutoRead()) {
188                     removeReadOp();
189                 }
190             }
191         }
192     }
193 
194     /**
195      * Write objects to the OS.
196      * @param in the collection which contains objects to write.
197      * @return The value that should be decremented from the write quantum which starts at
198      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
199      * <ul>
200      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
201      *     is encountered</li>
202      *     <li>1 - if a single call to write data was made to the OS</li>
203      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
204      *     data was accepted</li>
205      * </ul>
206      * @throws Exception if an I/O exception occurs during write.
207      */
208     protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
209         Object msg = in.current();
210         if (msg == null) {
211             // Directly return here so incompleteWrite(...) is not called.
212             return 0;
213         }
214         return doWriteInternal(in, in.current());
215     }
216 
217     private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
218         if (msg instanceof ByteBuf) {
219             ByteBuf buf = (ByteBuf) msg;
220             if (!buf.isReadable()) {
221                 in.remove();
222                 return 0;
223             }
224 
225             final int localFlushedAmount = doWriteBytes(buf);
226             if (localFlushedAmount > 0) {
227                 in.progress(localFlushedAmount);
228                 if (!buf.isReadable()) {
229                     in.remove();
230                 }
231                 return 1;
232             }
233         } else if (msg instanceof FileRegion) {
234             FileRegion region = (FileRegion) msg;
235             if (region.transferred() >= region.count()) {
236                 in.remove();
237                 return 0;
238             }
239 
240             long localFlushedAmount = doWriteFileRegion(region);
241             if (localFlushedAmount > 0) {
242                 in.progress(localFlushedAmount);
243                 if (region.transferred() >= region.count()) {
244                     in.remove();
245                 }
246                 return 1;
247             }
248         } else {
249             // Should not reach here.
250             throw new Error("Unexpected message type: " + className(msg));
251         }
252         return WRITE_STATUS_SNDBUF_FULL;
253     }
254 
255     @Override
256     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
257         int writeSpinCount = config().getWriteSpinCount();
258         do {
259             Object msg = in.current();
260             if (msg == null) {
261                 // Wrote all messages.
262                 clearOpWrite();
263                 // Directly return here so incompleteWrite(...) is not called.
264                 return;
265             }
266             writeSpinCount -= doWriteInternal(in, msg);
267         } while (writeSpinCount > 0);
268 
269         incompleteWrite(writeSpinCount < 0);
270     }
271 
272     @Override
273     protected final Object filterOutboundMessage(Object msg) {
274         if (msg instanceof ByteBuf) {
275             ByteBuf buf = (ByteBuf) msg;
276             if (buf.isDirect()) {
277                 return msg;
278             }
279 
280             return newDirectBuffer(buf);
281         }
282 
283         if (msg instanceof FileRegion) {
284             return msg;
285         }
286 
287         throw new UnsupportedOperationException(
288                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
289     }
290 
291     protected final void incompleteWrite(boolean setOpWrite) {
292         // Did not write completely.
293         if (setOpWrite) {
294             setOpWrite();
295         } else {
296             // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
297             // use our write quantum. In this case we no longer want to set the write OP because the socket is still
298             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
299             // and set the write OP if necessary.
300             clearOpWrite();
301 
302             // Schedule flush again later so other tasks can be picked up in the meantime
303             eventLoop().execute(flushTask);
304         }
305     }
306 
307     /**
308      * Write a {@link FileRegion}
309      *
310      * @param region        the {@link FileRegion} from which the bytes should be written
311      * @return amount       the amount of written bytes
312      */
313     protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
314 
315     /**
316      * Read bytes into the given {@link ByteBuf} and return the amount.
317      */
318     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
319 
320     /**
321      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
322      * @param buf           the {@link ByteBuf} from which the bytes should be written
323      * @return amount       the amount of written bytes
324      */
325     protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
326 
327     protected final void setOpWrite() {
328         final IoRegistration registration = registration();
329         // Check first if the key is still valid as it may be canceled as part of the deregistration
330         // from the EventLoop
331         // See https://github.com/netty/netty/issues/2104
332         if (!registration.isValid()) {
333             return;
334         }
335 
336         addAndSubmit(NioIoOps.WRITE);
337     }
338 
339     protected final void clearOpWrite() {
340         final IoRegistration registration = registration();
341         // Check first if the key is still valid as it may be canceled as part of the deregistration
342         // from the EventLoop
343         // See https://github.com/netty/netty/issues/2104
344         if (!registration.isValid()) {
345             return;
346         }
347         removeAndSubmit(NioIoOps.WRITE);
348     }
349 }