1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.nio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.ServerChannel;
23
24 import java.io.IOException;
25 import java.net.PortUnreachableException;
26 import java.nio.channels.SelectableChannel;
27 import java.nio.channels.SelectionKey;
28 import java.util.ArrayList;
29 import java.util.List;
30
31 /**
32 * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
33 */
34 public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
35
36 /**
37 * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
38 */
39 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
40 super(parent, ch, readInterestOp);
41 }
42
43 @Override
44 protected AbstractNioUnsafe newUnsafe() {
45 return new NioMessageUnsafe();
46 }
47
48 private final class NioMessageUnsafe extends AbstractNioUnsafe {
49
50 private final List<Object> readBuf = new ArrayList<Object>();
51
52 @Override
53 public void read() {
54 assert eventLoop().inEventLoop();
55 final ChannelConfig config = config();
56 if (!config.isAutoRead() && !isReadPending()) {
57 // ChannelConfig.setAutoRead(false) was called in the meantime
58 removeReadOp();
59 return;
60 }
61
62 final int maxMessagesPerRead = config.getMaxMessagesPerRead();
63 final ChannelPipeline pipeline = pipeline();
64 boolean closed = false;
65 Throwable exception = null;
66 try {
67 try {
68 for (;;) {
69 int localRead = doReadMessages(readBuf);
70 if (localRead == 0) {
71 break;
72 }
73 if (localRead < 0) {
74 closed = true;
75 break;
76 }
77
78 // stop reading and remove op
79 if (!config.isAutoRead()) {
80 break;
81 }
82
83 if (readBuf.size() >= maxMessagesPerRead) {
84 break;
85 }
86 }
87 } catch (Throwable t) {
88 exception = t;
89 }
90 setReadPending(false);
91 int size = readBuf.size();
92 for (int i = 0; i < size; i ++) {
93 pipeline.fireChannelRead(readBuf.get(i));
94 }
95
96 readBuf.clear();
97 pipeline.fireChannelReadComplete();
98
99 if (exception != null) {
100 closed = closeOnReadError(exception);
101
102 pipeline.fireExceptionCaught(exception);
103 }
104
105 if (closed) {
106 if (isOpen()) {
107 close(voidPromise());
108 }
109 }
110 } finally {
111 // Check if there is a readPending which was not processed yet.
112 // This could be for two reasons:
113 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
114 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
115 //
116 // See https://github.com/netty/netty/issues/2254
117 if (!config.isAutoRead() && !isReadPending()) {
118 removeReadOp();
119 }
120 }
121 }
122 }
123
124 @Override
125 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
126 final SelectionKey key = selectionKey();
127 final int interestOps = key.interestOps();
128
129 for (;;) {
130 Object msg = in.current();
131 if (msg == null) {
132 // Wrote all messages.
133 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
134 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
135 }
136 break;
137 }
138 try {
139 boolean done = false;
140 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
141 if (doWriteMessage(msg, in)) {
142 done = true;
143 break;
144 }
145 }
146
147 if (done) {
148 in.remove();
149 } else {
150 // Did not write all messages.
151 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
152 key.interestOps(interestOps | SelectionKey.OP_WRITE);
153 }
154 break;
155 }
156 } catch (Exception e) {
157 if (continueOnWriteError()) {
158 in.remove(e);
159 } else {
160 throw e;
161 }
162 }
163 }
164 }
165
166 /**
167 * Returns {@code true} if we should continue the write loop on a write error.
168 */
169 protected boolean continueOnWriteError() {
170 return false;
171 }
172
173 protected boolean closeOnReadError(Throwable cause) {
174 // ServerChannel should not be closed even on IOException because it can often continue
175 // accepting incoming connections. (e.g. too many open files)
176 return cause instanceof IOException &&
177 !(cause instanceof PortUnreachableException) &&
178 !(this instanceof ServerChannel);
179 }
180
181 /**
182 * Read messages into the given array and return the amount which was read.
183 */
184 protected abstract int doReadMessages(List<Object> buf) throws Exception;
185
186 /**
187 * Write a message to the underlying {@link java.nio.channels.Channel}.
188 *
189 * @return {@code true} if and only if the message has been written
190 */
191 protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
192 }