View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.nio;
17  
18  import io.netty5.channel.Channel;
19  import io.netty5.channel.ChannelMetadata;
20  import io.netty5.channel.ChannelOutboundBuffer;
21  import io.netty5.channel.ChannelPipeline;
22  import io.netty5.channel.ChannelShutdownDirection;
23  import io.netty5.channel.EventLoop;
24  import io.netty5.channel.RecvBufferAllocator;
25  import io.netty5.channel.ServerChannel;
26  
27  import java.io.IOException;
28  import java.net.PortUnreachableException;
29  import java.net.SocketAddress;
30  import java.nio.channels.SelectableChannel;
31  import java.nio.channels.SelectionKey;
32  import java.util.ArrayList;
33  import java.util.List;
34  
35  /**
36   * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
37   */
38  public abstract class AbstractNioMessageChannel<P extends Channel, L extends SocketAddress, R extends SocketAddress>
39          extends AbstractNioChannel<P, L, R> {
40      boolean inputShutdown;
41      private final List<Object> readBuf = new ArrayList<>();
42  
43      /**
44       * @see AbstractNioChannel#AbstractNioChannel(Channel, EventLoop,
45       * ChannelMetadata, RecvBufferAllocator, SelectableChannel, int)
46       */
47      protected AbstractNioMessageChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
48                                          RecvBufferAllocator defaultRecvAllocator,
49                                          SelectableChannel ch, int readInterestOp) {
50          super(parent, eventLoop, metadata, defaultRecvAllocator, ch, readInterestOp);
51      }
52  
53      @Override
54      protected void doBeginRead() throws Exception {
55          if (inputShutdown) {
56              return;
57          }
58          super.doBeginRead();
59      }
60  
61      protected boolean continueReading(RecvBufferAllocator.Handle allocHandle) {
62          return allocHandle.continueReading(isAutoRead());
63      }
64  
65      @Override
66      protected final void readNow() {
67          assert executor().inEventLoop();
68          final ChannelPipeline pipeline = pipeline();
69          final RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
70          allocHandle.reset();
71  
72          boolean closed = false;
73          Throwable exception = null;
74          try {
75              try {
76                  do {
77                      int localRead = doReadMessages(readBuf);
78                      if (localRead == 0) {
79                          break;
80                      }
81                      if (localRead < 0) {
82                          closed = true;
83                          break;
84                      }
85  
86                      allocHandle.incMessagesRead(localRead);
87                  } while (continueReading(allocHandle) && !isShutdown(ChannelShutdownDirection.Inbound));
88              } catch (Throwable t) {
89                  exception = t;
90              }
91  
92              int size = readBuf.size();
93              for (int i = 0; i < size; i ++) {
94                  readPending = false;
95                  pipeline.fireChannelRead(readBuf.get(i));
96              }
97              readBuf.clear();
98              allocHandle.readComplete();
99              pipeline.fireChannelReadComplete();
100 
101             if (exception != null) {
102                 closed = closeOnReadError(exception);
103 
104                 pipeline.fireChannelExceptionCaught(exception);
105             }
106 
107             if (closed) {
108                 inputShutdown = true;
109                 if (isOpen()) {
110                     closeTransport(newPromise());
111                 }
112             } else {
113                 readIfIsAutoRead();
114             }
115         } finally {
116             // Check if there is a readPending which was not processed yet.
117             // This could be for two reasons:
118             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
119             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
120             //
121             // See https://github.com/netty/netty/issues/2254
122             if (!readPending && !isAutoRead()) {
123                 removeReadOp();
124             }
125         }
126     }
127 
128     @Override
129     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
130         final SelectionKey key = selectionKey();
131         if (key == null) {
132             return;
133         }
134         final int interestOps = key.interestOps();
135 
136         int maxMessagesPerWrite = getMaxMessagesPerWrite();
137         while (maxMessagesPerWrite > 0) {
138             Object msg = in.current();
139             if (msg == null) {
140                 break;
141             }
142             try {
143                 boolean done = false;
144                 for (int i = getWriteSpinCount() - 1; i >= 0; i--) {
145                     if (doWriteMessage(msg, in)) {
146                         done = true;
147                         break;
148                     }
149                 }
150 
151                 if (done) {
152                     maxMessagesPerWrite--;
153                     in.remove();
154                 } else {
155                     break;
156                 }
157             } catch (Exception e) {
158                 if (continueOnWriteError()) {
159                     maxMessagesPerWrite--;
160                     in.remove(e);
161                 } else {
162                     throw e;
163                 }
164             }
165         }
166         if (in.isEmpty()) {
167             // Wrote all messages.
168             if ((interestOps & SelectionKey.OP_WRITE) != 0) {
169                 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
170             }
171         } else {
172             // Did not write all messages.
173             if ((interestOps & SelectionKey.OP_WRITE) == 0) {
174                 key.interestOps(interestOps | SelectionKey.OP_WRITE);
175             }
176         }
177     }
178 
179     /**
180      * Returns {@code true} if we should continue the write loop on a write error.
181      */
182     protected boolean continueOnWriteError() {
183         return false;
184     }
185 
186     protected boolean closeOnReadError(Throwable cause) {
187         if (!isActive()) {
188             // If the channel is not active anymore for whatever reason we should not try to continue reading.
189             return true;
190         }
191         if (cause instanceof PortUnreachableException) {
192             return false;
193         }
194         if (cause instanceof IOException) {
195             // ServerChannel should not be closed even on IOException because it can often continue
196             // accepting incoming connections. (e.g. too many open files)
197             return !(this instanceof ServerChannel);
198         }
199         return true;
200     }
201 
202     /**
203      * Read messages into the given array and return the amount which was read.
204      */
205     protected abstract int doReadMessages(List<Object> buf) throws Exception;
206 
207     /**
208      * Write a message to the underlying {@link java.nio.channels.Channel}.
209      *
210      * @return {@code true} if and only if the message has been written
211      */
212     protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
213 }