View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelOption;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.FileRegion;
26  import io.netty.channel.RecvByteBufAllocator;
27  import io.netty.channel.socket.ChannelInputShutdownEvent;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.io.IOException;
31  import java.nio.channels.SelectableChannel;
32  import java.nio.channels.SelectionKey;
33  
34  /**
35   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
36   */
37  public abstract class AbstractNioByteChannel extends AbstractNioChannel {
38  
39      private static final String EXPECTED_TYPES =
40              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
41              StringUtil.simpleClassName(FileRegion.class) + ')';
42  
43      private Runnable flushTask;
44  
45      /**
46       * Create a new instance
47       *
48       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
49       * @param ch                the underlying {@link SelectableChannel} on which it operates
50       */
51      protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
52          super(parent, ch, SelectionKey.OP_READ);
53      }
54  
55      @Override
56      protected AbstractNioUnsafe newUnsafe() {
57          return new NioByteUnsafe();
58      }
59  
60      protected class NioByteUnsafe extends AbstractNioUnsafe {
61  
62          private void closeOnRead(ChannelPipeline pipeline) {
63              SelectionKey key = selectionKey();
64              setInputShutdown();
65              if (isOpen()) {
66                  if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
67                      key.interestOps(key.interestOps() & ~readInterestOp);
68                      pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
69                  } else {
70                      close(voidPromise());
71                  }
72              }
73          }
74  
75          private void handleReadException(ChannelPipeline pipeline,
76                                           ByteBuf byteBuf, Throwable cause, boolean close) {
77              if (byteBuf != null) {
78                  if (byteBuf.isReadable()) {
79                      setReadPending(false);
80                      pipeline.fireChannelRead(byteBuf);
81                  } else {
82                      byteBuf.release();
83                  }
84              }
85              pipeline.fireChannelReadComplete();
86              pipeline.fireExceptionCaught(cause);
87              if (close || cause instanceof IOException) {
88                  closeOnRead(pipeline);
89              }
90          }
91  
92          @Override
93          public final void read() {
94              final ChannelConfig config = config();
95              if (!config.isAutoRead() && !isReadPending()) {
96                  // ChannelConfig.setAutoRead(false) was called in the meantime
97                  removeReadOp();
98                  return;
99              }
100 
101             final ChannelPipeline pipeline = pipeline();
102             final ByteBufAllocator allocator = config.getAllocator();
103             final int maxMessagesPerRead = config.getMaxMessagesPerRead();
104             RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
105 
106             ByteBuf byteBuf = null;
107             int messages = 0;
108             boolean close = false;
109             try {
110                 int totalReadAmount = 0;
111                 boolean readPendingReset = false;
112                 do {
113                     byteBuf = allocHandle.allocate(allocator);
114                     int writable = byteBuf.writableBytes();
115                     int localReadAmount = doReadBytes(byteBuf);
116                     if (localReadAmount <= 0) {
117                         // not was read release the buffer
118                         byteBuf.release();
119                         byteBuf = null;
120                         close = localReadAmount < 0;
121                         break;
122                     }
123                     if (!readPendingReset) {
124                         readPendingReset = true;
125                         setReadPending(false);
126                     }
127                     pipeline.fireChannelRead(byteBuf);
128                     byteBuf = null;
129 
130                     if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
131                         // Avoid overflow.
132                         totalReadAmount = Integer.MAX_VALUE;
133                         break;
134                     }
135 
136                     totalReadAmount += localReadAmount;
137 
138                     // stop reading
139                     if (!config.isAutoRead()) {
140                         break;
141                     }
142 
143                     if (localReadAmount < writable) {
144                         // Read less than what the buffer can hold,
145                         // which might mean we drained the recv buffer completely.
146                         break;
147                     }
148                 } while (++ messages < maxMessagesPerRead);
149 
150                 pipeline.fireChannelReadComplete();
151                 allocHandle.record(totalReadAmount);
152 
153                 if (close) {
154                     closeOnRead(pipeline);
155                     close = false;
156                 }
157             } catch (Throwable t) {
158                 handleReadException(pipeline, byteBuf, t, close);
159             } finally {
160                 // Check if there is a readPending which was not processed yet.
161                 // This could be for two reasons:
162                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
163                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
164                 //
165                 // See https://github.com/netty/netty/issues/2254
166                 if (!config.isAutoRead() && !isReadPending()) {
167                     removeReadOp();
168                 }
169             }
170         }
171     }
172 
173     @Override
174     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
175         int writeSpinCount = -1;
176 
177         for (;;) {
178             Object msg = in.current();
179             if (msg == null) {
180                 // Wrote all messages.
181                 clearOpWrite();
182                 break;
183             }
184 
185             if (msg instanceof ByteBuf) {
186                 ByteBuf buf = (ByteBuf) msg;
187                 int readableBytes = buf.readableBytes();
188                 if (readableBytes == 0) {
189                     in.remove();
190                     continue;
191                 }
192 
193                 boolean setOpWrite = false;
194                 boolean done = false;
195                 long flushedAmount = 0;
196                 if (writeSpinCount == -1) {
197                     writeSpinCount = config().getWriteSpinCount();
198                 }
199                 for (int i = writeSpinCount - 1; i >= 0; i --) {
200                     int localFlushedAmount = doWriteBytes(buf);
201                     if (localFlushedAmount == 0) {
202                         setOpWrite = true;
203                         break;
204                     }
205 
206                     flushedAmount += localFlushedAmount;
207                     if (!buf.isReadable()) {
208                         done = true;
209                         break;
210                     }
211                 }
212 
213                 in.progress(flushedAmount);
214 
215                 if (done) {
216                     in.remove();
217                 } else {
218                     incompleteWrite(setOpWrite);
219                     break;
220                 }
221             } else if (msg instanceof FileRegion) {
222                 FileRegion region = (FileRegion) msg;
223                 boolean done = region.transfered() >= region.count();
224                 boolean setOpWrite = false;
225 
226                 if (!done) {
227                     long flushedAmount = 0;
228                     if (writeSpinCount == -1) {
229                         writeSpinCount = config().getWriteSpinCount();
230                     }
231 
232                     for (int i = writeSpinCount - 1; i >= 0; i--) {
233                         long localFlushedAmount = doWriteFileRegion(region);
234                         if (localFlushedAmount == 0) {
235                             setOpWrite = true;
236                             break;
237                         }
238 
239                         flushedAmount += localFlushedAmount;
240                         if (region.transfered() >= region.count()) {
241                             done = true;
242                             break;
243                         }
244                     }
245 
246                     in.progress(flushedAmount);
247                 }
248 
249                 if (done) {
250                     in.remove();
251                 } else {
252                     incompleteWrite(setOpWrite);
253                     break;
254                 }
255             } else {
256                 // Should not reach here.
257                 throw new Error();
258             }
259         }
260     }
261 
262     @Override
263     protected final Object filterOutboundMessage(Object msg) {
264         if (msg instanceof ByteBuf) {
265             ByteBuf buf = (ByteBuf) msg;
266             if (buf.isDirect()) {
267                 return msg;
268             }
269 
270             return newDirectBuffer(buf);
271         }
272 
273         if (msg instanceof FileRegion) {
274             return msg;
275         }
276 
277         throw new UnsupportedOperationException(
278                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
279     }
280 
281     protected final void incompleteWrite(boolean setOpWrite) {
282         // Did not write completely.
283         if (setOpWrite) {
284             setOpWrite();
285         } else {
286             // Schedule flush again later so other tasks can be picked up in the meantime
287             Runnable flushTask = this.flushTask;
288             if (flushTask == null) {
289                 flushTask = this.flushTask = new Runnable() {
290                     @Override
291                     public void run() {
292                         flush();
293                     }
294                 };
295             }
296             eventLoop().execute(flushTask);
297         }
298     }
299 
300     /**
301      * Write a {@link FileRegion}
302      *
303      * @param region        the {@link FileRegion} from which the bytes should be written
304      * @return amount       the amount of written bytes
305      */
306     protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
307 
308     /**
309      * Read bytes into the given {@link ByteBuf} and return the amount.
310      */
311     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
312 
313     /**
314      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
315      * @param buf           the {@link ByteBuf} from which the bytes should be written
316      * @return amount       the amount of written bytes
317      */
318     protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
319 
320     protected final void setOpWrite() {
321         final SelectionKey key = selectionKey();
322         // Check first if the key is still valid as it may be canceled as part of the deregistration
323         // from the EventLoop
324         // See https://github.com/netty/netty/issues/2104
325         if (!key.isValid()) {
326             return;
327         }
328         final int interestOps = key.interestOps();
329         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
330             key.interestOps(interestOps | SelectionKey.OP_WRITE);
331         }
332     }
333 
334     protected final void clearOpWrite() {
335         final SelectionKey key = selectionKey();
336         // Check first if the key is still valid as it may be canceled as part of the deregistration
337         // from the EventLoop
338         // See https://github.com/netty/netty/issues/2104
339         if (!key.isValid()) {
340             return;
341         }
342         final int interestOps = key.interestOps();
343         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
344             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
345         }
346     }
347 }