View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.nio;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.channel.AdaptiveRecvBufferAllocator;
21  import io.netty5.channel.ChannelShutdownDirection;
22  import io.netty5.util.Resource;
23  import io.netty5.channel.Channel;
24  import io.netty5.channel.ChannelMetadata;
25  import io.netty5.channel.ChannelOutboundBuffer;
26  import io.netty5.channel.ChannelPipeline;
27  import io.netty5.channel.EventLoop;
28  import io.netty5.channel.FileRegion;
29  import io.netty5.channel.RecvBufferAllocator;
30  import io.netty5.channel.internal.ChannelUtils;
31  import io.netty5.util.internal.StringUtil;
32  
33  import java.io.IOException;
34  import java.net.SocketAddress;
35  import java.nio.channels.SelectableChannel;
36  import java.nio.channels.SelectionKey;
37  
38  import static io.netty5.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
39  
40  /**
41   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
42   */
43  public abstract class AbstractNioByteChannel<P extends Channel, L extends SocketAddress, R extends SocketAddress>
44          extends AbstractNioChannel<P, L, R> {
45      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
46      private static final String EXPECTED_TYPES =
47              " (expected: " + StringUtil.simpleClassName(Buffer.class) + ", " +
48              StringUtil.simpleClassName(FileRegion.class) + ')';
49  
50      // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
51      // meantime.
52      private final Runnable flushTask = this::writeFlushed;
53      private boolean inputClosedSeenErrorOnRead;
54  
55      /**
56       * Create a new instance
57       *
58       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
59       * @param eventLoop         the {@link EventLoop} to use for IO.
60       * @param ch                the underlying {@link SelectableChannel} on which it operates
61       */
62      protected AbstractNioByteChannel(P parent, EventLoop eventLoop, SelectableChannel ch) {
63          super(parent, eventLoop, METADATA, new AdaptiveRecvBufferAllocator(), ch, SelectionKey.OP_READ);
64      }
65  
66      final boolean shouldBreakReadReady() {
67          return isShutdown(ChannelShutdownDirection.Inbound) &&
68                  (inputClosedSeenErrorOnRead || !isAllowHalfClosure());
69      }
70  
71      private void closeOnRead() {
72          if (!isShutdown(ChannelShutdownDirection.Inbound)) {
73              if (isAllowHalfClosure()) {
74                  shutdownTransport(ChannelShutdownDirection.Inbound, newPromise());
75              } else {
76                  closeTransport(newPromise());
77              }
78          } else {
79              inputClosedSeenErrorOnRead = true;
80          }
81      }
82  
83      private void handleReadException(ChannelPipeline pipeline, Buffer buffer, Throwable cause, boolean close,
84              RecvBufferAllocator.Handle allocHandle) {
85          if (buffer.readableBytes() > 0) {
86              readPending = false;
87              pipeline.fireChannelRead(buffer);
88          } else {
89              buffer.close();
90          }
91          allocHandle.readComplete();
92          pipeline.fireChannelReadComplete();
93          pipeline.fireChannelExceptionCaught(cause);
94  
95          // If oom will close the read event, release connection.
96          // See https://github.com/netty/netty/issues/10434
97          if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
98              closeOnRead();
99          } else {
100             readIfIsAutoRead();
101         }
102     }
103 
104     @Override
105     protected final void readNow() {
106         if (shouldBreakReadReady()) {
107             clearReadPending();
108             return;
109         }
110         final ChannelPipeline pipeline = pipeline();
111         final BufferAllocator bufferAllocator = bufferAllocator();
112         final RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
113         allocHandle.reset();
114 
115         Buffer buffer = null;
116         boolean close = false;
117         try {
118             do {
119                 buffer = allocHandle.allocate(bufferAllocator);
120                 allocHandle.lastBytesRead(doReadBytes(buffer));
121                 if (allocHandle.lastBytesRead() <= 0) {
122                     // nothing was read. release the buffer.
123                     Resource.dispose(buffer);
124                     buffer = null;
125                     close = allocHandle.lastBytesRead() < 0;
126                     if (close) {
127                         // There is nothing left to read as we received an EOF.
128                         readPending = false;
129                     }
130                     break;
131                 }
132 
133                 allocHandle.incMessagesRead(1);
134                 readPending = false;
135                 pipeline.fireChannelRead(buffer);
136                 buffer = null;
137             } while (allocHandle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound));
138 
139             allocHandle.readComplete();
140             pipeline.fireChannelReadComplete();
141 
142             if (close) {
143                 closeOnRead();
144             } else {
145                 readIfIsAutoRead();
146             }
147         } catch (Throwable t) {
148             handleReadException(pipeline, buffer, t, close, allocHandle);
149         } finally {
150             // Check if there is a readPending which was not processed yet.
151             // This could be for two reasons:
152             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
153             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
154             //
155             // See https://github.com/netty/netty/issues/2254
156             if (!readPending && !isAutoRead()) {
157                 removeReadOp();
158             }
159         }
160     }
161 
162     /**
163      * Write objects to the OS.
164      * @param in the collection which contains objects to write.
165      * @return The value that should be decremented from the write quantum which starts at
166      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
167      * <ul>
168      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
169      *     is encountered</li>
170      *     <li>1 - if a single call to write data was made to the OS</li>
171      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
172      *     data was accepted</li>
173      * </ul>
174      * @throws Exception if an I/O exception occurs during write.
175      */
176     protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
177         Object msg = in.current();
178         if (msg == null) {
179             // Directly return here so incompleteWrite(...) is not called.
180             return 0;
181         }
182         return doWriteInternal(in, in.current());
183     }
184 
185     private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
186         if (msg instanceof Buffer) {
187             Buffer buf = (Buffer) msg;
188             if (buf.readableBytes() == 0) {
189                 in.remove();
190                 return 0;
191             }
192 
193             final int localFlushAmount = doWriteBytes(buf);
194             if (localFlushAmount > 0) {
195                 in.progress(localFlushAmount);
196                 if (buf.readableBytes() == 0) {
197                     in.remove();
198                 }
199                 return 1;
200             }
201         } else if (msg instanceof FileRegion) {
202             FileRegion region = (FileRegion) msg;
203             if (region.transferred() >= region.count()) {
204                 in.remove();
205                 return 0;
206             }
207 
208             long localFlushedAmount = doWriteFileRegion(region);
209             if (localFlushedAmount > 0) {
210                 in.progress(localFlushedAmount);
211                 if (region.transferred() >= region.count()) {
212                     in.remove();
213                 }
214                 return 1;
215             }
216         } else {
217             // Should not reach here.
218             throw new Error();
219         }
220         return WRITE_STATUS_SNDBUF_FULL;
221     }
222 
223     @Override
224     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
225         int writeSpinCount = getWriteSpinCount();
226         do {
227             Object msg = in.current();
228             if (msg == null) {
229                 // Wrote all messages.
230                 clearOpWrite();
231                 // Directly return here so incompleteWrite(...) is not called.
232                 return;
233             }
234             writeSpinCount -= doWriteInternal(in, msg);
235         } while (writeSpinCount > 0);
236 
237         incompleteWrite(writeSpinCount < 0);
238     }
239 
240     @Override
241     protected final Object filterOutboundMessage(Object msg) {
242         if (msg instanceof Buffer) {
243             Buffer buf = (Buffer) msg;
244             if (buf.isDirect()) {
245                 return msg;
246             }
247 
248             return newDirectBuffer(buf);
249         }
250 
251         if (msg instanceof FileRegion) {
252             return msg;
253         }
254 
255         throw new UnsupportedOperationException(
256                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
257     }
258 
259     protected final void incompleteWrite(boolean setOpWrite) {
260         // Did not write completely.
261         if (setOpWrite) {
262             setOpWrite();
263         } else {
264             // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
265             // use our write quantum. In this case we no longer want to set the write OP because the socket is still
266             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
267             // and set the write OP if necessary.
268             clearOpWrite();
269 
270             // Schedule flush again later so other tasks can be picked up in the meantime
271             executor().execute(flushTask);
272         }
273     }
274 
275     /**
276      * Write a {@link FileRegion}
277      *
278      * @param region        the {@link FileRegion} from which the bytes should be written
279      * @return amount       the amount of written bytes
280      */
281     protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
282 
283     /**
284      * Read bytes into the given {@link Buffer} and return the amount.
285      */
286     protected abstract int doReadBytes(Buffer buf) throws Exception;
287 
288     /**
289      * Write bytes form the given {@link Buffer} to the underlying {@link java.nio.channels.Channel}.
290      * @param buf           the {@link Buffer} from which the bytes should be written
291      * @return amount       the amount of written bytes
292      */
293     protected abstract int doWriteBytes(Buffer buf) throws Exception;
294 
295     protected final void setOpWrite() {
296         final SelectionKey key = selectionKey();
297         // Check first if the key is still valid as it may be canceled as part of the deregistration
298         // from the EventLoop
299         // See https://github.com/netty/netty/issues/2104
300         if (key == null || !key.isValid()) {
301             return;
302         }
303         final int interestOps = key.interestOps();
304         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
305             key.interestOps(interestOps | SelectionKey.OP_WRITE);
306         }
307     }
308 
309     protected final void clearOpWrite() {
310         final SelectionKey key = selectionKey();
311         // Check first if the key is still valid as it may be canceled as part of the deregistration
312         // from the EventLoop
313         // See https://github.com/netty/netty/issues/2104
314         if (key == null || !key.isValid()) {
315             return;
316         }
317         final int interestOps = key.interestOps();
318         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
319             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
320         }
321     }
322 }