View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.RecvByteBufAllocator;
23  import io.netty.channel.ServerChannel;
24  
25  import java.io.IOException;
26  import java.net.PortUnreachableException;
27  import java.nio.channels.SelectableChannel;
28  import java.nio.channels.SelectionKey;
29  import java.util.ArrayList;
30  import java.util.List;
31  
32  /**
33   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
34   */
35  public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
36      boolean inputShutdown;
37  
38      /**
39       * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
40       */
41      protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
42          super(parent, ch, readInterestOp);
43      }
44  
45      @Override
46      protected AbstractNioUnsafe newUnsafe() {
47          return new NioMessageUnsafe();
48      }
49  
50      @Override
51      protected void doBeginRead() throws Exception {
52          if (inputShutdown) {
53              return;
54          }
55          super.doBeginRead();
56      }
57  
58      private final class NioMessageUnsafe extends AbstractNioUnsafe {
59  
60          private final List<Object> readBuf = new ArrayList<Object>();
61  
62          @Override
63          public void read() {
64              assert eventLoop().inEventLoop();
65              final ChannelConfig config = config();
66              final ChannelPipeline pipeline = pipeline();
67              final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
68              allocHandle.reset(config);
69  
70              boolean closed = false;
71              Throwable exception = null;
72              try {
73                  try {
74                      do {
75                          int localRead = doReadMessages(readBuf);
76                          if (localRead == 0) {
77                              break;
78                          }
79                          if (localRead < 0) {
80                              closed = true;
81                              break;
82                          }
83  
84                          allocHandle.incMessagesRead(localRead);
85                      } while (allocHandle.continueReading());
86                  } catch (Throwable t) {
87                      exception = t;
88                  }
89  
90                  int size = readBuf.size();
91                  for (int i = 0; i < size; i ++) {
92                      readPending = false;
93                      pipeline.fireChannelRead(readBuf.get(i));
94                  }
95                  readBuf.clear();
96                  allocHandle.readComplete();
97                  pipeline.fireChannelReadComplete();
98  
99                  if (exception != null) {
100                     closed = closeOnReadError(exception);
101 
102                     pipeline.fireExceptionCaught(exception);
103                 }
104 
105                 if (closed) {
106                     inputShutdown = true;
107                     if (isOpen()) {
108                         close(voidPromise());
109                     }
110                 }
111             } finally {
112                 // Check if there is a readPending which was not processed yet.
113                 // This could be for two reasons:
114                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
115                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
116                 //
117                 // See https://github.com/netty/netty/issues/2254
118                 if (!readPending && !config.isAutoRead()) {
119                     removeReadOp();
120                 }
121             }
122         }
123     }
124 
125     @Override
126     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
127         final SelectionKey key = selectionKey();
128         final int interestOps = key.interestOps();
129 
130         for (;;) {
131             Object msg = in.current();
132             if (msg == null) {
133                 // Wrote all messages.
134                 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
135                     key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
136                 }
137                 break;
138             }
139             try {
140                 boolean done = false;
141                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
142                     if (doWriteMessage(msg, in)) {
143                         done = true;
144                         break;
145                     }
146                 }
147 
148                 if (done) {
149                     in.remove();
150                 } else {
151                     // Did not write all messages.
152                     if ((interestOps & SelectionKey.OP_WRITE) == 0) {
153                         key.interestOps(interestOps | SelectionKey.OP_WRITE);
154                     }
155                     break;
156                 }
157             } catch (Exception e) {
158                 if (continueOnWriteError()) {
159                     in.remove(e);
160                 } else {
161                     throw e;
162                 }
163             }
164         }
165     }
166 
167     /**
168      * Returns {@code true} if we should continue the write loop on a write error.
169      */
170     protected boolean continueOnWriteError() {
171         return false;
172     }
173 
174     protected boolean closeOnReadError(Throwable cause) {
175         if (!isActive()) {
176             // If the channel is not active anymore for whatever reason we should not try to continue reading.
177             return true;
178         }
179         if (cause instanceof PortUnreachableException) {
180             return false;
181         }
182         if (cause instanceof IOException) {
183             // ServerChannel should not be closed even on IOException because it can often continue
184             // accepting incoming connections. (e.g. too many open files)
185             return !(this instanceof ServerChannel);
186         }
187         return true;
188     }
189 
190     /**
191      * Read messages into the given array and return the amount which was read.
192      */
193     protected abstract int doReadMessages(List<Object> buf) throws Exception;
194 
195     /**
196      * Write a message to the underlying {@link java.nio.channels.Channel}.
197      *
198      * @return {@code true} if and only if the message has been written
199      */
200     protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
201 }