View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.FileRegion;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.socket.ChannelInputShutdownEvent;
30  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31  import io.netty.util.internal.StringUtil;
32  
33  import java.io.IOException;
34  
35  /**
36   * Abstract base class for OIO which reads and writes bytes from/to a Socket
37   *
38   * @deprecated use NIO / EPOLL / KQUEUE transport.
39   */
40  public abstract class AbstractOioByteChannel extends AbstractOioChannel {
41  
42      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
43      private static final String EXPECTED_TYPES =
44              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
45              StringUtil.simpleClassName(FileRegion.class) + ')';
46  
47      /**
48       * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
49       */
50      protected AbstractOioByteChannel(Channel parent) {
51          super(parent);
52      }
53  
54      @Override
55      public ChannelMetadata metadata() {
56          return METADATA;
57      }
58  
59      /**
60       * Determine if the input side of this channel is shutdown.
61       * @return {@code true} if the input side of this channel is shutdown.
62       */
63      protected abstract boolean isInputShutdown();
64  
65      /**
66       * Shutdown the input side of this channel.
67       * @return A channel future that will complete when the shutdown is complete.
68       */
69      protected abstract ChannelFuture shutdownInput();
70  
71      private void closeOnRead(ChannelPipeline pipeline) {
72          if (isOpen()) {
73              if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
74                  shutdownInput();
75                  pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
76              } else {
77                  unsafe().close(unsafe().voidPromise());
78              }
79              pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
80          }
81      }
82  
83      private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
84              RecvByteBufAllocator.Handle allocHandle) {
85          if (byteBuf != null) {
86              if (byteBuf.isReadable()) {
87                  readPending = false;
88                  pipeline.fireChannelRead(byteBuf);
89              } else {
90                  byteBuf.release();
91              }
92          }
93          allocHandle.readComplete();
94          pipeline.fireChannelReadComplete();
95          pipeline.fireExceptionCaught(cause);
96          if (close || cause instanceof IOException) {
97              closeOnRead(pipeline);
98          }
99      }
100 
101     @Override
102     protected void doRead() {
103         final ChannelConfig config = config();
104         if (isInputShutdown() || !readPending) {
105             // We have to check readPending here because the Runnable to read could have been scheduled and later
106             // during the same read loop readPending was set to false.
107             return;
108         }
109         // In OIO we should set readPending to false even if the read was not successful so we can schedule
110         // another read on the event loop if no reads are done.
111         readPending = false;
112 
113         final ChannelPipeline pipeline = pipeline();
114         final ByteBufAllocator allocator = config.getAllocator();
115         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
116         allocHandle.reset(config);
117 
118         ByteBuf byteBuf = null;
119         boolean close = false;
120         boolean readData = false;
121         try {
122             byteBuf = allocHandle.allocate(allocator);
123             do {
124                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
125                 if (allocHandle.lastBytesRead() <= 0) {
126                     if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
127                         byteBuf.release();
128                         byteBuf = null;
129                         close = allocHandle.lastBytesRead() < 0;
130                         if (close) {
131                             // There is nothing left to read as we received an EOF.
132                             readPending = false;
133                         }
134                     }
135                     break;
136                 } else {
137                     readData = true;
138                 }
139 
140                 final int available = available();
141                 if (available <= 0) {
142                     break;
143                 }
144 
145                 // Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline.
146                 if (!byteBuf.isWritable()) {
147                     final int capacity = byteBuf.capacity();
148                     final int maxCapacity = byteBuf.maxCapacity();
149                     if (capacity == maxCapacity) {
150                         allocHandle.incMessagesRead(1);
151                         readPending = false;
152                         pipeline.fireChannelRead(byteBuf);
153                         byteBuf = allocHandle.allocate(allocator);
154                     } else {
155                         final int writerIndex = byteBuf.writerIndex();
156                         if (writerIndex + available > maxCapacity) {
157                             byteBuf.capacity(maxCapacity);
158                         } else {
159                             byteBuf.ensureWritable(available);
160                         }
161                     }
162                 }
163             } while (allocHandle.continueReading());
164 
165             if (byteBuf != null) {
166                 // It is possible we allocated a buffer because the previous one was not writable, but then didn't use
167                 // it because allocHandle.continueReading() returned false.
168                 if (byteBuf.isReadable()) {
169                     readPending = false;
170                     pipeline.fireChannelRead(byteBuf);
171                 } else {
172                     byteBuf.release();
173                 }
174                 byteBuf = null;
175             }
176 
177             if (readData) {
178                 allocHandle.readComplete();
179                 pipeline.fireChannelReadComplete();
180             }
181 
182             if (close) {
183                 closeOnRead(pipeline);
184             }
185         } catch (Throwable t) {
186             handleReadException(pipeline, byteBuf, t, close, allocHandle);
187         } finally {
188             if (readPending || config.isAutoRead() || !readData && isActive()) {
189                 // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
190                 // should execute read() again because no data may have been read.
191                 read();
192             }
193         }
194     }
195 
196     @Override
197     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
198         for (;;) {
199             Object msg = in.current();
200             if (msg == null) {
201                 // nothing left to write
202                 break;
203             }
204             if (msg instanceof ByteBuf) {
205                 ByteBuf buf = (ByteBuf) msg;
206                 int readableBytes = buf.readableBytes();
207                 while (readableBytes > 0) {
208                     doWriteBytes(buf);
209                     int newReadableBytes = buf.readableBytes();
210                     in.progress(readableBytes - newReadableBytes);
211                     readableBytes = newReadableBytes;
212                 }
213                 in.remove();
214             } else if (msg instanceof FileRegion) {
215                 FileRegion region = (FileRegion) msg;
216                 long transferred = region.transferred();
217                 doWriteFileRegion(region);
218                 in.progress(region.transferred() - transferred);
219                 in.remove();
220             } else {
221                 in.remove(new UnsupportedOperationException(
222                         "unsupported message type: " + StringUtil.simpleClassName(msg)));
223             }
224         }
225     }
226 
227     @Override
228     protected final Object filterOutboundMessage(Object msg) throws Exception {
229         if (msg instanceof ByteBuf || msg instanceof FileRegion) {
230             return msg;
231         }
232 
233         throw new UnsupportedOperationException(
234                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
235     }
236 
237     /**
238      * Return the number of bytes ready to read from the underlying Socket.
239      */
240     protected abstract int available();
241 
242     /**
243      * Read bytes from the underlying Socket.
244      *
245      * @param buf           the {@link ByteBuf} into which the read bytes will be written
246      * @return amount       the number of bytes read. This may return a negative amount if the underlying
247      *                      Socket was closed
248      * @throws Exception    is thrown if an error occurred
249      */
250     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
251 
252     /**
253      * Write the data which is hold by the {@link ByteBuf} to the underlying Socket.
254      *
255      * @param buf           the {@link ByteBuf} which holds the data to transfer
256      * @throws Exception    is thrown if an error occurred
257      */
258     protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
259 
260     /**
261      * Write the data which is hold by the {@link FileRegion} to the underlying Socket.
262      *
263      * @param region        the {@link FileRegion} which holds the data to transfer
264      * @throws Exception    is thrown if an error occurred
265      */
266     protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
267 }