View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.RecvByteBufAllocator;
23  import io.netty.channel.ServerChannel;
24  
25  import java.io.IOException;
26  import java.net.PortUnreachableException;
27  import java.nio.channels.SelectableChannel;
28  import java.nio.channels.SelectionKey;
29  import java.util.ArrayList;
30  import java.util.List;
31  
32  /**
33   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
34   */
35  public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
36      boolean inputShutdown;
37  
38      /**
39       * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
40       */
41      protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
42          super(parent, ch, readInterestOp);
43      }
44  
45      @Override
46      protected AbstractNioUnsafe newUnsafe() {
47          return new NioMessageUnsafe();
48      }
49  
50      @Override
51      protected void doBeginRead() throws Exception {
52          if (inputShutdown) {
53              return;
54          }
55          super.doBeginRead();
56      }
57  
58      protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
59          return allocHandle.continueReading();
60      }
61  
62      private final class NioMessageUnsafe extends AbstractNioUnsafe {
63  
64          private final List<Object> readBuf = new ArrayList<Object>();
65  
66          @Override
67          public void read() {
68              assert eventLoop().inEventLoop();
69              final ChannelConfig config = config();
70              final ChannelPipeline pipeline = pipeline();
71              final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
72              allocHandle.reset(config);
73  
74              boolean closed = false;
75              Throwable exception = null;
76              try {
77                  try {
78                      do {
79                          int localRead = doReadMessages(readBuf);
80                          if (localRead == 0) {
81                              break;
82                          }
83                          if (localRead < 0) {
84                              closed = true;
85                              break;
86                          }
87  
88                          allocHandle.incMessagesRead(localRead);
89                      } while (continueReading(allocHandle));
90                  } catch (Throwable t) {
91                      exception = t;
92                  }
93  
94                  int size = readBuf.size();
95                  for (int i = 0; i < size; i ++) {
96                      readPending = false;
97                      pipeline.fireChannelRead(readBuf.get(i));
98                  }
99                  readBuf.clear();
100                 allocHandle.readComplete();
101                 pipeline.fireChannelReadComplete();
102 
103                 if (exception != null) {
104                     closed = closeOnReadError(exception);
105 
106                     pipeline.fireExceptionCaught(exception);
107                 }
108 
109                 if (closed) {
110                     inputShutdown = true;
111                     if (isOpen()) {
112                         close(voidPromise());
113                     }
114                 }
115             } finally {
116                 // Check if there is a readPending which was not processed yet.
117                 // This could be for two reasons:
118                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
119                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
120                 //
121                 // See https://github.com/netty/netty/issues/2254
122                 if (!readPending && !config.isAutoRead()) {
123                     removeReadOp();
124                 }
125             }
126         }
127     }
128 
129     @Override
130     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
131         final SelectionKey key = selectionKey();
132         final int interestOps = key.interestOps();
133 
134         int maxMessagesPerWrite = maxMessagesPerWrite();
135         while (maxMessagesPerWrite > 0) {
136             Object msg = in.current();
137             if (msg == null) {
138                 break;
139             }
140             try {
141                 boolean done = false;
142                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
143                     if (doWriteMessage(msg, in)) {
144                         done = true;
145                         break;
146                     }
147                 }
148 
149                 if (done) {
150                     maxMessagesPerWrite--;
151                     in.remove();
152                 } else {
153                     break;
154                 }
155             } catch (Exception e) {
156                 if (continueOnWriteError()) {
157                     maxMessagesPerWrite--;
158                     in.remove(e);
159                 } else {
160                     throw e;
161                 }
162             }
163         }
164         if (in.isEmpty()) {
165             // Wrote all messages.
166             if ((interestOps & SelectionKey.OP_WRITE) != 0) {
167                 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
168             }
169         } else {
170             // Did not write all messages.
171             if ((interestOps & SelectionKey.OP_WRITE) == 0) {
172                 key.interestOps(interestOps | SelectionKey.OP_WRITE);
173             }
174         }
175     }
176 
177     /**
178      * Returns {@code true} if we should continue the write loop on a write error.
179      */
180     protected boolean continueOnWriteError() {
181         return false;
182     }
183 
184     protected boolean closeOnReadError(Throwable cause) {
185         if (!isActive()) {
186             // If the channel is not active anymore for whatever reason we should not try to continue reading.
187             return true;
188         }
189         if (cause instanceof PortUnreachableException) {
190             return false;
191         }
192         if (cause instanceof IOException) {
193             // ServerChannel should not be closed even on IOException because it can often continue
194             // accepting incoming connections. (e.g. too many open files)
195             return !(this instanceof ServerChannel);
196         }
197         return true;
198     }
199 
200     /**
201      * Read messages into the given array and return the amount which was read.
202      */
203     protected abstract int doReadMessages(List<Object> buf) throws Exception;
204 
205     /**
206      * Write a message to the underlying {@link java.nio.channels.Channel}.
207      *
208      * @return {@code true} if and only if the message has been written
209      */
210     protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
211 }