View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.FileRegion;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.socket.ChannelInputShutdownEvent;
30  import io.netty.util.internal.StringUtil;
31  
32  import java.io.IOException;
33  
34  /**
35   * Abstract base class for OIO which reads and writes bytes from/to a Socket
36   */
37  public abstract class AbstractOioByteChannel extends AbstractOioChannel {
38  
39      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
40      private static final String EXPECTED_TYPES =
41              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
42              StringUtil.simpleClassName(FileRegion.class) + ')';
43  
44      /**
45       * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
46       */
47      protected AbstractOioByteChannel(Channel parent) {
48          super(parent);
49      }
50  
51      @Override
52      public ChannelMetadata metadata() {
53          return METADATA;
54      }
55  
56      /**
57       * Determine if the input side of this channel is shutdown.
58       * @return {@code true} if the input side of this channel is shutdown.
59       */
60      protected abstract boolean isInputShutdown();
61  
62      /**
63       * Shutdown the input side of this channel.
64       * @return A channel future that will complete when the shutdown is complete.
65       */
66      protected abstract ChannelFuture shutdownInput();
67  
68      private void closeOnRead(ChannelPipeline pipeline) {
69          if (isOpen()) {
70              if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
71                  shutdownInput();
72                  pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
73              } else {
74                  unsafe().close(unsafe().voidPromise());
75              }
76          }
77      }
78  
79      private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
80              RecvByteBufAllocator.Handle allocHandle) {
81          if (byteBuf != null) {
82              if (byteBuf.isReadable()) {
83                  readPending = false;
84                  pipeline.fireChannelRead(byteBuf);
85              } else {
86                  byteBuf.release();
87              }
88          }
89          allocHandle.readComplete();
90          pipeline.fireChannelReadComplete();
91          pipeline.fireExceptionCaught(cause);
92          if (close || cause instanceof IOException) {
93              closeOnRead(pipeline);
94          }
95      }
96  
97      @Override
98      protected void doRead() {
99          final ChannelConfig config = config();
100         if (isInputShutdown() || !readPending) {
101             // We have to check readPending here because the Runnable to read could have been scheduled and later
102             // during the same read loop readPending was set to false.
103             return;
104         }
105         // In OIO we should set readPending to false even if the read was not successful so we can schedule
106         // another read on the event loop if no reads are done.
107         readPending = false;
108 
109         final ChannelPipeline pipeline = pipeline();
110         final ByteBufAllocator allocator = config.getAllocator();
111         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
112         allocHandle.reset(config);
113 
114         ByteBuf byteBuf = null;
115         boolean close = false;
116         boolean readData = false;
117         try {
118             byteBuf = allocHandle.allocate(allocator);
119             do {
120                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
121                 if (allocHandle.lastBytesRead() <= 0) {
122                     if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
123                         byteBuf.release();
124                         byteBuf = null;
125                         close = allocHandle.lastBytesRead() < 0;
126                         if (close) {
127                             // There is nothing left to read as we received an EOF.
128                             readPending = false;
129                         }
130                     }
131                     break;
132                 } else {
133                     readData = true;
134                 }
135 
136                 final int available = available();
137                 if (available <= 0) {
138                     break;
139                 }
140 
141                 // Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline.
142                 if (!byteBuf.isWritable()) {
143                     final int capacity = byteBuf.capacity();
144                     final int maxCapacity = byteBuf.maxCapacity();
145                     if (capacity == maxCapacity) {
146                         allocHandle.incMessagesRead(1);
147                         readPending = false;
148                         pipeline.fireChannelRead(byteBuf);
149                         byteBuf = allocHandle.allocate(allocator);
150                     } else {
151                         final int writerIndex = byteBuf.writerIndex();
152                         if (writerIndex + available > maxCapacity) {
153                             byteBuf.capacity(maxCapacity);
154                         } else {
155                             byteBuf.ensureWritable(available);
156                         }
157                     }
158                 }
159             } while (allocHandle.continueReading());
160 
161             if (byteBuf != null) {
162                 // It is possible we allocated a buffer because the previous one was not writable, but then didn't use
163                 // it because allocHandle.continueReading() returned false.
164                 if (byteBuf.isReadable()) {
165                     readPending = false;
166                     pipeline.fireChannelRead(byteBuf);
167                 } else {
168                     byteBuf.release();
169                 }
170                 byteBuf = null;
171             }
172 
173             if (readData) {
174                 allocHandle.readComplete();
175                 pipeline.fireChannelReadComplete();
176             }
177 
178             if (close) {
179                 closeOnRead(pipeline);
180             }
181         } catch (Throwable t) {
182             handleReadException(pipeline, byteBuf, t, close, allocHandle);
183         } finally {
184             if (readPending || config.isAutoRead() || !readData && isActive()) {
185                 // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
186                 // should execute read() again because no data may have been read.
187                 read();
188             }
189         }
190     }
191 
192     @Override
193     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
194         for (;;) {
195             Object msg = in.current();
196             if (msg == null) {
197                 // nothing left to write
198                 break;
199             }
200             if (msg instanceof ByteBuf) {
201                 ByteBuf buf = (ByteBuf) msg;
202                 int readableBytes = buf.readableBytes();
203                 while (readableBytes > 0) {
204                     doWriteBytes(buf);
205                     int newReadableBytes = buf.readableBytes();
206                     in.progress(readableBytes - newReadableBytes);
207                     readableBytes = newReadableBytes;
208                 }
209                 in.remove();
210             } else if (msg instanceof FileRegion) {
211                 FileRegion region = (FileRegion) msg;
212                 long transferred = region.transferred();
213                 doWriteFileRegion(region);
214                 in.progress(region.transferred() - transferred);
215                 in.remove();
216             } else {
217                 in.remove(new UnsupportedOperationException(
218                         "unsupported message type: " + StringUtil.simpleClassName(msg)));
219             }
220         }
221     }
222 
223     @Override
224     protected final Object filterOutboundMessage(Object msg) throws Exception {
225         if (msg instanceof ByteBuf || msg instanceof FileRegion) {
226             return msg;
227         }
228 
229         throw new UnsupportedOperationException(
230                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
231     }
232 
233     /**
234      * Return the number of bytes ready to read from the underlying Socket.
235      */
236     protected abstract int available();
237 
238     /**
239      * Read bytes from the underlying Socket.
240      *
241      * @param buf           the {@link ByteBuf} into which the read bytes will be written
242      * @return amount       the number of bytes read. This may return a negative amount if the underlying
243      *                      Socket was closed
244      * @throws Exception    is thrown if an error occurred
245      */
246     protected abstract int doReadBytes(ByteBuf buf) throws Exception;
247 
248     /**
249      * Write the data which is hold by the {@link ByteBuf} to the underlying Socket.
250      *
251      * @param buf           the {@link ByteBuf} which holds the data to transfer
252      * @throws Exception    is thrown if an error occurred
253      */
254     protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
255 
256     /**
257      * Write the data which is hold by the {@link FileRegion} to the underlying Socket.
258      *
259      * @param region        the {@link FileRegion} which holds the data to transfer
260      * @throws Exception    is thrown if an error occurred
261      */
262     protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
263 }