View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.RecvByteBufAllocator;
23  import io.netty.channel.ServerChannel;
24  
25  import java.io.IOException;
26  import java.net.PortUnreachableException;
27  import java.nio.channels.SelectableChannel;
28  import java.util.ArrayList;
29  import java.util.List;
30  
31  /**
32   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
33   */
34  public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
35      boolean inputShutdown;
36  
37      /**
38       * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
39       */
40      protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
41          super(parent, ch, readInterestOp);
42      }
43  
44      protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, NioIoOps readOps) {
45          super(parent, ch, readOps);
46      }
47  
48      @Override
49      protected AbstractNioUnsafe newUnsafe() {
50          return new NioMessageUnsafe();
51      }
52  
53      @Override
54      protected void doBeginRead() throws Exception {
55          if (inputShutdown) {
56              return;
57          }
58          super.doBeginRead();
59      }
60  
61      protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
62          return allocHandle.continueReading();
63      }
64  
65      private final class NioMessageUnsafe extends AbstractNioUnsafe {
66  
67          private final List<Object> readBuf = new ArrayList<Object>();
68  
69          @Override
70          public void read() {
71              assert eventLoop().inEventLoop();
72              final ChannelConfig config = config();
73              final ChannelPipeline pipeline = pipeline();
74              final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
75              allocHandle.reset(config);
76  
77              boolean closed = false;
78              Throwable exception = null;
79              try {
80                  try {
81                      do {
82                          int localRead = doReadMessages(readBuf);
83                          if (localRead == 0) {
84                              break;
85                          }
86                          if (localRead < 0) {
87                              closed = true;
88                              break;
89                          }
90  
91                          allocHandle.incMessagesRead(localRead);
92                      } while (continueReading(allocHandle));
93                  } catch (Throwable t) {
94                      exception = t;
95                  }
96  
97                  int size = readBuf.size();
98                  for (int i = 0; i < size; i ++) {
99                      readPending = false;
100                     pipeline.fireChannelRead(readBuf.get(i));
101                 }
102                 readBuf.clear();
103                 allocHandle.readComplete();
104                 pipeline.fireChannelReadComplete();
105 
106                 if (exception != null) {
107                     closed = closeOnReadError(exception);
108 
109                     pipeline.fireExceptionCaught(exception);
110                 }
111 
112                 if (closed) {
113                     inputShutdown = true;
114                     if (isOpen()) {
115                         close(voidPromise());
116                     }
117                 }
118             } finally {
119                 // Check if there is a readPending which was not processed yet.
120                 // This could be for two reasons:
121                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
122                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
123                 //
124                 // See https://github.com/netty/netty/issues/2254
125                 if (!readPending && !config.isAutoRead()) {
126                     removeReadOp();
127                 }
128             }
129         }
130     }
131 
132     @Override
133     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
134         int maxMessagesPerWrite = maxMessagesPerWrite();
135         while (maxMessagesPerWrite > 0) {
136             Object msg = in.current();
137             if (msg == null) {
138                 break;
139             }
140             try {
141                 boolean done = false;
142                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
143                     if (doWriteMessage(msg, in)) {
144                         done = true;
145                         break;
146                     }
147                 }
148 
149                 if (done) {
150                     maxMessagesPerWrite--;
151                     in.remove();
152                 } else {
153                     break;
154                 }
155             } catch (Exception e) {
156                 if (continueOnWriteError()) {
157                     maxMessagesPerWrite--;
158                     in.remove(e);
159                 } else {
160                     throw e;
161                 }
162             }
163         }
164         if (in.isEmpty()) {
165             // Wrote all messages.
166             removeAndSubmit(NioIoOps.WRITE);
167         } else {
168             // Did not write all messages.
169             addAndSubmit(NioIoOps.WRITE);
170         }
171     }
172 
173     /**
174      * Returns {@code true} if we should continue the write loop on a write error.
175      */
176     protected boolean continueOnWriteError() {
177         return false;
178     }
179 
180     protected boolean closeOnReadError(Throwable cause) {
181         if (!isActive()) {
182             // If the channel is not active anymore for whatever reason we should not try to continue reading.
183             return true;
184         }
185         if (cause instanceof PortUnreachableException) {
186             return false;
187         }
188         if (cause instanceof IOException) {
189             // ServerChannel should not be closed even on IOException because it can often continue
190             // accepting incoming connections. (e.g. too many open files)
191             return !(this instanceof ServerChannel);
192         }
193         return true;
194     }
195 
196     /**
197      * Read messages into the given array and return the amount which was read.
198      */
199     protected abstract int doReadMessages(List<Object> buf) throws Exception;
200 
201     /**
202      * Write a message to the underlying {@link java.nio.channels.Channel}.
203      *
204      * @return {@code true} if and only if the message has been written
205      */
206     protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
207 }