View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.FileRegion;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.socket.ChannelInputShutdownEvent;
30  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31  import io.netty.util.LeakPresenceDetector;
32  import io.netty.util.internal.StringUtil;
33  
34  import java.io.IOException;
35  
36  /**
37   * Abstract base class for OIO which reads and writes bytes from/to a Socket
38   *
39   * @deprecated use NIO / EPOLL / KQUEUE transport.
40   */
41  public abstract class AbstractOioByteChannel extends AbstractOioChannel {
42  
    // Metadata instance shared by every channel of this type; it is what metadata() returns.
    private static final ChannelMetadata METADATA = new ChannelMetadata(false);
    // Suffix appended to the "unsupported message type" error thrown by filterOutboundMessage(Object).
    // It names the two message types accepted there: ByteBuf and FileRegion.
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
            StringUtil.simpleClassName(FileRegion.class) + ')';
47  
    /**
     * Creates a new instance by delegating to the {@link AbstractOioChannel} constructor.
     *
     * @param parent the {@link Channel} passed through to the superclass constructor
     * @see AbstractOioChannel#AbstractOioChannel(Channel)
     */
    protected AbstractOioByteChannel(Channel parent) {
        super(parent);
    }
54  
    /**
     * Returns the {@link ChannelMetadata} constant that is shared by all instances of this class.
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
59  
    /**
     * Determine if the input side of this channel has been shut down.
     * {@link #doRead()} returns without reading when this method returns {@code true}.
     *
     * @return {@code true} if the input side of this channel has been shut down.
     */
    protected abstract boolean isInputShutdown();
65  
    /**
     * Shut down the input side of this channel.
     *
     * @return A channel future that will complete when the shutdown is complete.
     */
    protected abstract ChannelFuture shutdownInput();
71  
72      private void closeOnRead(ChannelPipeline pipeline) {
73          if (isOpen()) {
74              if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
75                  shutdownInput();
76                  pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
77              } else {
78                  unsafe().close(unsafe().voidPromise());
79              }
80              pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
81          }
82      }
83  
84      private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
85              RecvByteBufAllocator.Handle allocHandle) {
86          if (byteBuf != null) {
87              if (byteBuf.isReadable()) {
88                  readPending = false;
89                  pipeline.fireChannelRead(byteBuf);
90              } else {
91                  byteBuf.release();
92              }
93          }
94          allocHandle.readComplete();
95          pipeline.fireChannelReadComplete();
96          pipeline.fireExceptionCaught(cause);
97  
98          // If oom will close the read event, release connection.
99          // See https://github.com/netty/netty/issues/10434
100         if (close ||
101                 cause instanceof OutOfMemoryError ||
102                 cause instanceof LeakPresenceDetector.AllocationProhibitedException ||
103                 cause instanceof IOException) {
104             closeOnRead(pipeline);
105         }
106     }
107 
108     @Override
109     protected void doRead() {
110         final ChannelConfig config = config();
111         if (isInputShutdown() || !readPending) {
112             // We have to check readPending here because the Runnable to read could have been scheduled and later
113             // during the same read loop readPending was set to false.
114             return;
115         }
116         // In OIO we should set readPending to false even if the read was not successful so we can schedule
117         // another read on the event loop if no reads are done.
118         readPending = false;
119 
120         final ChannelPipeline pipeline = pipeline();
121         final ByteBufAllocator allocator = config.getAllocator();
122         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
123         allocHandle.reset(config);
124 
125         ByteBuf byteBuf = null;
126         boolean close = false;
127         boolean readData = false;
128         try {
129             byteBuf = allocHandle.allocate(allocator);
130             do {
131                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
132                 if (allocHandle.lastBytesRead() <= 0) {
133                     if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
134                         byteBuf.release();
135                         byteBuf = null;
136                         close = allocHandle.lastBytesRead() < 0;
137                         if (close) {
138                             // There is nothing left to read as we received an EOF.
139                             readPending = false;
140                         }
141                     }
142                     break;
143                 } else {
144                     readData = true;
145                 }
146 
147                 final int available = available();
148                 if (available <= 0) {
149                     break;
150                 }
151 
152                 // Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline.
153                 if (!byteBuf.isWritable()) {
154                     final int capacity = byteBuf.capacity();
155                     final int maxCapacity = byteBuf.maxCapacity();
156                     if (capacity == maxCapacity) {
157                         allocHandle.incMessagesRead(1);
158                         readPending = false;
159                         pipeline.fireChannelRead(byteBuf);
160                         byteBuf = allocHandle.allocate(allocator);
161                     } else {
162                         final int writerIndex = byteBuf.writerIndex();
163                         if (writerIndex + available > maxCapacity) {
164                             byteBuf.capacity(maxCapacity);
165                         } else {
166                             byteBuf.ensureWritable(available);
167                         }
168                     }
169                 }
170             } while (allocHandle.continueReading());
171 
172             if (byteBuf != null) {
173                 // It is possible we allocated a buffer because the previous one was not writable, but then didn't use
174                 // it because allocHandle.continueReading() returned false.
175                 if (byteBuf.isReadable()) {
176                     readPending = false;
177                     pipeline.fireChannelRead(byteBuf);
178                 } else {
179                     byteBuf.release();
180                 }
181                 byteBuf = null;
182             }
183 
184             if (readData) {
185                 allocHandle.readComplete();
186                 pipeline.fireChannelReadComplete();
187             }
188 
189             if (close) {
190                 closeOnRead(pipeline);
191             }
192         } catch (Throwable t) {
193             handleReadException(pipeline, byteBuf, t, close, allocHandle);
194         } finally {
195             if (readPending || config.isAutoRead() || !readData && isActive()) {
196                 // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
197                 // should execute read() again because no data may have been read.
198                 read();
199             }
200         }
201     }
202 
203     @Override
204     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
205         for (;;) {
206             Object msg = in.current();
207             if (msg == null) {
208                 // nothing left to write
209                 break;
210             }
211             if (msg instanceof ByteBuf) {
212                 ByteBuf buf = (ByteBuf) msg;
213                 int readableBytes = buf.readableBytes();
214                 while (readableBytes > 0) {
215                     doWriteBytes(buf);
216                     int newReadableBytes = buf.readableBytes();
217                     in.progress(readableBytes - newReadableBytes);
218                     readableBytes = newReadableBytes;
219                 }
220                 in.remove();
221             } else if (msg instanceof FileRegion) {
222                 FileRegion region = (FileRegion) msg;
223                 long transferred = region.transferred();
224                 doWriteFileRegion(region);
225                 in.progress(region.transferred() - transferred);
226                 in.remove();
227             } else {
228                 in.remove(new UnsupportedOperationException(
229                         "unsupported message type: " + StringUtil.simpleClassName(msg)));
230             }
231         }
232     }
233 
234     @Override
235     protected final Object filterOutboundMessage(Object msg) throws Exception {
236         if (msg instanceof ByteBuf || msg instanceof FileRegion) {
237             return msg;
238         }
239 
240         throw new UnsupportedOperationException(
241                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
242     }
243 
    /**
     * Return the number of bytes ready to read from the underlying Socket.
     * {@link #doRead()} stops its read loop when this value is not positive.
     *
     * @return the number of bytes ready to read
     */
    protected abstract int available();
248 
    /**
     * Read bytes from the underlying Socket.
     *
     * @param buf           the {@link ByteBuf} into which the read bytes will be written
     * @return              the number of bytes read. This may return a negative amount if the underlying
     *                      Socket was closed; {@link #doRead()} treats a negative amount as EOF
     * @throws Exception    is thrown if an error occurred
     */
    protected abstract int doReadBytes(ByteBuf buf) throws Exception;
258 
    /**
     * Write the data which is held by the {@link ByteBuf} to the underlying Socket.
     * {@link #doWrite(ChannelOutboundBuffer)} keeps calling this method until {@code buf} has no readable
     * bytes left.
     *
     * @param buf           the {@link ByteBuf} which holds the data to transfer
     * @throws Exception    is thrown if an error occurred
     */
    protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
266 
    /**
     * Write the data which is held by the {@link FileRegion} to the underlying Socket.
     * {@link #doWrite(ChannelOutboundBuffer)} calls this method once per region and reports progress from the
     * change in {@link FileRegion#transferred()} across the call.
     *
     * @param region        the {@link FileRegion} which holds the data to transfer
     * @throws Exception    is thrown if an error occurred
     */
    protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
274 }