View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.FileRegion;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.socket.ChannelInputShutdownEvent;
30  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31  import io.netty.util.internal.StringUtil;
32  
33  import java.io.IOException;
34  
35  /**
36   * Abstract base class for OIO which reads and writes bytes from/to a Socket
37   */
38  public abstract class AbstractOioByteChannel extends AbstractOioChannel {
39  
40      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
41      private static final String EXPECTED_TYPES =
42              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
43              StringUtil.simpleClassName(FileRegion.class) + ')';
44  
45      /**
46       * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
47       */
48      protected AbstractOioByteChannel(Channel parent) {
49          super(parent);
50      }
51  
52      @Override
53      public ChannelMetadata metadata() {
54          return METADATA;
55      }
56  
57      /**
58       * Determine if the input side of this channel is shutdown.
59       * @return {@code true} if the input side of this channel is shutdown.
60       */
61      protected abstract boolean isInputShutdown();
62  
63      /**
64       * Shutdown the input side of this channel.
65       * @return A channel future that will complete when the shutdown is complete.
66       */
67      protected abstract ChannelFuture shutdownInput();
68  
69      private void closeOnRead(ChannelPipeline pipeline) {
70          if (isOpen()) {
71              if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
72                  shutdownInput();
73                  pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
74              } else {
75                  unsafe().close(unsafe().voidPromise());
76              }
77              pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
78          }
79      }
80  
81      private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
82              RecvByteBufAllocator.Handle allocHandle) {
83          if (byteBuf != null) {
84              if (byteBuf.isReadable()) {
85                  readPending = false;
86                  pipeline.fireChannelRead(byteBuf);
87              } else {
88                  byteBuf.release();
89              }
90          }
91          allocHandle.readComplete();
92          pipeline.fireChannelReadComplete();
93          pipeline.fireExceptionCaught(cause);
94          if (close || cause instanceof IOException) {
95              closeOnRead(pipeline);
96          }
97      }
98  
99      @Override
100     protected void doRead() {
101         final ChannelConfig config = config();
102         if (isInputShutdown() || !readPending) {
103             // We have to check readPending here because the Runnable to read could have been scheduled and later
104             // during the same read loop readPending was set to false.
105             return;
106         }
107         // In OIO we should set readPending to false even if the read was not successful so we can schedule
108         // another read on the event loop if no reads are done.
109         readPending = false;
110 
111         final ChannelPipeline pipeline = pipeline();
112         final ByteBufAllocator allocator = config.getAllocator();
113         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
114         allocHandle.reset(config);
115 
116         ByteBuf byteBuf = null;
117         boolean close = false;
118         boolean readData = false;
119         try {
120             byteBuf = allocHandle.allocate(allocator);
121             do {
122                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
123                 if (allocHandle.lastBytesRead() <= 0) {
124                     if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
125                         byteBuf.release();
126                         byteBuf = null;
127                         close = allocHandle.lastBytesRead() < 0;
128                         if (close) {
129                             // There is nothing left to read as we received an EOF.
130                             readPending = false;
131                         }
132                     }
133                     break;
134                 } else {
135                     readData = true;
136                 }
137 
138                 final int available = available();
139                 if (available <= 0) {
140                     break;
141                 }
142 
143                 // Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline.
144                 if (!byteBuf.isWritable()) {
145                     final int capacity = byteBuf.capacity();
146                     final int maxCapacity = byteBuf.maxCapacity();
147                     if (capacity == maxCapacity) {
148                         allocHandle.incMessagesRead(1);
149                         readPending = false;
150                         pipeline.fireChannelRead(byteBuf);
151                         byteBuf = allocHandle.allocate(allocator);
152                     } else {
153                         final int writerIndex = byteBuf.writerIndex();
154                         if (writerIndex + available > maxCapacity) {
155                             byteBuf.capacity(maxCapacity);
156                         } else {
157                             byteBuf.ensureWritable(available);
158                         }
159                     }
160                 }
161             } while (allocHandle.continueReading());
162 
163             if (byteBuf != null) {
164                 // It is possible we allocated a buffer because the previous one was not writable, but then didn't use
165                 // it because allocHandle.continueReading() returned false.
166                 if (byteBuf.isReadable()) {
167                     readPending = false;
168                     pipeline.fireChannelRead(byteBuf);
169                 } else {
170                     byteBuf.release();
171                 }
172                 byteBuf = null;
173             }
174 
175             if (readData) {
176                 allocHandle.readComplete();
177                 pipeline.fireChannelReadComplete();
178             }
179 
180             if (close) {
181                 closeOnRead(pipeline);
182             }
183         } catch (Throwable t) {
184             handleReadException(pipeline, byteBuf, t, close, allocHandle);
185         } finally {
186             if (readPending || config.isAutoRead() || !readData && isActive()) {
187                 // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
188                 // should execute read() again because no data may have been read.
189                 read();
190             }
191         }
192     }
193 
194     @Override
195     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
196         for (;;) {
197             Object msg = in.current();
198             if (msg == null) {
199                 // nothing left to write
200                 break;
201             }
202             if (msg instanceof ByteBuf) {
203                 ByteBuf buf = (ByteBuf) msg;
204                 int readableBytes = buf.readableBytes();
205                 while (readableBytes > 0) {
206                     doWriteBytes(buf);
207                     int newReadableBytes = buf.readableBytes();
208                     in.progress(readableBytes - newReadableBytes);
209                     readableBytes = newReadableBytes;
210                 }
211                 in.remove();
212             } else if (msg instanceof FileRegion) {
213                 FileRegion region = (FileRegion) msg;
214                 long transferred = region.transferred();
215                 doWriteFileRegion(region);
216                 in.progress(region.transferred() - transferred);
217                 in.remove();
218             } else {
219                 in.remove(new UnsupportedOperationException(
220                         "unsupported message type: " + StringUtil.simpleClassName(msg)));
221             }
222         }
223     }
224 
225     @Override
226     protected final Object filterOutboundMessage(Object msg) throws Exception {
227         if (msg instanceof ByteBuf || msg instanceof FileRegion) {
228             return msg;
229         }
230 
231         throw new UnsupportedOperationException(
232                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
233     }
234 
235     /**
236      * Return the number of bytes ready to read from the underlying Socket.
237      */
238     protected abstract int available();
239 
240     /**
241      * Read bytes from the underlying Socket.
242      *
243      * @param buf           the {@link ByteBuf} into which the read bytes will be written
244      * @return amount       the number of bytes read. This may return a negative amount if the underlying
245      *                      Socket was closed
246      * @throws Exception    is thrown if an error occurred
247      */
248     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
249 
250     /**
251      * Write the data which is hold by the {@link ByteBuf} to the underlying Socket.
252      *
253      * @param buf           the {@link ByteBuf} which holds the data to transfer
254      * @throws Exception    is thrown if an error occurred
255      */
256     protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
257 
258     /**
259      * Write the data which is hold by the {@link FileRegion} to the underlying Socket.
260      *
261      * @param region        the {@link FileRegion} which holds the data to transfer
262      * @throws Exception    is thrown if an error occurred
263      */
264     protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
265 }