1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.ServerChannel;
23
24 import java.io.IOException;
25 import java.net.PortUnreachableException;
26 import java.nio.channels.SelectableChannel;
27 import java.nio.channels.SelectionKey;
28 import java.util.ArrayList;
29 import java.util.List;
30
31
32
33
34 public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
35
36
37
38
39 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
40 super(parent, ch, readInterestOp);
41 }
42
43 @Override
44 protected AbstractNioUnsafe newUnsafe() {
45 return new NioMessageUnsafe();
46 }
47
48 private final class NioMessageUnsafe extends AbstractNioUnsafe {
49
50 private final List<Object> readBuf = new ArrayList<Object>();
51
52 @Override
53 public void read() {
54 assert eventLoop().inEventLoop();
55 final ChannelConfig config = config();
56 if (!config.isAutoRead() && !isReadPending()) {
57
58 removeReadOp();
59 return;
60 }
61
62 final int maxMessagesPerRead = config.getMaxMessagesPerRead();
63 final ChannelPipeline pipeline = pipeline();
64 boolean closed = false;
65 Throwable exception = null;
66 try {
67 try {
68 for (;;) {
69 int localRead = doReadMessages(readBuf);
70 if (localRead == 0) {
71 break;
72 }
73 if (localRead < 0) {
74 closed = true;
75 break;
76 }
77
78
79 if (!config.isAutoRead()) {
80 break;
81 }
82
83 if (readBuf.size() >= maxMessagesPerRead) {
84 break;
85 }
86 }
87 } catch (Throwable t) {
88 exception = t;
89 }
90 setReadPending(false);
91 int size = readBuf.size();
92 for (int i = 0; i < size; i ++) {
93 pipeline.fireChannelRead(readBuf.get(i));
94 }
95
96 readBuf.clear();
97 pipeline.fireChannelReadComplete();
98
99 if (exception != null) {
100 closed = closeOnReadError(exception);
101
102 pipeline.fireExceptionCaught(exception);
103 }
104
105 if (closed) {
106 if (isOpen()) {
107 close(voidPromise());
108 }
109 }
110 } finally {
111
112
113
114
115
116
117 if (!config.isAutoRead() && !isReadPending()) {
118 removeReadOp();
119 }
120 }
121 }
122 }
123
124 @Override
125 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
126 final SelectionKey key = selectionKey();
127 final int interestOps = key.interestOps();
128
129 for (;;) {
130 Object msg = in.current();
131 if (msg == null) {
132
133 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
134 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
135 }
136 break;
137 }
138 try {
139 boolean done = false;
140 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
141 if (doWriteMessage(msg, in)) {
142 done = true;
143 break;
144 }
145 }
146
147 if (done) {
148 in.remove();
149 } else {
150
151 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
152 key.interestOps(interestOps | SelectionKey.OP_WRITE);
153 }
154 break;
155 }
156 } catch (Exception e) {
157 if (continueOnWriteError()) {
158 in.remove(e);
159 } else {
160 throw e;
161 }
162 }
163 }
164 }
165
166
167
168
169 protected boolean continueOnWriteError() {
170 return false;
171 }
172
173 protected boolean closeOnReadError(Throwable cause) {
174
175
176 return cause instanceof IOException &&
177 !(cause instanceof PortUnreachableException) &&
178 !(this instanceof ServerChannel);
179 }
180
181
182
183
184 protected abstract int doReadMessages(List<Object> buf) throws Exception;
185
186
187
188
189
190
191 protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
192 }