View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelPipeline;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  /**
27   * Abstract base class for OIO which reads and writes objects from/to a Socket
28   */
29  public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
30  
31      private final List<Object> readBuf = new ArrayList<Object>();
32  
33      protected AbstractOioMessageChannel(Channel parent) {
34          super(parent);
35      }
36  
37      @Override
38      protected void doRead() {
39          final ChannelConfig config = config();
40          final ChannelPipeline pipeline = pipeline();
41          boolean closed = false;
42          final int maxMessagesPerRead = config.getMaxMessagesPerRead();
43  
44          Throwable exception = null;
45          int localRead = 0;
46          int totalRead = 0;
47          try {
48              for (;;) {
49                  // Perform a read.
50                  localRead = doReadMessages(readBuf);
51                  if (localRead == 0) {
52                      break;
53                  }
54                  if (localRead < 0) {
55                      closed = true;
56                      break;
57                  }
58  
59                  // Notify with the received messages and clear the buffer.
60                  int size = readBuf.size();
61                  for (int i = 0; i < size; i ++) {
62                      pipeline.fireChannelRead(readBuf.get(i));
63                  }
64                  readBuf.clear();
65  
66                  // Do not read beyond maxMessagesPerRead.
67                  // Do not continue reading if autoRead has been turned off.
68                  totalRead += localRead;
69                  if (totalRead >= maxMessagesPerRead || !config.isAutoRead()) {
70                      break;
71                  }
72              }
73          } catch (Throwable t) {
74              exception = t;
75          }
76  
77          pipeline.fireChannelReadComplete();
78  
79          if (exception != null) {
80              if (exception instanceof IOException) {
81                  closed = true;
82              }
83  
84              pipeline().fireExceptionCaught(exception);
85          }
86  
87          if (closed) {
88              if (isOpen()) {
89                  unsafe().close(unsafe().voidPromise());
90              }
91          } else if (localRead == 0 && isActive()) {
92              // If the read amount was 0 and the channel is still active we need to trigger a new read()
93              // as otherwise we will never try to read again and the user will never know.
94              // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are
95              // able to process the rest of the tasks in the queue first.
96              //
97              // See https://github.com/netty/netty/issues/2404
98              read();
99          }
100     }
101 
102     /**
103      * Read messages into the given array and return the amount which was read.
104      */
105     protected abstract int doReadMessages(List<Object> msgs) throws Exception;
106 }