1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.nio;
17
18 import io.netty5.channel.Channel;
19 import io.netty5.channel.ChannelMetadata;
20 import io.netty5.channel.ChannelOutboundBuffer;
21 import io.netty5.channel.ChannelPipeline;
22 import io.netty5.channel.ChannelShutdownDirection;
23 import io.netty5.channel.EventLoop;
24 import io.netty5.channel.RecvBufferAllocator;
25 import io.netty5.channel.ServerChannel;
26
27 import java.io.IOException;
28 import java.net.PortUnreachableException;
29 import java.net.SocketAddress;
30 import java.nio.channels.SelectableChannel;
31 import java.nio.channels.SelectionKey;
32 import java.util.ArrayList;
33 import java.util.List;
34
35
36
37
38 public abstract class AbstractNioMessageChannel<P extends Channel, L extends SocketAddress, R extends SocketAddress>
39 extends AbstractNioChannel<P, L, R> {
40 boolean inputShutdown;
41 private final List<Object> readBuf = new ArrayList<>();
42
43
44
45
46
47 protected AbstractNioMessageChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
48 RecvBufferAllocator defaultRecvAllocator,
49 SelectableChannel ch, int readInterestOp) {
50 super(parent, eventLoop, metadata, defaultRecvAllocator, ch, readInterestOp);
51 }
52
53 @Override
54 protected void doBeginRead() throws Exception {
55 if (inputShutdown) {
56 return;
57 }
58 super.doBeginRead();
59 }
60
61 protected boolean continueReading(RecvBufferAllocator.Handle allocHandle) {
62 return allocHandle.continueReading(isAutoRead());
63 }
64
65 @Override
66 protected final void readNow() {
67 assert executor().inEventLoop();
68 final ChannelPipeline pipeline = pipeline();
69 final RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
70 allocHandle.reset();
71
72 boolean closed = false;
73 Throwable exception = null;
74 try {
75 try {
76 do {
77 int localRead = doReadMessages(readBuf);
78 if (localRead == 0) {
79 break;
80 }
81 if (localRead < 0) {
82 closed = true;
83 break;
84 }
85
86 allocHandle.incMessagesRead(localRead);
87 } while (continueReading(allocHandle) && !isShutdown(ChannelShutdownDirection.Inbound));
88 } catch (Throwable t) {
89 exception = t;
90 }
91
92 int size = readBuf.size();
93 for (int i = 0; i < size; i ++) {
94 readPending = false;
95 pipeline.fireChannelRead(readBuf.get(i));
96 }
97 readBuf.clear();
98 allocHandle.readComplete();
99 pipeline.fireChannelReadComplete();
100
101 if (exception != null) {
102 closed = closeOnReadError(exception);
103
104 pipeline.fireChannelExceptionCaught(exception);
105 }
106
107 if (closed) {
108 inputShutdown = true;
109 if (isOpen()) {
110 closeTransport(newPromise());
111 }
112 } else {
113 readIfIsAutoRead();
114 }
115 } finally {
116
117
118
119
120
121
122 if (!readPending && !isAutoRead()) {
123 removeReadOp();
124 }
125 }
126 }
127
128 @Override
129 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
130 final SelectionKey key = selectionKey();
131 if (key == null) {
132 return;
133 }
134 final int interestOps = key.interestOps();
135
136 int maxMessagesPerWrite = getMaxMessagesPerWrite();
137 while (maxMessagesPerWrite > 0) {
138 Object msg = in.current();
139 if (msg == null) {
140 break;
141 }
142 try {
143 boolean done = false;
144 for (int i = getWriteSpinCount() - 1; i >= 0; i--) {
145 if (doWriteMessage(msg, in)) {
146 done = true;
147 break;
148 }
149 }
150
151 if (done) {
152 maxMessagesPerWrite--;
153 in.remove();
154 } else {
155 break;
156 }
157 } catch (Exception e) {
158 if (continueOnWriteError()) {
159 maxMessagesPerWrite--;
160 in.remove(e);
161 } else {
162 throw e;
163 }
164 }
165 }
166 if (in.isEmpty()) {
167
168 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
169 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
170 }
171 } else {
172
173 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
174 key.interestOps(interestOps | SelectionKey.OP_WRITE);
175 }
176 }
177 }
178
179
180
181
182 protected boolean continueOnWriteError() {
183 return false;
184 }
185
186 protected boolean closeOnReadError(Throwable cause) {
187 if (!isActive()) {
188
189 return true;
190 }
191 if (cause instanceof PortUnreachableException) {
192 return false;
193 }
194 if (cause instanceof IOException) {
195
196
197 return !(this instanceof ServerChannel);
198 }
199 return true;
200 }
201
202
203
204
205 protected abstract int doReadMessages(List<Object> buf) throws Exception;
206
207
208
209
210
211
212 protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
213 }