View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.FileRegion;
27  import io.netty.channel.RecvByteBufAllocator;
28  import io.netty.channel.internal.ChannelUtils;
29  import io.netty.channel.socket.ChannelInputShutdownEvent;
30  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31  import io.netty.channel.socket.SocketChannelConfig;
32  import io.netty.util.internal.StringUtil;
33  import io.netty.util.internal.ThrowableUtil;
34  
35  import java.io.IOException;
36  import java.nio.channels.SelectableChannel;
37  import java.nio.channels.SelectionKey;
38  
39  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
40  
41  /**
42   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
43   */
44  public abstract class AbstractNioByteChannel extends AbstractNioChannel {
45      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
46      private static final String EXPECTED_TYPES =
47              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
48              StringUtil.simpleClassName(FileRegion.class) + ')';
49  
50      private final Runnable flushTask = new Runnable() {
51          @Override
52          public void run() {
53              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
54              // meantime.
55              ((AbstractNioUnsafe) unsafe()).flush0();
56          }
57      };
58      private boolean inputClosedSeenErrorOnRead;
59  
60      /**
61       * Create a new instance
62       *
63       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
64       * @param ch                the underlying {@link SelectableChannel} on which it operates
65       */
66      protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
67          super(parent, ch, SelectionKey.OP_READ);
68      }
69  
70      /**
71       * Shutdown the input side of the channel.
72       */
73      protected abstract ChannelFuture shutdownInput();
74  
75      protected boolean isInputShutdown0() {
76          return false;
77      }
78  
79      @Override
80      protected AbstractNioUnsafe newUnsafe() {
81          return new NioByteUnsafe();
82      }
83  
84      @Override
85      public ChannelMetadata metadata() {
86          return METADATA;
87      }
88  
89      final boolean shouldBreakReadReady(ChannelConfig config) {
90          return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
91      }
92  
93      private static boolean isAllowHalfClosure(ChannelConfig config) {
94          return config instanceof SocketChannelConfig &&
95                  ((SocketChannelConfig) config).isAllowHalfClosure();
96      }
97  
98      protected class NioByteUnsafe extends AbstractNioUnsafe {
99  
100         private void closeOnRead(ChannelPipeline pipeline) {
101             if (!isInputShutdown0()) {
102                 if (isAllowHalfClosure(config())) {
103                     shutdownInput();
104                     pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
105                 } else {
106                     close(voidPromise());
107                 }
108             } else if (!inputClosedSeenErrorOnRead) {
109                 inputClosedSeenErrorOnRead = true;
110                 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
111             }
112         }
113 
114         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
115                 RecvByteBufAllocator.Handle allocHandle) {
116             if (byteBuf != null) {
117                 if (byteBuf.isReadable()) {
118                     readPending = false;
119                     try {
120                         pipeline.fireChannelRead(byteBuf);
121                     } catch (Exception e) {
122                         ThrowableUtil.addSuppressed(cause, e);
123                     }
124                 } else {
125                     byteBuf.release();
126                 }
127             }
128             allocHandle.readComplete();
129             pipeline.fireChannelReadComplete();
130             pipeline.fireExceptionCaught(cause);
131 
132             // If oom will close the read event, release connection.
133             // See https://github.com/netty/netty/issues/10434
134             if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
135                 closeOnRead(pipeline);
136             }
137         }
138 
139         @Override
140         public final void read() {
141             final ChannelConfig config = config();
142             if (shouldBreakReadReady(config)) {
143                 clearReadPending();
144                 return;
145             }
146             final ChannelPipeline pipeline = pipeline();
147             final ByteBufAllocator allocator = config.getAllocator();
148             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
149             allocHandle.reset(config);
150 
151             ByteBuf byteBuf = null;
152             boolean close = false;
153             try {
154                 do {
155                     byteBuf = allocHandle.allocate(allocator);
156                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
157                     if (allocHandle.lastBytesRead() <= 0) {
158                         // nothing was read. release the buffer.
159                         byteBuf.release();
160                         byteBuf = null;
161                         close = allocHandle.lastBytesRead() < 0;
162                         if (close) {
163                             // There is nothing left to read as we received an EOF.
164                             readPending = false;
165                         }
166                         break;
167                     }
168 
169                     allocHandle.incMessagesRead(1);
170                     readPending = false;
171                     pipeline.fireChannelRead(byteBuf);
172                     byteBuf = null;
173                 } while (allocHandle.continueReading());
174 
175                 allocHandle.readComplete();
176                 pipeline.fireChannelReadComplete();
177 
178                 if (close) {
179                     closeOnRead(pipeline);
180                 }
181             } catch (Throwable t) {
182                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
183             } finally {
184                 // Check if there is a readPending which was not processed yet.
185                 // This could be for two reasons:
186                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
187                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
188                 //
189                 // See https://github.com/netty/netty/issues/2254
190                 if (!readPending && !config.isAutoRead()) {
191                     removeReadOp();
192                 }
193             }
194         }
195     }
196 
197     /**
198      * Write objects to the OS.
199      * @param in the collection which contains objects to write.
200      * @return The value that should be decremented from the write quantum which starts at
201      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
202      * <ul>
203      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
204      *     is encountered</li>
205      *     <li>1 - if a single call to write data was made to the OS</li>
206      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
207      *     data was accepted</li>
208      * </ul>
209      * @throws Exception if an I/O exception occurs during write.
210      */
211     protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
212         Object msg = in.current();
213         if (msg == null) {
214             // Directly return here so incompleteWrite(...) is not called.
215             return 0;
216         }
217         return doWriteInternal(in, in.current());
218     }
219 
220     private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
221         if (msg instanceof ByteBuf) {
222             ByteBuf buf = (ByteBuf) msg;
223             if (!buf.isReadable()) {
224                 in.remove();
225                 return 0;
226             }
227 
228             final int localFlushedAmount = doWriteBytes(buf);
229             if (localFlushedAmount > 0) {
230                 in.progress(localFlushedAmount);
231                 if (!buf.isReadable()) {
232                     in.remove();
233                 }
234                 return 1;
235             }
236         } else if (msg instanceof FileRegion) {
237             FileRegion region = (FileRegion) msg;
238             if (region.transferred() >= region.count()) {
239                 in.remove();
240                 return 0;
241             }
242 
243             long localFlushedAmount = doWriteFileRegion(region);
244             if (localFlushedAmount > 0) {
245                 in.progress(localFlushedAmount);
246                 if (region.transferred() >= region.count()) {
247                     in.remove();
248                 }
249                 return 1;
250             }
251         } else {
252             // Should not reach here.
253             throw new Error();
254         }
255         return WRITE_STATUS_SNDBUF_FULL;
256     }
257 
258     @Override
259     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
260         int writeSpinCount = config().getWriteSpinCount();
261         do {
262             Object msg = in.current();
263             if (msg == null) {
264                 // Wrote all messages.
265                 clearOpWrite();
266                 // Directly return here so incompleteWrite(...) is not called.
267                 return;
268             }
269             writeSpinCount -= doWriteInternal(in, msg);
270         } while (writeSpinCount > 0);
271 
272         incompleteWrite(writeSpinCount < 0);
273     }
274 
275     @Override
276     protected final Object filterOutboundMessage(Object msg) {
277         if (msg instanceof ByteBuf) {
278             ByteBuf buf = (ByteBuf) msg;
279             if (buf.isDirect()) {
280                 return msg;
281             }
282 
283             return newDirectBuffer(buf);
284         }
285 
286         if (msg instanceof FileRegion) {
287             return msg;
288         }
289 
290         throw new UnsupportedOperationException(
291                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
292     }
293 
294     protected final void incompleteWrite(boolean setOpWrite) {
295         // Did not write completely.
296         if (setOpWrite) {
297             setOpWrite();
298         } else {
299             // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
300             // use our write quantum. In this case we no longer want to set the write OP because the socket is still
301             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
302             // and set the write OP if necessary.
303             clearOpWrite();
304 
305             // Schedule flush again later so other tasks can be picked up in the meantime
306             eventLoop().execute(flushTask);
307         }
308     }
309 
310     /**
311      * Write a {@link FileRegion}
312      *
313      * @param region        the {@link FileRegion} from which the bytes should be written
314      * @return amount       the amount of written bytes
315      */
316     protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
317 
318     /**
319      * Read bytes into the given {@link ByteBuf} and return the amount.
320      */
321     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
322 
323     /**
324      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
325      * @param buf           the {@link ByteBuf} from which the bytes should be written
326      * @return amount       the amount of written bytes
327      */
328     protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
329 
330     protected final void setOpWrite() {
331         final SelectionKey key = selectionKey();
332         // Check first if the key is still valid as it may be canceled as part of the deregistration
333         // from the EventLoop
334         // See https://github.com/netty/netty/issues/2104
335         if (!key.isValid()) {
336             return;
337         }
338         final int interestOps = key.interestOps();
339         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
340             key.interestOps(interestOps | SelectionKey.OP_WRITE);
341         }
342     }
343 
344     protected final void clearOpWrite() {
345         final SelectionKey key = selectionKey();
346         // Check first if the key is still valid as it may be canceled as part of the deregistration
347         // from the EventLoop
348         // See https://github.com/netty/netty/issues/2104
349         if (!key.isValid()) {
350             return;
351         }
352         final int interestOps = key.interestOps();
353         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
354             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
355         }
356     }
357 }