View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.FileRegion;
27  import io.netty.channel.IoRegistration;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.internal.ChannelUtils;
30  import io.netty.channel.socket.ChannelInputShutdownEvent;
31  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
32  import io.netty.channel.socket.SocketChannelConfig;
33  import io.netty.util.internal.StringUtil;
34  
35  import java.io.IOException;
36  import java.nio.channels.SelectableChannel;
37  import java.nio.channels.SelectionKey;
38  
39  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
40  
41  /**
42   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
43   */
44  public abstract class AbstractNioByteChannel extends AbstractNioChannel {
45      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
46      private static final String EXPECTED_TYPES =
47              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
48              StringUtil.simpleClassName(FileRegion.class) + ')';
49  
50      private final Runnable flushTask = new Runnable() {
51          @Override
52          public void run() {
53              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
54              // meantime.
55              ((AbstractNioUnsafe) unsafe()).flush0();
56          }
57      };
58      private boolean inputClosedSeenErrorOnRead;
59  
60      /**
61       * Create a new instance
62       *
63       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
64       * @param ch                the underlying {@link SelectableChannel} on which it operates
65       */
66      protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
67          super(parent, ch, SelectionKey.OP_READ);
68      }
69  
70      /**
71       * Shutdown the input side of the channel.
72       */
73      protected abstract ChannelFuture shutdownInput();
74  
75      protected boolean isInputShutdown0() {
76          return false;
77      }
78  
79      @Override
80      protected AbstractNioUnsafe newUnsafe() {
81          return new NioByteUnsafe();
82      }
83  
84      @Override
85      public ChannelMetadata metadata() {
86          return METADATA;
87      }
88  
89      final boolean shouldBreakReadReady(ChannelConfig config) {
90          return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
91      }
92  
93      private static boolean isAllowHalfClosure(ChannelConfig config) {
94          return config instanceof SocketChannelConfig &&
95                  ((SocketChannelConfig) config).isAllowHalfClosure();
96      }
97  
98      protected class NioByteUnsafe extends AbstractNioUnsafe {
99  
100         private void closeOnRead(ChannelPipeline pipeline) {
101             if (!isInputShutdown0()) {
102                 if (isAllowHalfClosure(config())) {
103                     shutdownInput();
104                     pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
105                 } else {
106                     close(voidPromise());
107                 }
108             } else if (!inputClosedSeenErrorOnRead) {
109                 inputClosedSeenErrorOnRead = true;
110                 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
111             }
112         }
113 
114         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
115                 RecvByteBufAllocator.Handle allocHandle) {
116             if (byteBuf != null) {
117                 if (byteBuf.isReadable()) {
118                     readPending = false;
119                     pipeline.fireChannelRead(byteBuf);
120                 } else {
121                     byteBuf.release();
122                 }
123             }
124             allocHandle.readComplete();
125             pipeline.fireChannelReadComplete();
126             pipeline.fireExceptionCaught(cause);
127 
128             // If oom will close the read event, release connection.
129             // See https://github.com/netty/netty/issues/10434
130             if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
131                 closeOnRead(pipeline);
132             }
133         }
134 
135         @Override
136         public final void read() {
137             final ChannelConfig config = config();
138             if (shouldBreakReadReady(config)) {
139                 clearReadPending();
140                 return;
141             }
142             final ChannelPipeline pipeline = pipeline();
143             final ByteBufAllocator allocator = config.getAllocator();
144             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
145             allocHandle.reset(config);
146 
147             ByteBuf byteBuf = null;
148             boolean close = false;
149             try {
150                 do {
151                     byteBuf = allocHandle.allocate(allocator);
152                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
153                     if (allocHandle.lastBytesRead() <= 0) {
154                         // nothing was read. release the buffer.
155                         byteBuf.release();
156                         byteBuf = null;
157                         close = allocHandle.lastBytesRead() < 0;
158                         if (close) {
159                             // There is nothing left to read as we received an EOF.
160                             readPending = false;
161                         }
162                         break;
163                     }
164 
165                     allocHandle.incMessagesRead(1);
166                     readPending = false;
167                     pipeline.fireChannelRead(byteBuf);
168                     byteBuf = null;
169                 } while (allocHandle.continueReading());
170 
171                 allocHandle.readComplete();
172                 pipeline.fireChannelReadComplete();
173 
174                 if (close) {
175                     closeOnRead(pipeline);
176                 }
177             } catch (Throwable t) {
178                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
179             } finally {
180                 // Check if there is a readPending which was not processed yet.
181                 // This could be for two reasons:
182                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
183                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
184                 //
185                 // See https://github.com/netty/netty/issues/2254
186                 if (!readPending && !config.isAutoRead()) {
187                     removeReadOp();
188                 }
189             }
190         }
191     }
192 
193     /**
194      * Write objects to the OS.
195      * @param in the collection which contains objects to write.
196      * @return The value that should be decremented from the write quantum which starts at
197      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
198      * <ul>
199      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
200      *     is encountered</li>
201      *     <li>1 - if a single call to write data was made to the OS</li>
202      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
203      *     data was accepted</li>
204      * </ul>
205      * @throws Exception if an I/O exception occurs during write.
206      */
207     protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
208         Object msg = in.current();
209         if (msg == null) {
210             // Directly return here so incompleteWrite(...) is not called.
211             return 0;
212         }
213         return doWriteInternal(in, in.current());
214     }
215 
216     private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
217         if (msg instanceof ByteBuf) {
218             ByteBuf buf = (ByteBuf) msg;
219             if (!buf.isReadable()) {
220                 in.remove();
221                 return 0;
222             }
223 
224             final int localFlushedAmount = doWriteBytes(buf);
225             if (localFlushedAmount > 0) {
226                 in.progress(localFlushedAmount);
227                 if (!buf.isReadable()) {
228                     in.remove();
229                 }
230                 return 1;
231             }
232         } else if (msg instanceof FileRegion) {
233             FileRegion region = (FileRegion) msg;
234             if (region.transferred() >= region.count()) {
235                 in.remove();
236                 return 0;
237             }
238 
239             long localFlushedAmount = doWriteFileRegion(region);
240             if (localFlushedAmount > 0) {
241                 in.progress(localFlushedAmount);
242                 if (region.transferred() >= region.count()) {
243                     in.remove();
244                 }
245                 return 1;
246             }
247         } else {
248             // Should not reach here.
249             throw new Error();
250         }
251         return WRITE_STATUS_SNDBUF_FULL;
252     }
253 
254     @Override
255     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
256         int writeSpinCount = config().getWriteSpinCount();
257         do {
258             Object msg = in.current();
259             if (msg == null) {
260                 // Wrote all messages.
261                 clearOpWrite();
262                 // Directly return here so incompleteWrite(...) is not called.
263                 return;
264             }
265             writeSpinCount -= doWriteInternal(in, msg);
266         } while (writeSpinCount > 0);
267 
268         incompleteWrite(writeSpinCount < 0);
269     }
270 
271     @Override
272     protected final Object filterOutboundMessage(Object msg) {
273         if (msg instanceof ByteBuf) {
274             ByteBuf buf = (ByteBuf) msg;
275             if (buf.isDirect()) {
276                 return msg;
277             }
278 
279             return newDirectBuffer(buf);
280         }
281 
282         if (msg instanceof FileRegion) {
283             return msg;
284         }
285 
286         throw new UnsupportedOperationException(
287                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
288     }
289 
290     protected final void incompleteWrite(boolean setOpWrite) {
291         // Did not write completely.
292         if (setOpWrite) {
293             setOpWrite();
294         } else {
295             // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
296             // use our write quantum. In this case we no longer want to set the write OP because the socket is still
297             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
298             // and set the write OP if necessary.
299             clearOpWrite();
300 
301             // Schedule flush again later so other tasks can be picked up in the meantime
302             eventLoop().execute(flushTask);
303         }
304     }
305 
306     /**
307      * Write a {@link FileRegion}
308      *
309      * @param region        the {@link FileRegion} from which the bytes should be written
310      * @return amount       the amount of written bytes
311      */
312     protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
313 
314     /**
315      * Read bytes into the given {@link ByteBuf} and return the amount.
316      */
317     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
318 
319     /**
320      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
321      * @param buf           the {@link ByteBuf} from which the bytes should be written
322      * @return amount       the amount of written bytes
323      */
324     protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
325 
326     protected final void setOpWrite() {
327         final IoRegistration registration = registration();
328         // Check first if the key is still valid as it may be canceled as part of the deregistration
329         // from the EventLoop
330         // See https://github.com/netty/netty/issues/2104
331         if (!registration.isValid()) {
332             return;
333         }
334 
335         addAndSubmit(NioIoOps.WRITE);
336     }
337 
338     protected final void clearOpWrite() {
339         final IoRegistration registration = registration();
340         // Check first if the key is still valid as it may be canceled as part of the deregistration
341         // from the EventLoop
342         // See https://github.com/netty/netty/issues/2104
343         if (!registration.isValid()) {
344             return;
345         }
346         removeAndSubmit(NioIoOps.WRITE);
347     }
348 }