View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.ServerChannel;
23  
24  import java.io.IOException;
25  import java.net.PortUnreachableException;
26  import java.nio.channels.SelectableChannel;
27  import java.nio.channels.SelectionKey;
28  import java.util.ArrayList;
29  import java.util.List;
30  
31  /**
32   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
33   */
34  public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
35  
36      /**
37       * @see {@link AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)}
38       */
39      protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
40          super(parent, ch, readInterestOp);
41      }
42  
43      @Override
44      protected AbstractNioUnsafe newUnsafe() {
45          return new NioMessageUnsafe();
46      }
47  
48      private final class NioMessageUnsafe extends AbstractNioUnsafe {
49  
50          private final List<Object> readBuf = new ArrayList<Object>();
51  
52          @Override
53          public void read() {
54              assert eventLoop().inEventLoop();
55              final ChannelConfig config = config();
56              if (!config.isAutoRead() && !isReadPending()) {
57                  // ChannelConfig.setAutoRead(false) was called in the meantime
58                  removeReadOp();
59                  return;
60              }
61  
62              final int maxMessagesPerRead = config.getMaxMessagesPerRead();
63              final ChannelPipeline pipeline = pipeline();
64              boolean closed = false;
65              Throwable exception = null;
66              try {
67                  try {
68                      for (;;) {
69                          int localRead = doReadMessages(readBuf);
70                          if (localRead == 0) {
71                              break;
72                          }
73                          if (localRead < 0) {
74                              closed = true;
75                              break;
76                          }
77  
78                          // stop reading and remove op
79                          if (!config.isAutoRead()) {
80                              break;
81                          }
82  
83                          if (readBuf.size() >= maxMessagesPerRead) {
84                              break;
85                          }
86                      }
87                  } catch (Throwable t) {
88                      exception = t;
89                  }
90                  setReadPending(false);
91                  int size = readBuf.size();
92                  for (int i = 0; i < size; i ++) {
93                      pipeline.fireChannelRead(readBuf.get(i));
94                  }
95  
96                  readBuf.clear();
97                  pipeline.fireChannelReadComplete();
98  
99                  if (exception != null) {
100                     if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
101                         // ServerChannel should not be closed even on IOException because it can often continue
102                         // accepting incoming connections. (e.g. too many open files)
103                         closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
104                     }
105 
106                     pipeline.fireExceptionCaught(exception);
107                 }
108 
109                 if (closed) {
110                     if (isOpen()) {
111                         close(voidPromise());
112                     }
113                 }
114             } finally {
115                 // Check if there is a readPending which was not processed yet.
116                 // This could be for two reasons:
117                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
118                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
119                 //
120                 // See https://github.com/netty/netty/issues/2254
121                 if (!config.isAutoRead() && !isReadPending()) {
122                     removeReadOp();
123                 }
124             }
125         }
126     }
127 
128     @Override
129     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
130         final SelectionKey key = selectionKey();
131         final int interestOps = key.interestOps();
132 
133         for (;;) {
134             Object msg = in.current();
135             if (msg == null) {
136                 // Wrote all messages.
137                 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
138                     key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
139                 }
140                 break;
141             }
142             try {
143                 boolean done = false;
144                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
145                     if (doWriteMessage(msg, in)) {
146                         done = true;
147                         break;
148                     }
149                 }
150 
151                 if (done) {
152                     in.remove();
153                 } else {
154                     // Did not write all messages.
155                     if ((interestOps & SelectionKey.OP_WRITE) == 0) {
156                         key.interestOps(interestOps | SelectionKey.OP_WRITE);
157                     }
158                     break;
159                 }
160             } catch (IOException e) {
161                 if (continueOnWriteError()) {
162                     in.remove(e);
163                 } else {
164                     throw e;
165                 }
166             }
167         }
168     }
169 
170     /**
171      * Returns {@code true} if we should continue the write loop on a write error.
172      */
173     protected boolean continueOnWriteError() {
174         return false;
175     }
176 
177     /**
178      * Read messages into the given array and return the amount which was read.
179      */
180     protected abstract int doReadMessages(List<Object> buf) throws Exception;
181 
182     /**
183      * Write a message to the underlying {@link java.nio.channels.Channel}.
184      *
185      * @return {@code true} if and only if the message has been written
186      */
187     protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
188 }