View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.FileRegion;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.internal.ChannelUtils;
30  import io.netty.channel.socket.ChannelInputShutdownEvent;
31  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
32  import io.netty.util.internal.StringUtil;
33  
34  import java.io.IOException;
35  import java.nio.channels.SelectableChannel;
36  import java.nio.channels.SelectionKey;
37  
38  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
39  
40  /**
41   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
42   */
43  public abstract class AbstractNioByteChannel extends AbstractNioChannel {
44      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
45      private static final String EXPECTED_TYPES =
46              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
47              StringUtil.simpleClassName(FileRegion.class) + ')';
48  
49      private Runnable flushTask;
50  
51      /**
52       * Create a new instance
53       *
54       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
55       * @param ch                the underlying {@link SelectableChannel} on which it operates
56       */
57      protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
58          super(parent, ch, SelectionKey.OP_READ);
59      }
60  
61      /**
62       * Shutdown the input side of the channel.
63       */
64      protected abstract ChannelFuture shutdownInput();
65  
66      protected boolean isInputShutdown0() {
67          return false;
68      }
69  
70      @Override
71      protected AbstractNioUnsafe newUnsafe() {
72          return new NioByteUnsafe();
73      }
74  
75      @Override
76      public ChannelMetadata metadata() {
77          return METADATA;
78      }
79  
80      protected class NioByteUnsafe extends AbstractNioUnsafe {
81  
82          private void closeOnRead(ChannelPipeline pipeline) {
83              if (!isInputShutdown0()) {
84                  if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
85                      shutdownInput();
86                      pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
87                  } else {
88                      close(voidPromise());
89                  }
90              } else {
91                  pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
92              }
93          }
94  
95          private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
96                  RecvByteBufAllocator.Handle allocHandle) {
97              if (byteBuf != null) {
98                  if (byteBuf.isReadable()) {
99                      readPending = false;
100                     pipeline.fireChannelRead(byteBuf);
101                 } else {
102                     byteBuf.release();
103                 }
104             }
105             allocHandle.readComplete();
106             pipeline.fireChannelReadComplete();
107             pipeline.fireExceptionCaught(cause);
108             if (close || cause instanceof IOException) {
109                 closeOnRead(pipeline);
110             }
111         }
112 
113         @Override
114         public final void read() {
115             final ChannelConfig config = config();
116             final ChannelPipeline pipeline = pipeline();
117             final ByteBufAllocator allocator = config.getAllocator();
118             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
119             allocHandle.reset(config);
120 
121             ByteBuf byteBuf = null;
122             boolean close = false;
123             try {
124                 do {
125                     byteBuf = allocHandle.allocate(allocator);
126                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
127                     if (allocHandle.lastBytesRead() <= 0) {
128                         // nothing was read. release the buffer.
129                         byteBuf.release();
130                         byteBuf = null;
131                         close = allocHandle.lastBytesRead() < 0;
132                         if (close) {
133                             // There is nothing left to read as we received an EOF.
134                             readPending = false;
135                         }
136                         break;
137                     }
138 
139                     allocHandle.incMessagesRead(1);
140                     readPending = false;
141                     pipeline.fireChannelRead(byteBuf);
142                     byteBuf = null;
143                 } while (allocHandle.continueReading());
144 
145                 allocHandle.readComplete();
146                 pipeline.fireChannelReadComplete();
147 
148                 if (close) {
149                     closeOnRead(pipeline);
150                 }
151             } catch (Throwable t) {
152                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
153             } finally {
154                 // Check if there is a readPending which was not processed yet.
155                 // This could be for two reasons:
156                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
157                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
158                 //
159                 // See https://github.com/netty/netty/issues/2254
160                 if (!readPending && !config.isAutoRead()) {
161                     removeReadOp();
162                 }
163             }
164         }
165     }
166 
167     /**
168      * Write objects to the OS.
169      * @param in the collection which contains objects to write.
170      * @return The value that should be decremented from the write quantum which starts at
171      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
172      * <ul>
173      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
174      *     is encountered</li>
175      *     <li>1 - if a single call to write data was made to the OS</li>
176      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
177      *     data was accepted</li>
178      * </ul>
179      * @throws Exception if an I/O exception occurs during write.
180      */
181     protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
182         Object msg = in.current();
183         if (msg == null) {
184             // Directly return here so incompleteWrite(...) is not called.
185             return 0;
186         }
187         return doWriteInternal(in, in.current());
188     }
189 
190     private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
191         if (msg instanceof ByteBuf) {
192             ByteBuf buf = (ByteBuf) msg;
193             if (!buf.isReadable()) {
194                 in.remove();
195                 return 0;
196             }
197 
198             final int localFlushedAmount = doWriteBytes(buf);
199             if (localFlushedAmount > 0) {
200                 in.progress(localFlushedAmount);
201                 if (!buf.isReadable()) {
202                     in.remove();
203                 }
204                 return 1;
205             }
206         } else if (msg instanceof FileRegion) {
207             FileRegion region = (FileRegion) msg;
208             if (region.transferred() >= region.count()) {
209                 in.remove();
210                 return 0;
211             }
212 
213             long localFlushedAmount = doWriteFileRegion(region);
214             if (localFlushedAmount > 0) {
215                 in.progress(localFlushedAmount);
216                 if (region.transferred() >= region.count()) {
217                     in.remove();
218                 }
219                 return 1;
220             }
221         } else {
222             // Should not reach here.
223             throw new Error();
224         }
225         return WRITE_STATUS_SNDBUF_FULL;
226     }
227 
228     @Override
229     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
230         int writeSpinCount = config().getWriteSpinCount();
231         do {
232             Object msg = in.current();
233             if (msg == null) {
234                 // Wrote all messages.
235                 clearOpWrite();
236                 // Directly return here so incompleteWrite(...) is not called.
237                 return;
238             }
239             writeSpinCount -= doWriteInternal(in, msg);
240         } while (writeSpinCount > 0);
241 
242         incompleteWrite(writeSpinCount < 0);
243     }
244 
245     @Override
246     protected final Object filterOutboundMessage(Object msg) {
247         if (msg instanceof ByteBuf) {
248             ByteBuf buf = (ByteBuf) msg;
249             if (buf.isDirect()) {
250                 return msg;
251             }
252 
253             return newDirectBuffer(buf);
254         }
255 
256         if (msg instanceof FileRegion) {
257             return msg;
258         }
259 
260         throw new UnsupportedOperationException(
261                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
262     }
263 
264     protected final void incompleteWrite(boolean setOpWrite) {
265         // Did not write completely.
266         if (setOpWrite) {
267             setOpWrite();
268         } else {
269             // Schedule flush again later so other tasks can be picked up in the meantime
270             Runnable flushTask = this.flushTask;
271             if (flushTask == null) {
272                 flushTask = this.flushTask = new Runnable() {
273                     @Override
274                     public void run() {
275                         flush();
276                     }
277                 };
278             }
279             eventLoop().execute(flushTask);
280         }
281     }
282 
283     /**
284      * Write a {@link FileRegion}
285      *
286      * @param region        the {@link FileRegion} from which the bytes should be written
287      * @return amount       the amount of written bytes
288      */
289     protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
290 
291     /**
292      * Read bytes into the given {@link ByteBuf} and return the amount.
293      */
294     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
295 
296     /**
297      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
298      * @param buf           the {@link ByteBuf} from which the bytes should be written
299      * @return amount       the amount of written bytes
300      */
301     protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
302 
303     protected final void setOpWrite() {
304         final SelectionKey key = selectionKey();
305         // Check first if the key is still valid as it may be canceled as part of the deregistration
306         // from the EventLoop
307         // See https://github.com/netty/netty/issues/2104
308         if (!key.isValid()) {
309             return;
310         }
311         final int interestOps = key.interestOps();
312         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
313             key.interestOps(interestOps | SelectionKey.OP_WRITE);
314         }
315     }
316 
317     protected final void clearOpWrite() {
318         final SelectionKey key = selectionKey();
319         // Check first if the key is still valid as it may be canceled as part of the deregistration
320         // from the EventLoop
321         // See https://github.com/netty/netty/issues/2104
322         if (!key.isValid()) {
323             return;
324         }
325         final int interestOps = key.interestOps();
326         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
327             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
328         }
329     }
330 }