View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelOption;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.FileRegion;
26  import io.netty.channel.RecvByteBufAllocator;
27  import io.netty.channel.socket.ChannelInputShutdownEvent;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.io.IOException;
31  import java.nio.channels.SelectableChannel;
32  import java.nio.channels.SelectionKey;
33  
34  /**
35   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
36   */
37  public abstract class AbstractNioByteChannel extends AbstractNioChannel {
38  
39      private static final String EXPECTED_TYPES =
40              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
41              StringUtil.simpleClassName(FileRegion.class) + ')';
42  
43      private Runnable flushTask;
44  
45      /**
46       * Create a new instance
47       *
48       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
49       * @param ch                the underlying {@link SelectableChannel} on which it operates
50       */
51      protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
52          super(parent, ch, SelectionKey.OP_READ);
53      }
54  
55      @Override
56      protected AbstractNioUnsafe newUnsafe() {
57          return new NioByteUnsafe();
58      }
59  
60      protected class NioByteUnsafe extends AbstractNioUnsafe {
61          private RecvByteBufAllocator.Handle allocHandle;
62  
63          private void closeOnRead(ChannelPipeline pipeline) {
64              SelectionKey key = selectionKey();
65              setInputShutdown();
66              if (isOpen()) {
67                  if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
68                      key.interestOps(key.interestOps() & ~readInterestOp);
69                      pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
70                  } else {
71                      close(voidPromise());
72                  }
73              }
74          }
75  
76          private void handleReadException(ChannelPipeline pipeline,
77                                           ByteBuf byteBuf, Throwable cause, boolean close) {
78              if (byteBuf != null) {
79                  if (byteBuf.isReadable()) {
80                      setReadPending(false);
81                      pipeline.fireChannelRead(byteBuf);
82                  } else {
83                      byteBuf.release();
84                  }
85              }
86              pipeline.fireChannelReadComplete();
87              pipeline.fireExceptionCaught(cause);
88              if (close || cause instanceof IOException) {
89                  closeOnRead(pipeline);
90              }
91          }
92  
93          @Override
94          public final void read() {
95              final ChannelConfig config = config();
96              if (!config.isAutoRead() && !isReadPending()) {
97                  // ChannelConfig.setAutoRead(false) was called in the meantime
98                  removeReadOp();
99                  return;
100             }
101 
102             final ChannelPipeline pipeline = pipeline();
103             final ByteBufAllocator allocator = config.getAllocator();
104             final int maxMessagesPerRead = config.getMaxMessagesPerRead();
105             RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
106             if (allocHandle == null) {
107                 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
108             }
109 
110             ByteBuf byteBuf = null;
111             int messages = 0;
112             boolean close = false;
113             try {
114                 int totalReadAmount = 0;
115                 boolean readPendingReset = false;
116                 do {
117                     byteBuf = allocHandle.allocate(allocator);
118                     int writable = byteBuf.writableBytes();
119                     int localReadAmount = doReadBytes(byteBuf);
120                     if (localReadAmount <= 0) {
121                         // not was read release the buffer
122                         byteBuf.release();
123                         byteBuf = null;
124                         close = localReadAmount < 0;
125                         if (close) {
126                             // There is nothing left to read as we received an EOF.
127                             setReadPending(false);
128                         }
129                         break;
130                     }
131                     if (!readPendingReset) {
132                         readPendingReset = true;
133                         setReadPending(false);
134                     }
135                     pipeline.fireChannelRead(byteBuf);
136                     byteBuf = null;
137 
138                     if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
139                         // Avoid overflow.
140                         totalReadAmount = Integer.MAX_VALUE;
141                         break;
142                     }
143 
144                     totalReadAmount += localReadAmount;
145 
146                     // stop reading
147                     if (!config.isAutoRead()) {
148                         break;
149                     }
150 
151                     if (localReadAmount < writable) {
152                         // Read less than what the buffer can hold,
153                         // which might mean we drained the recv buffer completely.
154                         break;
155                     }
156                 } while (++ messages < maxMessagesPerRead);
157 
158                 pipeline.fireChannelReadComplete();
159                 allocHandle.record(totalReadAmount);
160 
161                 if (close) {
162                     closeOnRead(pipeline);
163                     close = false;
164                 }
165             } catch (Throwable t) {
166                 handleReadException(pipeline, byteBuf, t, close);
167             } finally {
168                 // Check if there is a readPending which was not processed yet.
169                 // This could be for two reasons:
170                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
171                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
172                 //
173                 // See https://github.com/netty/netty/issues/2254
174                 if (!config.isAutoRead() && !isReadPending()) {
175                     removeReadOp();
176                 }
177             }
178         }
179     }
180 
181     @Override
182     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
183         int writeSpinCount = -1;
184 
185         boolean setOpWrite = false;
186         for (;;) {
187             Object msg = in.current();
188             if (msg == null) {
189                 // Wrote all messages.
190                 clearOpWrite();
191                 // Directly return here so incompleteWrite(...) is not called.
192                 return;
193             }
194 
195             if (msg instanceof ByteBuf) {
196                 ByteBuf buf = (ByteBuf) msg;
197                 int readableBytes = buf.readableBytes();
198                 if (readableBytes == 0) {
199                     in.remove();
200                     continue;
201                 }
202 
203                 boolean done = false;
204                 long flushedAmount = 0;
205                 if (writeSpinCount == -1) {
206                     writeSpinCount = config().getWriteSpinCount();
207                 }
208                 for (int i = writeSpinCount - 1; i >= 0; i --) {
209                     int localFlushedAmount = doWriteBytes(buf);
210                     if (localFlushedAmount == 0) {
211                         setOpWrite = true;
212                         break;
213                     }
214 
215                     flushedAmount += localFlushedAmount;
216                     if (!buf.isReadable()) {
217                         done = true;
218                         break;
219                     }
220                 }
221 
222                 in.progress(flushedAmount);
223 
224                 if (done) {
225                     in.remove();
226                 } else {
227                     // Break the loop and so incompleteWrite(...) is called.
228                     break;
229                 }
230             } else if (msg instanceof FileRegion) {
231                 FileRegion region = (FileRegion) msg;
232                 boolean done = region.transfered() >= region.count();
233 
234                 if (!done) {
235                     long flushedAmount = 0;
236                     if (writeSpinCount == -1) {
237                         writeSpinCount = config().getWriteSpinCount();
238                     }
239 
240                     for (int i = writeSpinCount - 1; i >= 0; i--) {
241                         long localFlushedAmount = doWriteFileRegion(region);
242                         if (localFlushedAmount == 0) {
243                             setOpWrite = true;
244                             break;
245                         }
246 
247                         flushedAmount += localFlushedAmount;
248                         if (region.transfered() >= region.count()) {
249                             done = true;
250                             break;
251                         }
252                     }
253 
254                     in.progress(flushedAmount);
255                 }
256 
257                 if (done) {
258                     in.remove();
259                 } else {
260                     // Break the loop and so incompleteWrite(...) is called.
261                     break;
262                 }
263             } else {
264                 // Should not reach here.
265                 throw new Error();
266             }
267         }
268         incompleteWrite(setOpWrite);
269     }
270 
271     @Override
272     protected final Object filterOutboundMessage(Object msg) {
273         if (msg instanceof ByteBuf) {
274             ByteBuf buf = (ByteBuf) msg;
275             if (buf.isDirect()) {
276                 return msg;
277             }
278 
279             return newDirectBuffer(buf);
280         }
281 
282         if (msg instanceof FileRegion) {
283             return msg;
284         }
285 
286         throw new UnsupportedOperationException(
287                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
288     }
289 
290     protected final void incompleteWrite(boolean setOpWrite) {
291         // Did not write completely.
292         if (setOpWrite) {
293             setOpWrite();
294         } else {
295             // Schedule flush again later so other tasks can be picked up in the meantime
296             Runnable flushTask = this.flushTask;
297             if (flushTask == null) {
298                 flushTask = this.flushTask = new Runnable() {
299                     @Override
300                     public void run() {
301                         flush();
302                     }
303                 };
304             }
305             eventLoop().execute(flushTask);
306         }
307     }
308 
309     /**
310      * Write a {@link FileRegion}
311      *
312      * @param region        the {@link FileRegion} from which the bytes should be written
313      * @return amount       the amount of written bytes
314      */
315     protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
316 
317     /**
318      * Read bytes into the given {@link ByteBuf} and return the amount.
319      */
320     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
321 
322     /**
323      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
324      * @param buf           the {@link ByteBuf} from which the bytes should be written
325      * @return amount       the amount of written bytes
326      */
327     protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
328 
329     protected final void setOpWrite() {
330         final SelectionKey key = selectionKey();
331         // Check first if the key is still valid as it may be canceled as part of the deregistration
332         // from the EventLoop
333         // See https://github.com/netty/netty/issues/2104
334         if (!key.isValid()) {
335             return;
336         }
337         final int interestOps = key.interestOps();
338         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
339             key.interestOps(interestOps | SelectionKey.OP_WRITE);
340         }
341     }
342 
343     protected final void clearOpWrite() {
344         final SelectionKey key = selectionKey();
345         // Check first if the key is still valid as it may be canceled as part of the deregistration
346         // from the EventLoop
347         // See https://github.com/netty/netty/issues/2104
348         if (!key.isValid()) {
349             return;
350         }
351         final int interestOps = key.interestOps();
352         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
353             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
354         }
355     }
356 }