View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.FileRegion;
27  import io.netty.channel.RecvByteBufAllocator;
28  import io.netty.channel.internal.ChannelUtils;
29  import io.netty.channel.socket.ChannelInputShutdownEvent;
30  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31  import io.netty.channel.socket.SocketChannelConfig;
32  import io.netty.util.internal.StringUtil;
33  
34  import java.io.IOException;
35  import java.nio.channels.SelectableChannel;
36  import java.nio.channels.SelectionKey;
37  
38  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
39  
40  /**
41   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
42   */
43  public abstract class AbstractNioByteChannel extends AbstractNioChannel {
44      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
45      private static final String EXPECTED_TYPES =
46              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
47              StringUtil.simpleClassName(FileRegion.class) + ')';
48  
49      private final Runnable flushTask = new Runnable() {
50          @Override
51          public void run() {
52              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
53              // meantime.
54              ((AbstractNioUnsafe) unsafe()).flush0();
55          }
56      };
57      private boolean inputClosedSeenErrorOnRead;
58  
59      /**
60       * Create a new instance
61       *
62       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
63       * @param ch                the underlying {@link SelectableChannel} on which it operates
64       */
65      protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
66          super(parent, ch, SelectionKey.OP_READ);
67      }
68  
69      /**
70       * Shutdown the input side of the channel.
71       */
72      protected abstract ChannelFuture shutdownInput();
73  
74      protected boolean isInputShutdown0() {
75          return false;
76      }
77  
78      @Override
79      protected AbstractNioUnsafe newUnsafe() {
80          return new NioByteUnsafe();
81      }
82  
83      @Override
84      public ChannelMetadata metadata() {
85          return METADATA;
86      }
87  
88      final boolean shouldBreakReadReady(ChannelConfig config) {
89          return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
90      }
91  
92      private static boolean isAllowHalfClosure(ChannelConfig config) {
93          return config instanceof SocketChannelConfig &&
94                  ((SocketChannelConfig) config).isAllowHalfClosure();
95      }
96  
97      protected class NioByteUnsafe extends AbstractNioUnsafe {
98  
99          private void closeOnRead(ChannelPipeline pipeline) {
100             if (!isInputShutdown0()) {
101                 if (isAllowHalfClosure(config())) {
102                     shutdownInput();
103                     pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
104                 } else {
105                     close(voidPromise());
106                 }
107             } else if (!inputClosedSeenErrorOnRead) {
108                 inputClosedSeenErrorOnRead = true;
109                 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
110             }
111         }
112 
113         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
114                 RecvByteBufAllocator.Handle allocHandle) {
115             if (byteBuf != null) {
116                 if (byteBuf.isReadable()) {
117                     readPending = false;
118                     pipeline.fireChannelRead(byteBuf);
119                 } else {
120                     byteBuf.release();
121                 }
122             }
123             allocHandle.readComplete();
124             pipeline.fireChannelReadComplete();
125             pipeline.fireExceptionCaught(cause);
126 
127             // If oom will close the read event, release connection.
128             // See https://github.com/netty/netty/issues/10434
129             if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
130                 closeOnRead(pipeline);
131             }
132         }
133 
134         @Override
135         public final void read() {
136             final ChannelConfig config = config();
137             if (shouldBreakReadReady(config)) {
138                 clearReadPending();
139                 return;
140             }
141             final ChannelPipeline pipeline = pipeline();
142             final ByteBufAllocator allocator = config.getAllocator();
143             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
144             allocHandle.reset(config);
145 
146             ByteBuf byteBuf = null;
147             boolean close = false;
148             try {
149                 do {
150                     byteBuf = allocHandle.allocate(allocator);
151                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
152                     if (allocHandle.lastBytesRead() <= 0) {
153                         // nothing was read. release the buffer.
154                         byteBuf.release();
155                         byteBuf = null;
156                         close = allocHandle.lastBytesRead() < 0;
157                         if (close) {
158                             // There is nothing left to read as we received an EOF.
159                             readPending = false;
160                         }
161                         break;
162                     }
163 
164                     allocHandle.incMessagesRead(1);
165                     readPending = false;
166                     pipeline.fireChannelRead(byteBuf);
167                     byteBuf = null;
168                 } while (allocHandle.continueReading());
169 
170                 allocHandle.readComplete();
171                 pipeline.fireChannelReadComplete();
172 
173                 if (close) {
174                     closeOnRead(pipeline);
175                 }
176             } catch (Throwable t) {
177                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
178             } finally {
179                 // Check if there is a readPending which was not processed yet.
180                 // This could be for two reasons:
181                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
182                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
183                 //
184                 // See https://github.com/netty/netty/issues/2254
185                 if (!readPending && !config.isAutoRead()) {
186                     removeReadOp();
187                 }
188             }
189         }
190     }
191 
192     /**
193      * Write objects to the OS.
194      * @param in the collection which contains objects to write.
195      * @return The value that should be decremented from the write quantum which starts at
196      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
197      * <ul>
198      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
199      *     is encountered</li>
200      *     <li>1 - if a single call to write data was made to the OS</li>
201      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
202      *     data was accepted</li>
203      * </ul>
204      * @throws Exception if an I/O exception occurs during write.
205      */
206     protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
207         Object msg = in.current();
208         if (msg == null) {
209             // Directly return here so incompleteWrite(...) is not called.
210             return 0;
211         }
212         return doWriteInternal(in, in.current());
213     }
214 
215     private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
216         if (msg instanceof ByteBuf) {
217             ByteBuf buf = (ByteBuf) msg;
218             if (!buf.isReadable()) {
219                 in.remove();
220                 return 0;
221             }
222 
223             final int localFlushedAmount = doWriteBytes(buf);
224             if (localFlushedAmount > 0) {
225                 in.progress(localFlushedAmount);
226                 if (!buf.isReadable()) {
227                     in.remove();
228                 }
229                 return 1;
230             }
231         } else if (msg instanceof FileRegion) {
232             FileRegion region = (FileRegion) msg;
233             if (region.transferred() >= region.count()) {
234                 in.remove();
235                 return 0;
236             }
237 
238             long localFlushedAmount = doWriteFileRegion(region);
239             if (localFlushedAmount > 0) {
240                 in.progress(localFlushedAmount);
241                 if (region.transferred() >= region.count()) {
242                     in.remove();
243                 }
244                 return 1;
245             }
246         } else {
247             // Should not reach here.
248             throw new Error();
249         }
250         return WRITE_STATUS_SNDBUF_FULL;
251     }
252 
253     @Override
254     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
255         int writeSpinCount = config().getWriteSpinCount();
256         do {
257             Object msg = in.current();
258             if (msg == null) {
259                 // Wrote all messages.
260                 clearOpWrite();
261                 // Directly return here so incompleteWrite(...) is not called.
262                 return;
263             }
264             writeSpinCount -= doWriteInternal(in, msg);
265         } while (writeSpinCount > 0);
266 
267         incompleteWrite(writeSpinCount < 0);
268     }
269 
270     @Override
271     protected final Object filterOutboundMessage(Object msg) {
272         if (msg instanceof ByteBuf) {
273             ByteBuf buf = (ByteBuf) msg;
274             if (buf.isDirect()) {
275                 return msg;
276             }
277 
278             return newDirectBuffer(buf);
279         }
280 
281         if (msg instanceof FileRegion) {
282             return msg;
283         }
284 
285         throw new UnsupportedOperationException(
286                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
287     }
288 
289     protected final void incompleteWrite(boolean setOpWrite) {
290         // Did not write completely.
291         if (setOpWrite) {
292             setOpWrite();
293         } else {
294             // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
295             // use our write quantum. In this case we no longer want to set the write OP because the socket is still
296             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
297             // and set the write OP if necessary.
298             clearOpWrite();
299 
300             // Schedule flush again later so other tasks can be picked up in the meantime
301             eventLoop().execute(flushTask);
302         }
303     }
304 
305     /**
306      * Write a {@link FileRegion}
307      *
308      * @param region        the {@link FileRegion} from which the bytes should be written
309      * @return amount       the amount of written bytes
310      */
311     protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
312 
313     /**
314      * Read bytes into the given {@link ByteBuf} and return the amount.
315      */
316     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
317 
318     /**
319      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
320      * @param buf           the {@link ByteBuf} from which the bytes should be written
321      * @return amount       the amount of written bytes
322      */
323     protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
324 
325     protected final void setOpWrite() {
326         final SelectionKey key = selectionKey();
327         // Check first if the key is still valid as it may be canceled as part of the deregistration
328         // from the EventLoop
329         // See https://github.com/netty/netty/issues/2104
330         if (!key.isValid()) {
331             return;
332         }
333         final int interestOps = key.interestOps();
334         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
335             key.interestOps(interestOps | SelectionKey.OP_WRITE);
336         }
337     }
338 
339     protected final void clearOpWrite() {
340         final SelectionKey key = selectionKey();
341         // Check first if the key is still valid as it may be canceled as part of the deregistration
342         // from the EventLoop
343         // See https://github.com/netty/netty/issues/2104
344         if (!key.isValid()) {
345             return;
346         }
347         final int interestOps = key.interestOps();
348         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
349             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
350         }
351     }
352 }