View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.FileRegion;
27  import io.netty.channel.RecvByteBufAllocator;
28  import io.netty.channel.internal.ChannelUtils;
29  import io.netty.channel.socket.ChannelInputShutdownEvent;
30  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31  import io.netty.channel.socket.SocketChannelConfig;
32  import io.netty.util.internal.StringUtil;
33  
34  import java.io.IOException;
35  import java.nio.channels.SelectableChannel;
36  import java.nio.channels.SelectionKey;
37  
38  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
39  
40  /**
41   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
42   */
43  public abstract class AbstractNioByteChannel extends AbstractNioChannel {
44      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
45      private static final String EXPECTED_TYPES =
46              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
47              StringUtil.simpleClassName(FileRegion.class) + ')';
48  
49      private final Runnable flushTask = new Runnable() {
50          @Override
51          public void run() {
52              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
53              // meantime.
54              ((AbstractNioUnsafe) unsafe()).flush0();
55          }
56      };
57      private boolean inputClosedSeenErrorOnRead;
58  
59      /**
60       * Create a new instance
61       *
62       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
63       * @param ch                the underlying {@link SelectableChannel} on which it operates
64       */
65      protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
66          super(parent, ch, SelectionKey.OP_READ);
67      }
68  
69      /**
70       * Shutdown the input side of the channel.
71       */
72      protected abstract ChannelFuture shutdownInput();
73  
74      protected boolean isInputShutdown0() {
75          return false;
76      }
77  
78      @Override
79      protected AbstractNioUnsafe newUnsafe() {
80          return new NioByteUnsafe();
81      }
82  
83      @Override
84      public ChannelMetadata metadata() {
85          return METADATA;
86      }
87  
88      final boolean shouldBreakReadReady(ChannelConfig config) {
89          return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
90      }
91  
92      private static boolean isAllowHalfClosure(ChannelConfig config) {
93          return config instanceof SocketChannelConfig &&
94                  ((SocketChannelConfig) config).isAllowHalfClosure();
95      }
96  
97      protected class NioByteUnsafe extends AbstractNioUnsafe {
98  
99          private void closeOnRead(ChannelPipeline pipeline) {
100             if (!isInputShutdown0()) {
101                 if (isAllowHalfClosure(config())) {
102                     shutdownInput();
103                     pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
104                 } else {
105                     close(voidPromise());
106                 }
107             } else {
108                 inputClosedSeenErrorOnRead = true;
109                 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
110             }
111         }
112 
113         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
114                 RecvByteBufAllocator.Handle allocHandle) {
115             if (byteBuf != null) {
116                 if (byteBuf.isReadable()) {
117                     readPending = false;
118                     pipeline.fireChannelRead(byteBuf);
119                 } else {
120                     byteBuf.release();
121                 }
122             }
123             allocHandle.readComplete();
124             pipeline.fireChannelReadComplete();
125             pipeline.fireExceptionCaught(cause);
126             if (close || cause instanceof IOException) {
127                 closeOnRead(pipeline);
128             }
129         }
130 
131         @Override
132         public final void read() {
133             final ChannelConfig config = config();
134             if (shouldBreakReadReady(config)) {
135                 clearReadPending();
136                 return;
137             }
138             final ChannelPipeline pipeline = pipeline();
139             final ByteBufAllocator allocator = config.getAllocator();
140             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
141             allocHandle.reset(config);
142 
143             ByteBuf byteBuf = null;
144             boolean close = false;
145             try {
146                 do {
147                     byteBuf = allocHandle.allocate(allocator);
148                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
149                     if (allocHandle.lastBytesRead() <= 0) {
150                         // nothing was read. release the buffer.
151                         byteBuf.release();
152                         byteBuf = null;
153                         close = allocHandle.lastBytesRead() < 0;
154                         if (close) {
155                             // There is nothing left to read as we received an EOF.
156                             readPending = false;
157                         }
158                         break;
159                     }
160 
161                     allocHandle.incMessagesRead(1);
162                     readPending = false;
163                     pipeline.fireChannelRead(byteBuf);
164                     byteBuf = null;
165                 } while (allocHandle.continueReading());
166 
167                 allocHandle.readComplete();
168                 pipeline.fireChannelReadComplete();
169 
170                 if (close) {
171                     closeOnRead(pipeline);
172                 }
173             } catch (Throwable t) {
174                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
175             } finally {
176                 // Check if there is a readPending which was not processed yet.
177                 // This could be for two reasons:
178                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
179                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
180                 //
181                 // See https://github.com/netty/netty/issues/2254
182                 if (!readPending && !config.isAutoRead()) {
183                     removeReadOp();
184                 }
185             }
186         }
187     }
188 
189     /**
190      * Write objects to the OS.
191      * @param in the collection which contains objects to write.
192      * @return The value that should be decremented from the write quantum which starts at
193      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
194      * <ul>
195      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
196      *     is encountered</li>
197      *     <li>1 - if a single call to write data was made to the OS</li>
198      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
199      *     data was accepted</li>
200      * </ul>
201      * @throws Exception if an I/O exception occurs during write.
202      */
203     protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
204         Object msg = in.current();
205         if (msg == null) {
206             // Directly return here so incompleteWrite(...) is not called.
207             return 0;
208         }
209         return doWriteInternal(in, in.current());
210     }
211 
212     private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
213         if (msg instanceof ByteBuf) {
214             ByteBuf buf = (ByteBuf) msg;
215             if (!buf.isReadable()) {
216                 in.remove();
217                 return 0;
218             }
219 
220             final int localFlushedAmount = doWriteBytes(buf);
221             if (localFlushedAmount > 0) {
222                 in.progress(localFlushedAmount);
223                 if (!buf.isReadable()) {
224                     in.remove();
225                 }
226                 return 1;
227             }
228         } else if (msg instanceof FileRegion) {
229             FileRegion region = (FileRegion) msg;
230             if (region.transferred() >= region.count()) {
231                 in.remove();
232                 return 0;
233             }
234 
235             long localFlushedAmount = doWriteFileRegion(region);
236             if (localFlushedAmount > 0) {
237                 in.progress(localFlushedAmount);
238                 if (region.transferred() >= region.count()) {
239                     in.remove();
240                 }
241                 return 1;
242             }
243         } else {
244             // Should not reach here.
245             throw new Error();
246         }
247         return WRITE_STATUS_SNDBUF_FULL;
248     }
249 
250     @Override
251     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
252         int writeSpinCount = config().getWriteSpinCount();
253         do {
254             Object msg = in.current();
255             if (msg == null) {
256                 // Wrote all messages.
257                 clearOpWrite();
258                 // Directly return here so incompleteWrite(...) is not called.
259                 return;
260             }
261             writeSpinCount -= doWriteInternal(in, msg);
262         } while (writeSpinCount > 0);
263 
264         incompleteWrite(writeSpinCount < 0);
265     }
266 
267     @Override
268     protected final Object filterOutboundMessage(Object msg) {
269         if (msg instanceof ByteBuf) {
270             ByteBuf buf = (ByteBuf) msg;
271             if (buf.isDirect()) {
272                 return msg;
273             }
274 
275             return newDirectBuffer(buf);
276         }
277 
278         if (msg instanceof FileRegion) {
279             return msg;
280         }
281 
282         throw new UnsupportedOperationException(
283                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
284     }
285 
286     protected final void incompleteWrite(boolean setOpWrite) {
287         // Did not write completely.
288         if (setOpWrite) {
289             setOpWrite();
290         } else {
291             // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
292             // use our write quantum. In this case we no longer want to set the write OP because the socket is still
293             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
294             // and set the write OP if necessary.
295             clearOpWrite();
296 
297             // Schedule flush again later so other tasks can be picked up in the meantime
298             eventLoop().execute(flushTask);
299         }
300     }
301 
302     /**
303      * Write a {@link FileRegion}
304      *
305      * @param region        the {@link FileRegion} from which the bytes should be written
306      * @return amount       the amount of written bytes
307      */
308     protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
309 
310     /**
311      * Read bytes into the given {@link ByteBuf} and return the amount.
312      */
313     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
314 
315     /**
316      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
317      * @param buf           the {@link ByteBuf} from which the bytes should be written
318      * @return amount       the amount of written bytes
319      */
320     protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
321 
322     protected final void setOpWrite() {
323         final SelectionKey key = selectionKey();
324         // Check first if the key is still valid as it may be canceled as part of the deregistration
325         // from the EventLoop
326         // See https://github.com/netty/netty/issues/2104
327         if (!key.isValid()) {
328             return;
329         }
330         final int interestOps = key.interestOps();
331         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
332             key.interestOps(interestOps | SelectionKey.OP_WRITE);
333         }
334     }
335 
336     protected final void clearOpWrite() {
337         final SelectionKey key = selectionKey();
338         // Check first if the key is still valid as it may be canceled as part of the deregistration
339         // from the EventLoop
340         // See https://github.com/netty/netty/issues/2104
341         if (!key.isValid()) {
342             return;
343         }
344         final int interestOps = key.interestOps();
345         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
346             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
347         }
348     }
349 }