View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelPipeline;
21  import io.netty.channel.RecvByteBufAllocator;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  /**
28   * Abstract base class for OIO which reads and writes objects from/to a Socket
29   *
30   * @deprecated use NIO / EPOLL / KQUEUE transport.
31   */
32  @Deprecated
33  public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
34  
35      private final List<Object> readBuf = new ArrayList<Object>();
36  
37      protected AbstractOioMessageChannel(Channel parent) {
38          super(parent);
39      }
40  
41      @Override
42      protected void doRead() {
43          if (!readPending) {
44              // We have to check readPending here because the Runnable to read could have been scheduled and later
45              // during the same read loop readPending was set to false.
46              return;
47          }
48          // In OIO we should set readPending to false even if the read was not successful so we can schedule
49          // another read on the event loop if no reads are done.
50          readPending = false;
51  
52          final ChannelConfig config = config();
53          final ChannelPipeline pipeline = pipeline();
54          final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
55          allocHandle.reset(config);
56  
57          boolean closed = false;
58          Throwable exception = null;
59          try {
60              do {
61                  // Perform a read.
62                  int localRead = doReadMessages(readBuf);
63                  if (localRead == 0) {
64                      break;
65                  }
66                  if (localRead < 0) {
67                      closed = true;
68                      break;
69                  }
70  
71                  allocHandle.incMessagesRead(localRead);
72              } while (allocHandle.continueReading());
73          } catch (Throwable t) {
74              exception = t;
75          }
76  
77          boolean readData = false;
78          int size = readBuf.size();
79          if (size > 0) {
80              readData = true;
81              for (int i = 0; i < size; i++) {
82                  readPending = false;
83                  pipeline.fireChannelRead(readBuf.get(i));
84              }
85              readBuf.clear();
86              allocHandle.readComplete();
87              pipeline.fireChannelReadComplete();
88          }
89  
90          if (exception != null) {
91              if (exception instanceof IOException) {
92                  closed = true;
93              }
94  
95              pipeline.fireExceptionCaught(exception);
96          }
97  
98          if (closed) {
99              if (isOpen()) {
100                 unsafe().close(unsafe().voidPromise());
101             }
102         } else if (readPending || config.isAutoRead() || !readData && isActive()) {
103             // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
104             // should execute read() again because no data may have been read.
105             read();
106         }
107     }
108 
109     /**
110      * Read messages into the given array and return the amount which was read.
111      */
112     protected abstract int doReadMessages(List<Object> msgs) throws Exception;
113 }