View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOption;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.FileRegion;
26  import io.netty.channel.RecvByteBufAllocator;
27  import io.netty.channel.socket.ChannelInputShutdownEvent;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.io.IOException;
31  
32  /**
33   * Abstract base class for OIO which reads and writes bytes from/to a Socket
34   */
35  public abstract class AbstractOioByteChannel extends AbstractOioChannel {
36  
37      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
38      private static final String EXPECTED_TYPES =
39              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
40              StringUtil.simpleClassName(FileRegion.class) + ')';
41  
42      private RecvByteBufAllocator.Handle allocHandle;
43      private volatile boolean inputShutdown;
44  
45      /**
46       * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
47       */
48      protected AbstractOioByteChannel(Channel parent) {
49          super(parent);
50      }
51  
52      protected boolean isInputShutdown() {
53          return inputShutdown;
54      }
55  
56      @Override
57      public ChannelMetadata metadata() {
58          return METADATA;
59      }
60  
61      /**
62       * Check if the input was shutdown and if so return {@code true}. The default implementation sleeps also for
63       * {@link #SO_TIMEOUT} milliseconds to simulate some blocking.
64       */
65      protected boolean checkInputShutdown() {
66          if (inputShutdown) {
67              try {
68                  Thread.sleep(SO_TIMEOUT);
69              } catch (InterruptedException e) {
70                  // ignore
71              }
72              return true;
73          }
74          return false;
75      }
76  
77      @Override
78      protected void doRead() {
79          if (checkInputShutdown()) {
80              return;
81          }
82          final ChannelConfig config = config();
83          final ChannelPipeline pipeline = pipeline();
84  
85          RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
86          if (allocHandle == null) {
87              this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
88          }
89  
90          ByteBuf byteBuf = allocHandle.allocate(alloc());
91  
92          boolean closed = false;
93          boolean read = false;
94          Throwable exception = null;
95          int localReadAmount = 0;
96          try {
97              int totalReadAmount = 0;
98  
99              for (;;) {
100                 localReadAmount = doReadBytes(byteBuf);
101                 if (localReadAmount > 0) {
102                     read = true;
103                 } else if (localReadAmount < 0) {
104                     closed = true;
105                 }
106 
107                 final int available = available();
108                 if (available <= 0) {
109                     break;
110                 }
111 
112                 if (!byteBuf.isWritable()) {
113                     final int capacity = byteBuf.capacity();
114                     final int maxCapacity = byteBuf.maxCapacity();
115                     if (capacity == maxCapacity) {
116                         if (read) {
117                             read = false;
118                             pipeline.fireChannelRead(byteBuf);
119                             byteBuf = alloc().buffer();
120                         }
121                     } else {
122                         final int writerIndex = byteBuf.writerIndex();
123                         if (writerIndex + available > maxCapacity) {
124                             byteBuf.capacity(maxCapacity);
125                         } else {
126                             byteBuf.ensureWritable(available);
127                         }
128                     }
129                 }
130 
131                 if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
132                     // Avoid overflow.
133                     totalReadAmount = Integer.MAX_VALUE;
134                     break;
135                 }
136 
137                 totalReadAmount += localReadAmount;
138 
139                 if (!config.isAutoRead()) {
140                     // stop reading until next Channel.read() call
141                     // See https://github.com/netty/netty/issues/1363
142                     break;
143                 }
144             }
145             allocHandle.record(totalReadAmount);
146 
147         } catch (Throwable t) {
148             exception = t;
149         } finally {
150             if (read) {
151                 pipeline.fireChannelRead(byteBuf);
152             } else {
153                 // nothing read into the buffer so release it
154                 byteBuf.release();
155             }
156 
157             pipeline.fireChannelReadComplete();
158             if (exception != null) {
159                 if (exception instanceof IOException) {
160                     closed = true;
161                     pipeline().fireExceptionCaught(exception);
162                 } else {
163                     pipeline.fireExceptionCaught(exception);
164                     unsafe().close(voidPromise());
165                 }
166             }
167 
168             if (closed) {
169                 // There is nothing left to read as we received an EOF.
170                 setReadPending(false);
171 
172                 inputShutdown = true;
173                 if (isOpen()) {
174                     if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
175                         pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
176                     } else {
177                         unsafe().close(unsafe().voidPromise());
178                     }
179                 }
180             }
181             if (localReadAmount == 0 && isActive()) {
182                 // If the read amount was 0 and the channel is still active we need to trigger a new read()
183                 // as otherwise we will never try to read again and the user will never know.
184                 // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are
185                 // able to process the rest of the tasks in the queue first.
186                 //
187                 // See https://github.com/netty/netty/issues/2404
188                 read();
189             }
190         }
191     }
192 
193     @Override
194     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
195         for (;;) {
196             Object msg = in.current();
197             if (msg == null) {
198                 // nothing left to write
199                 break;
200             }
201             if (msg instanceof ByteBuf) {
202                 ByteBuf buf = (ByteBuf) msg;
203                 int readableBytes = buf.readableBytes();
204                 while (readableBytes > 0) {
205                     doWriteBytes(buf);
206                     int newReadableBytes = buf.readableBytes();
207                     in.progress(readableBytes - newReadableBytes);
208                     readableBytes = newReadableBytes;
209                 }
210                 in.remove();
211             } else if (msg instanceof FileRegion) {
212                 FileRegion region = (FileRegion) msg;
213                 long transfered = region.transfered();
214                 doWriteFileRegion(region);
215                 in.progress(region.transfered() - transfered);
216                 in.remove();
217             } else {
218                 in.remove(new UnsupportedOperationException(
219                         "unsupported message type: " + StringUtil.simpleClassName(msg)));
220             }
221         }
222     }
223 
224     @Override
225     protected final Object filterOutboundMessage(Object msg) throws Exception {
226         if (msg instanceof ByteBuf || msg instanceof FileRegion) {
227             return msg;
228         }
229 
230         throw new UnsupportedOperationException(
231                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
232     }
233 
234     /**
235      * Return the number of bytes ready to read from the underlying Socket.
236      */
237     protected abstract int available();
238 
239     /**
240      * Read bytes from the underlying Socket.
241      *
242      * @param buf           the {@link ByteBuf} into which the read bytes will be written
243      * @return amount       the number of bytes read. This may return a negative amount if the underlying
244      *                      Socket was closed
245      * @throws Exception    is thrown if an error occurred
246      */
247     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
248 
249     /**
250      * Write the data which is hold by the {@link ByteBuf} to the underlying Socket.
251      *
252      * @param buf           the {@link ByteBuf} which holds the data to transfer
253      * @throws Exception    is thrown if an error occurred
254      */
255     protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
256 
257     /**
258      * Write the data which is hold by the {@link FileRegion} to the underlying Socket.
259      *
260      * @param region        the {@link FileRegion} which holds the data to transfer
261      * @throws Exception    is thrown if an error occurred
262      */
263     protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
264 }