View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.FileRegion;
27  import io.netty.channel.IoRegistration;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.internal.ChannelUtils;
30  import io.netty.channel.socket.ChannelInputShutdownEvent;
31  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
32  import io.netty.channel.socket.SocketChannelConfig;
33  import io.netty.util.LeakPresenceDetector;
34  import io.netty.util.internal.StringUtil;
35  
36  import java.io.IOException;
37  import java.nio.channels.SelectableChannel;
38  import java.nio.channels.SelectionKey;
39  
40  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
41  import static io.netty.util.internal.StringUtil.className;
42  
43  /**
44   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
45   */
46  public abstract class AbstractNioByteChannel extends AbstractNioChannel {
47      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
48      private static final String EXPECTED_TYPES =
49              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
50              StringUtil.simpleClassName(FileRegion.class) + ')';
51  
52      private final Runnable flushTask = new Runnable() {
53          @Override
54          public void run() {
55              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
56              // meantime.
57              ((AbstractNioUnsafe) unsafe()).flush0();
58          }
59      };
60      private boolean inputClosedSeenErrorOnRead;
61  
62      /**
63       * Create a new instance
64       *
65       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
66       * @param ch                the underlying {@link SelectableChannel} on which it operates
67       */
68      protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
69          super(parent, ch, SelectionKey.OP_READ);
70      }
71  
72      /**
73       * Shutdown the input side of the channel.
74       */
75      protected abstract ChannelFuture shutdownInput();
76  
77      protected boolean isInputShutdown0() {
78          return false;
79      }
80  
81      @Override
82      protected AbstractNioUnsafe newUnsafe() {
83          return new NioByteUnsafe();
84      }
85  
86      @Override
87      public ChannelMetadata metadata() {
88          return METADATA;
89      }
90  
91      final boolean shouldBreakReadReady(ChannelConfig config) {
92          return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
93      }
94  
95      private static boolean isAllowHalfClosure(ChannelConfig config) {
96          return config instanceof SocketChannelConfig &&
97                  ((SocketChannelConfig) config).isAllowHalfClosure();
98      }
99  
100     protected class NioByteUnsafe extends AbstractNioUnsafe {
101 
102         private void closeOnRead(ChannelPipeline pipeline) {
103             if (!isInputShutdown0()) {
104                 if (isAllowHalfClosure(config())) {
105                     shutdownInput();
106                     pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
107                 } else {
108                     close(voidPromise());
109                 }
110             } else if (!inputClosedSeenErrorOnRead) {
111                 inputClosedSeenErrorOnRead = true;
112                 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
113             }
114         }
115 
116         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
117                 RecvByteBufAllocator.Handle allocHandle) {
118             if (byteBuf != null) {
119                 if (byteBuf.isReadable()) {
120                     readPending = false;
121                     try {
122                         pipeline.fireChannelRead(byteBuf);
123                     } catch (Exception e) {
124                         cause.addSuppressed(e);
125                     }
126                 } else {
127                     byteBuf.release();
128                 }
129             }
130             allocHandle.readComplete();
131             pipeline.fireChannelReadComplete();
132             pipeline.fireExceptionCaught(cause);
133 
134             // If oom will close the read event, release connection.
135             // See https://github.com/netty/netty/issues/10434
136             if (close ||
137                     cause instanceof OutOfMemoryError ||
138                     cause instanceof LeakPresenceDetector.AllocationProhibitedException ||
139                     cause instanceof IOException) {
140                 closeOnRead(pipeline);
141             }
142         }
143 
144         @Override
145         public final void read() {
146             final ChannelConfig config = config();
147             if (shouldBreakReadReady(config)) {
148                 clearReadPending();
149                 return;
150             }
151             final ChannelPipeline pipeline = pipeline();
152             final ByteBufAllocator allocator = config.getAllocator();
153             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
154             allocHandle.reset(config);
155 
156             ByteBuf byteBuf = null;
157             boolean close = false;
158             try {
159                 do {
160                     byteBuf = allocHandle.allocate(allocator);
161                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
162                     if (allocHandle.lastBytesRead() <= 0) {
163                         // nothing was read. release the buffer.
164                         byteBuf.release();
165                         byteBuf = null;
166                         close = allocHandle.lastBytesRead() < 0;
167                         if (close) {
168                             // There is nothing left to read as we received an EOF.
169                             readPending = false;
170                         }
171                         break;
172                     }
173 
174                     allocHandle.incMessagesRead(1);
175                     readPending = false;
176                     pipeline.fireChannelRead(byteBuf);
177                     byteBuf = null;
178                 } while (allocHandle.continueReading());
179 
180                 allocHandle.readComplete();
181                 pipeline.fireChannelReadComplete();
182 
183                 if (close) {
184                     closeOnRead(pipeline);
185                 }
186             } catch (Throwable t) {
187                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
188             } finally {
189                 // Check if there is a readPending which was not processed yet.
190                 // This could be for two reasons:
191                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
192                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
193                 //
194                 // See https://github.com/netty/netty/issues/2254
195                 if (!readPending && !config.isAutoRead()) {
196                     removeReadOp();
197                 }
198             }
199         }
200     }
201 
202     /**
203      * Write objects to the OS.
204      * @param in the collection which contains objects to write.
205      * @return The value that should be decremented from the write quantum which starts at
206      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
207      * <ul>
208      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
209      *     is encountered</li>
210      *     <li>1 - if a single call to write data was made to the OS</li>
211      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
212      *     data was accepted</li>
213      * </ul>
214      * @throws Exception if an I/O exception occurs during write.
215      */
216     protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
217         Object msg = in.current();
218         if (msg == null) {
219             // Directly return here so incompleteWrite(...) is not called.
220             return 0;
221         }
222         return doWriteInternal(in, in.current());
223     }
224 
225     private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
226         if (msg instanceof ByteBuf) {
227             ByteBuf buf = (ByteBuf) msg;
228             if (!buf.isReadable()) {
229                 in.remove();
230                 return 0;
231             }
232 
233             final int localFlushedAmount = doWriteBytes(buf);
234             if (localFlushedAmount > 0) {
235                 in.progress(localFlushedAmount);
236                 if (!buf.isReadable()) {
237                     in.remove();
238                 }
239                 return 1;
240             }
241         } else if (msg instanceof FileRegion) {
242             FileRegion region = (FileRegion) msg;
243             if (region.transferred() >= region.count()) {
244                 in.remove();
245                 return 0;
246             }
247 
248             long localFlushedAmount = doWriteFileRegion(region);
249             if (localFlushedAmount > 0) {
250                 in.progress(localFlushedAmount);
251                 if (region.transferred() >= region.count()) {
252                     in.remove();
253                 }
254                 return 1;
255             }
256         } else {
257             // Should not reach here.
258             throw new Error("Unexpected message type: " + className(msg));
259         }
260         return WRITE_STATUS_SNDBUF_FULL;
261     }
262 
263     @Override
264     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
265         int writeSpinCount = config().getWriteSpinCount();
266         do {
267             Object msg = in.current();
268             if (msg == null) {
269                 // Wrote all messages.
270                 clearOpWrite();
271                 // Directly return here so incompleteWrite(...) is not called.
272                 return;
273             }
274             writeSpinCount -= doWriteInternal(in, msg);
275         } while (writeSpinCount > 0);
276 
277         incompleteWrite(writeSpinCount < 0);
278     }
279 
280     @Override
281     protected final Object filterOutboundMessage(Object msg) {
282         if (msg instanceof ByteBuf) {
283             ByteBuf buf = (ByteBuf) msg;
284             if (buf.isDirect()) {
285                 return msg;
286             }
287 
288             return newDirectBuffer(buf);
289         }
290 
291         if (msg instanceof FileRegion) {
292             return msg;
293         }
294 
295         throw new UnsupportedOperationException(
296                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
297     }
298 
299     protected final void incompleteWrite(boolean setOpWrite) {
300         // Did not write completely.
301         if (setOpWrite) {
302             setOpWrite();
303         } else {
304             // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
305             // use our write quantum. In this case we no longer want to set the write OP because the socket is still
306             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
307             // and set the write OP if necessary.
308             clearOpWrite();
309 
310             // Schedule flush again later so other tasks can be picked up in the meantime
311             eventLoop().execute(flushTask);
312         }
313     }
314 
315     /**
316      * Write a {@link FileRegion}
317      *
318      * @param region        the {@link FileRegion} from which the bytes should be written
319      * @return amount       the amount of written bytes
320      */
321     protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
322 
323     /**
324      * Read bytes into the given {@link ByteBuf} and return the amount.
325      */
326     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
327 
328     /**
329      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
330      * @param buf           the {@link ByteBuf} from which the bytes should be written
331      * @return amount       the amount of written bytes
332      */
333     protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
334 
335     protected final void setOpWrite() {
336         final IoRegistration registration = registration();
337         // Check first if the key is still valid as it may be canceled as part of the deregistration
338         // from the EventLoop
339         // See https://github.com/netty/netty/issues/2104
340         if (!registration.isValid()) {
341             return;
342         }
343 
344         addAndSubmit(NioIoOps.WRITE);
345     }
346 
347     protected final void clearOpWrite() {
348         final IoRegistration registration = registration();
349         // Check first if the key is still valid as it may be canceled as part of the deregistration
350         // from the EventLoop
351         // See https://github.com/netty/netty/issues/2104
352         if (!registration.isValid()) {
353             return;
354         }
355         removeAndSubmit(NioIoOps.WRITE);
356     }
357 }