View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.FileRegion;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.socket.ChannelInputShutdownEvent;
30  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31  import io.netty.util.internal.StringUtil;
32  
33  import java.io.IOException;
34  
35  /**
36   * Abstract base class for OIO which reads and writes bytes from/to a Socket
37   *
38   * @deprecated use NIO / EPOLL / KQUEUE transport.
39   */
40  public abstract class AbstractOioByteChannel extends AbstractOioChannel {
41  
42      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
43      private static final String EXPECTED_TYPES =
44              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
45              StringUtil.simpleClassName(FileRegion.class) + ')';
46  
47      /**
48       * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
49       */
50      protected AbstractOioByteChannel(Channel parent) {
51          super(parent);
52      }
53  
54      @Override
55      public ChannelMetadata metadata() {
56          return METADATA;
57      }
58  
59      /**
60       * Determine if the input side of this channel is shutdown.
61       * @return {@code true} if the input side of this channel is shutdown.
62       */
63      protected abstract boolean isInputShutdown();
64  
65      /**
66       * Shutdown the input side of this channel.
67       * @return A channel future that will complete when the shutdown is complete.
68       */
69      protected abstract ChannelFuture shutdownInput();
70  
71      private void closeOnRead(ChannelPipeline pipeline) {
72          if (isOpen()) {
73              if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
74                  shutdownInput();
75                  pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
76              } else {
77                  unsafe().close(unsafe().voidPromise());
78              }
79              pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
80          }
81      }
82  
83      private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
84              RecvByteBufAllocator.Handle allocHandle) {
85          if (byteBuf != null) {
86              if (byteBuf.isReadable()) {
87                  readPending = false;
88                  pipeline.fireChannelRead(byteBuf);
89              } else {
90                  byteBuf.release();
91              }
92          }
93          allocHandle.readComplete();
94          pipeline.fireChannelReadComplete();
95          pipeline.fireExceptionCaught(cause);
96  
97          // If oom will close the read event, release connection.
98          // See https://github.com/netty/netty/issues/10434
99          if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
100             closeOnRead(pipeline);
101         }
102     }
103 
104     @Override
105     protected void doRead() {
106         final ChannelConfig config = config();
107         if (isInputShutdown() || !readPending) {
108             // We have to check readPending here because the Runnable to read could have been scheduled and later
109             // during the same read loop readPending was set to false.
110             return;
111         }
112         // In OIO we should set readPending to false even if the read was not successful so we can schedule
113         // another read on the event loop if no reads are done.
114         readPending = false;
115 
116         final ChannelPipeline pipeline = pipeline();
117         final ByteBufAllocator allocator = config.getAllocator();
118         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
119         allocHandle.reset(config);
120 
121         ByteBuf byteBuf = null;
122         boolean close = false;
123         boolean readData = false;
124         try {
125             byteBuf = allocHandle.allocate(allocator);
126             do {
127                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
128                 if (allocHandle.lastBytesRead() <= 0) {
129                     if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
130                         byteBuf.release();
131                         byteBuf = null;
132                         close = allocHandle.lastBytesRead() < 0;
133                         if (close) {
134                             // There is nothing left to read as we received an EOF.
135                             readPending = false;
136                         }
137                     }
138                     break;
139                 } else {
140                     readData = true;
141                 }
142 
143                 final int available = available();
144                 if (available <= 0) {
145                     break;
146                 }
147 
148                 // Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline.
149                 if (!byteBuf.isWritable()) {
150                     final int capacity = byteBuf.capacity();
151                     final int maxCapacity = byteBuf.maxCapacity();
152                     if (capacity == maxCapacity) {
153                         allocHandle.incMessagesRead(1);
154                         readPending = false;
155                         pipeline.fireChannelRead(byteBuf);
156                         byteBuf = allocHandle.allocate(allocator);
157                     } else {
158                         final int writerIndex = byteBuf.writerIndex();
159                         if (writerIndex + available > maxCapacity) {
160                             byteBuf.capacity(maxCapacity);
161                         } else {
162                             byteBuf.ensureWritable(available);
163                         }
164                     }
165                 }
166             } while (allocHandle.continueReading());
167 
168             if (byteBuf != null) {
169                 // It is possible we allocated a buffer because the previous one was not writable, but then didn't use
170                 // it because allocHandle.continueReading() returned false.
171                 if (byteBuf.isReadable()) {
172                     readPending = false;
173                     pipeline.fireChannelRead(byteBuf);
174                 } else {
175                     byteBuf.release();
176                 }
177                 byteBuf = null;
178             }
179 
180             if (readData) {
181                 allocHandle.readComplete();
182                 pipeline.fireChannelReadComplete();
183             }
184 
185             if (close) {
186                 closeOnRead(pipeline);
187             }
188         } catch (Throwable t) {
189             handleReadException(pipeline, byteBuf, t, close, allocHandle);
190         } finally {
191             if (readPending || config.isAutoRead() || !readData && isActive()) {
192                 // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
193                 // should execute read() again because no data may have been read.
194                 read();
195             }
196         }
197     }
198 
199     @Override
200     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
201         for (;;) {
202             Object msg = in.current();
203             if (msg == null) {
204                 // nothing left to write
205                 break;
206             }
207             if (msg instanceof ByteBuf) {
208                 ByteBuf buf = (ByteBuf) msg;
209                 int readableBytes = buf.readableBytes();
210                 while (readableBytes > 0) {
211                     doWriteBytes(buf);
212                     int newReadableBytes = buf.readableBytes();
213                     in.progress(readableBytes - newReadableBytes);
214                     readableBytes = newReadableBytes;
215                 }
216                 in.remove();
217             } else if (msg instanceof FileRegion) {
218                 FileRegion region = (FileRegion) msg;
219                 long transferred = region.transferred();
220                 doWriteFileRegion(region);
221                 in.progress(region.transferred() - transferred);
222                 in.remove();
223             } else {
224                 in.remove(new UnsupportedOperationException(
225                         "unsupported message type: " + StringUtil.simpleClassName(msg)));
226             }
227         }
228     }
229 
230     @Override
231     protected final Object filterOutboundMessage(Object msg) throws Exception {
232         if (msg instanceof ByteBuf || msg instanceof FileRegion) {
233             return msg;
234         }
235 
236         throw new UnsupportedOperationException(
237                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
238     }
239 
240     /**
241      * Return the number of bytes ready to read from the underlying Socket.
242      */
243     protected abstract int available();
244 
245     /**
246      * Read bytes from the underlying Socket.
247      *
248      * @param buf           the {@link ByteBuf} into which the read bytes will be written
249      * @return amount       the number of bytes read. This may return a negative amount if the underlying
250      *                      Socket was closed
251      * @throws Exception    is thrown if an error occurred
252      */
253     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
254 
255     /**
256      * Write the data which is hold by the {@link ByteBuf} to the underlying Socket.
257      *
258      * @param buf           the {@link ByteBuf} which holds the data to transfer
259      * @throws Exception    is thrown if an error occurred
260      */
261     protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
262 
263     /**
264      * Write the data which is hold by the {@link FileRegion} to the underlying Socket.
265      *
266      * @param region        the {@link FileRegion} which holds the data to transfer
267      * @throws Exception    is thrown if an error occurred
268      */
269     protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
270 }