View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.ServerChannel;
23  
24  import java.io.IOException;
25  import java.net.PortUnreachableException;
26  import java.nio.channels.SelectableChannel;
27  import java.nio.channels.SelectionKey;
28  import java.util.ArrayList;
29  import java.util.List;
30  
31  /**
32   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
33   */
34  public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
35  
36      /**
37       * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
38       */
39      protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
40          super(parent, ch, readInterestOp);
41      }
42  
43      @Override
44      protected AbstractNioUnsafe newUnsafe() {
45          return new NioMessageUnsafe();
46      }
47  
48      private final class NioMessageUnsafe extends AbstractNioUnsafe {
49  
50          private final List<Object> readBuf = new ArrayList<Object>();
51  
52          @Override
53          public void read() {
54              assert eventLoop().inEventLoop();
55              final ChannelConfig config = config();
56              if (!config.isAutoRead() && !isReadPending()) {
57                  // ChannelConfig.setAutoRead(false) was called in the meantime
58                  removeReadOp();
59                  return;
60              }
61  
62              final int maxMessagesPerRead = config.getMaxMessagesPerRead();
63              final ChannelPipeline pipeline = pipeline();
64              boolean closed = false;
65              Throwable exception = null;
66              try {
67                  try {
68                      for (;;) {
69                          int localRead = doReadMessages(readBuf);
70                          if (localRead == 0) {
71                              break;
72                          }
73                          if (localRead < 0) {
74                              closed = true;
75                              break;
76                          }
77  
78                          // stop reading and remove op
79                          if (!config.isAutoRead()) {
80                              break;
81                          }
82  
83                          if (readBuf.size() >= maxMessagesPerRead) {
84                              break;
85                          }
86                      }
87                  } catch (Throwable t) {
88                      exception = t;
89                  }
90                  setReadPending(false);
91                  int size = readBuf.size();
92                  for (int i = 0; i < size; i ++) {
93                      pipeline.fireChannelRead(readBuf.get(i));
94                  }
95  
96                  readBuf.clear();
97                  pipeline.fireChannelReadComplete();
98  
99                  if (exception != null) {
100                     closed = closeOnReadError(exception);
101 
102                     pipeline.fireExceptionCaught(exception);
103                 }
104 
105                 if (closed) {
106                     if (isOpen()) {
107                         close(voidPromise());
108                     }
109                 }
110             } finally {
111                 // Check if there is a readPending which was not processed yet.
112                 // This could be for two reasons:
113                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
114                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
115                 //
116                 // See https://github.com/netty/netty/issues/2254
117                 if (!config.isAutoRead() && !isReadPending()) {
118                     removeReadOp();
119                 }
120             }
121         }
122     }
123 
124     @Override
125     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
126         final SelectionKey key = selectionKey();
127         final int interestOps = key.interestOps();
128 
129         for (;;) {
130             Object msg = in.current();
131             if (msg == null) {
132                 // Wrote all messages.
133                 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
134                     key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
135                 }
136                 break;
137             }
138             try {
139                 boolean done = false;
140                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
141                     if (doWriteMessage(msg, in)) {
142                         done = true;
143                         break;
144                     }
145                 }
146 
147                 if (done) {
148                     in.remove();
149                 } else {
150                     // Did not write all messages.
151                     if ((interestOps & SelectionKey.OP_WRITE) == 0) {
152                         key.interestOps(interestOps | SelectionKey.OP_WRITE);
153                     }
154                     break;
155                 }
156             } catch (Exception e) {
157                 if (continueOnWriteError()) {
158                     in.remove(e);
159                 } else {
160                     throw e;
161                 }
162             }
163         }
164     }
165 
166     /**
167      * Returns {@code true} if we should continue the write loop on a write error.
168      */
169     protected boolean continueOnWriteError() {
170         return false;
171     }
172 
173     protected boolean closeOnReadError(Throwable cause) {
174         // ServerChannel should not be closed even on IOException because it can often continue
175         // accepting incoming connections. (e.g. too many open files)
176         return cause instanceof IOException &&
177                 !(cause instanceof PortUnreachableException) &&
178                 !(this instanceof ServerChannel);
179     }
180 
181     /**
182      * Read messages into the given array and return the amount which was read.
183      */
184     protected abstract int doReadMessages(List<Object> buf) throws Exception;
185 
186     /**
187      * Write a message to the underlying {@link java.nio.channels.Channel}.
188      *
189      * @return {@code true} if and only if the message has been written
190      */
191     protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
192 }