View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOption;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.FileRegion;
26  import io.netty.channel.RecvByteBufAllocator;
27  import io.netty.channel.socket.ChannelInputShutdownEvent;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.io.IOException;
31  
32  /**
33   * Abstract base class for OIO which reads and writes bytes from/to a Socket
34   */
35  public abstract class AbstractOioByteChannel extends AbstractOioChannel {
36  
37      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
38      private static final String EXPECTED_TYPES =
39              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
40              StringUtil.simpleClassName(FileRegion.class) + ')';
41  
42      private volatile boolean inputShutdown;
43  
44      /**
45       * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
46       */
47      protected AbstractOioByteChannel(Channel parent) {
48          super(parent);
49      }
50  
51      protected boolean isInputShutdown() {
52          return inputShutdown;
53      }
54  
55      @Override
56      public ChannelMetadata metadata() {
57          return METADATA;
58      }
59  
60      /**
61       * Check if the input was shutdown and if so return {@code true}. The default implementation sleeps also for
62       * {@link #SO_TIMEOUT} milliseconds to simulate some blocking.
63       */
64      protected boolean checkInputShutdown() {
65          if (inputShutdown) {
66              try {
67                  Thread.sleep(SO_TIMEOUT);
68              } catch (InterruptedException e) {
69                  // ignore
70              }
71              return true;
72          }
73          return false;
74      }
75  
76      @Override
77      protected void doRead() {
78          if (checkInputShutdown()) {
79              return;
80          }
81          final ChannelConfig config = config();
82          final ChannelPipeline pipeline = pipeline();
83  
84          RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
85  
86          ByteBuf byteBuf = allocHandle.allocate(alloc());
87  
88          boolean closed = false;
89          boolean read = false;
90          Throwable exception = null;
91          int localReadAmount = 0;
92          try {
93              int totalReadAmount = 0;
94  
95              for (;;) {
96                  localReadAmount = doReadBytes(byteBuf);
97                  if (localReadAmount > 0) {
98                      read = true;
99                  } else if (localReadAmount < 0) {
100                     closed = true;
101                 }
102 
103                 final int available = available();
104                 if (available <= 0) {
105                     break;
106                 }
107 
108                 if (!byteBuf.isWritable()) {
109                     final int capacity = byteBuf.capacity();
110                     final int maxCapacity = byteBuf.maxCapacity();
111                     if (capacity == maxCapacity) {
112                         if (read) {
113                             read = false;
114                             pipeline.fireChannelRead(byteBuf);
115                             byteBuf = alloc().buffer();
116                         }
117                     } else {
118                         final int writerIndex = byteBuf.writerIndex();
119                         if (writerIndex + available > maxCapacity) {
120                             byteBuf.capacity(maxCapacity);
121                         } else {
122                             byteBuf.ensureWritable(available);
123                         }
124                     }
125                 }
126 
127                 if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
128                     // Avoid overflow.
129                     totalReadAmount = Integer.MAX_VALUE;
130                     break;
131                 }
132 
133                 totalReadAmount += localReadAmount;
134 
135                 if (!config.isAutoRead()) {
136                     // stop reading until next Channel.read() call
137                     // See https://github.com/netty/netty/issues/1363
138                     break;
139                 }
140             }
141             allocHandle.record(totalReadAmount);
142 
143         } catch (Throwable t) {
144             exception = t;
145         } finally {
146             if (read) {
147                 pipeline.fireChannelRead(byteBuf);
148             } else {
149                 // nothing read into the buffer so release it
150                 byteBuf.release();
151             }
152 
153             pipeline.fireChannelReadComplete();
154             if (exception != null) {
155                 if (exception instanceof IOException) {
156                     closed = true;
157                     pipeline().fireExceptionCaught(exception);
158                 } else {
159                     pipeline.fireExceptionCaught(exception);
160                     unsafe().close(voidPromise());
161                 }
162             }
163 
164             if (closed) {
165                 inputShutdown = true;
166                 if (isOpen()) {
167                     if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
168                         pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
169                     } else {
170                         unsafe().close(unsafe().voidPromise());
171                     }
172                 }
173             }
174             if (localReadAmount == 0 && isActive()) {
175                 // If the read amount was 0 and the channel is still active we need to trigger a new read()
176                 // as otherwise we will never try to read again and the user will never know.
177                 // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are
178                 // able to process the rest of the tasks in the queue first.
179                 //
180                 // See https://github.com/netty/netty/issues/2404
181                 read();
182             }
183         }
184     }
185 
186     @Override
187     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
188         for (;;) {
189             Object msg = in.current();
190             if (msg == null) {
191                 // nothing left to write
192                 break;
193             }
194             if (msg instanceof ByteBuf) {
195                 ByteBuf buf = (ByteBuf) msg;
196                 int readableBytes = buf.readableBytes();
197                 while (readableBytes > 0) {
198                     doWriteBytes(buf);
199                     int newReadableBytes = buf.readableBytes();
200                     in.progress(readableBytes - newReadableBytes);
201                     readableBytes = newReadableBytes;
202                 }
203                 in.remove();
204             } else if (msg instanceof FileRegion) {
205                 FileRegion region = (FileRegion) msg;
206                 long transfered = region.transfered();
207                 doWriteFileRegion(region);
208                 in.progress(region.transfered() - transfered);
209                 in.remove();
210             } else {
211                 in.remove(new UnsupportedOperationException(
212                         "unsupported message type: " + StringUtil.simpleClassName(msg)));
213             }
214         }
215     }
216 
217     @Override
218     protected final Object filterOutboundMessage(Object msg) throws Exception {
219         if (msg instanceof ByteBuf || msg instanceof FileRegion) {
220             return msg;
221         }
222 
223         throw new UnsupportedOperationException(
224                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
225     }
226 
227     /**
228      * Return the number of bytes ready to read from the underlying Socket.
229      */
230     protected abstract int available();
231 
232     /**
233      * Read bytes from the underlying Socket.
234      *
235      * @param buf           the {@link ByteBuf} into which the read bytes will be written
236      * @return amount       the number of bytes read. This may return a negative amount if the underlying
237      *                      Socket was closed
238      * @throws Exception    is thrown if an error occurred
239      */
240     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
241 
242     /**
243      * Write the data which is hold by the {@link ByteBuf} to the underlying Socket.
244      *
245      * @param buf           the {@link ByteBuf} which holds the data to transfer
246      * @throws Exception    is thrown if an error occurred
247      */
248     protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
249 
250     /**
251      * Write the data which is hold by the {@link FileRegion} to the underlying Socket.
252      *
253      * @param region        the {@link FileRegion} which holds the data to transfer
254      * @throws Exception    is thrown if an error occurred
255      */
256     protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
257 }