1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.nio;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.channel.AdaptiveRecvBufferAllocator;
21 import io.netty5.channel.ChannelShutdownDirection;
22 import io.netty5.util.Resource;
23 import io.netty5.channel.Channel;
24 import io.netty5.channel.ChannelMetadata;
25 import io.netty5.channel.ChannelOutboundBuffer;
26 import io.netty5.channel.ChannelPipeline;
27 import io.netty5.channel.EventLoop;
28 import io.netty5.channel.FileRegion;
29 import io.netty5.channel.RecvBufferAllocator;
30 import io.netty5.channel.internal.ChannelUtils;
31 import io.netty5.util.internal.StringUtil;
32
33 import java.io.IOException;
34 import java.net.SocketAddress;
35 import java.nio.channels.SelectableChannel;
36 import java.nio.channels.SelectionKey;
37
38 import static io.netty5.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
39
40
41
42
43 public abstract class AbstractNioByteChannel<P extends Channel, L extends SocketAddress, R extends SocketAddress>
44 extends AbstractNioChannel<P, L, R> {
45 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
46 private static final String EXPECTED_TYPES =
47 " (expected: " + StringUtil.simpleClassName(Buffer.class) + ", " +
48 StringUtil.simpleClassName(FileRegion.class) + ')';
49
50
51
52 private final Runnable flushTask = this::writeFlushed;
53 private boolean inputClosedSeenErrorOnRead;
54
55
56
57
58
59
60
61
62 protected AbstractNioByteChannel(P parent, EventLoop eventLoop, SelectableChannel ch) {
63 super(parent, eventLoop, METADATA, new AdaptiveRecvBufferAllocator(), ch, SelectionKey.OP_READ);
64 }
65
66 final boolean shouldBreakReadReady() {
67 return isShutdown(ChannelShutdownDirection.Inbound) &&
68 (inputClosedSeenErrorOnRead || !isAllowHalfClosure());
69 }
70
71 private void closeOnRead() {
72 if (!isShutdown(ChannelShutdownDirection.Inbound)) {
73 if (isAllowHalfClosure()) {
74 shutdownTransport(ChannelShutdownDirection.Inbound, newPromise());
75 } else {
76 closeTransport(newPromise());
77 }
78 } else {
79 inputClosedSeenErrorOnRead = true;
80 }
81 }
82
83 private void handleReadException(ChannelPipeline pipeline, Buffer buffer, Throwable cause, boolean close,
84 RecvBufferAllocator.Handle allocHandle) {
85 if (buffer.readableBytes() > 0) {
86 readPending = false;
87 pipeline.fireChannelRead(buffer);
88 } else {
89 buffer.close();
90 }
91 allocHandle.readComplete();
92 pipeline.fireChannelReadComplete();
93 pipeline.fireChannelExceptionCaught(cause);
94
95
96
97 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
98 closeOnRead();
99 } else {
100 readIfIsAutoRead();
101 }
102 }
103
104 @Override
105 protected final void readNow() {
106 if (shouldBreakReadReady()) {
107 clearReadPending();
108 return;
109 }
110 final ChannelPipeline pipeline = pipeline();
111 final BufferAllocator bufferAllocator = bufferAllocator();
112 final RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
113 allocHandle.reset();
114
115 Buffer buffer = null;
116 boolean close = false;
117 try {
118 do {
119 buffer = allocHandle.allocate(bufferAllocator);
120 allocHandle.lastBytesRead(doReadBytes(buffer));
121 if (allocHandle.lastBytesRead() <= 0) {
122
123 Resource.dispose(buffer);
124 buffer = null;
125 close = allocHandle.lastBytesRead() < 0;
126 if (close) {
127
128 readPending = false;
129 }
130 break;
131 }
132
133 allocHandle.incMessagesRead(1);
134 readPending = false;
135 pipeline.fireChannelRead(buffer);
136 buffer = null;
137 } while (allocHandle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound));
138
139 allocHandle.readComplete();
140 pipeline.fireChannelReadComplete();
141
142 if (close) {
143 closeOnRead();
144 } else {
145 readIfIsAutoRead();
146 }
147 } catch (Throwable t) {
148 handleReadException(pipeline, buffer, t, close, allocHandle);
149 } finally {
150
151
152
153
154
155
156 if (!readPending && !isAutoRead()) {
157 removeReadOp();
158 }
159 }
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176 protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
177 Object msg = in.current();
178 if (msg == null) {
179
180 return 0;
181 }
182 return doWriteInternal(in, in.current());
183 }
184
185 private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
186 if (msg instanceof Buffer) {
187 Buffer buf = (Buffer) msg;
188 if (buf.readableBytes() == 0) {
189 in.remove();
190 return 0;
191 }
192
193 final int localFlushAmount = doWriteBytes(buf);
194 if (localFlushAmount > 0) {
195 in.progress(localFlushAmount);
196 if (buf.readableBytes() == 0) {
197 in.remove();
198 }
199 return 1;
200 }
201 } else if (msg instanceof FileRegion) {
202 FileRegion region = (FileRegion) msg;
203 if (region.transferred() >= region.count()) {
204 in.remove();
205 return 0;
206 }
207
208 long localFlushedAmount = doWriteFileRegion(region);
209 if (localFlushedAmount > 0) {
210 in.progress(localFlushedAmount);
211 if (region.transferred() >= region.count()) {
212 in.remove();
213 }
214 return 1;
215 }
216 } else {
217
218 throw new Error();
219 }
220 return WRITE_STATUS_SNDBUF_FULL;
221 }
222
223 @Override
224 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
225 int writeSpinCount = getWriteSpinCount();
226 do {
227 Object msg = in.current();
228 if (msg == null) {
229
230 clearOpWrite();
231
232 return;
233 }
234 writeSpinCount -= doWriteInternal(in, msg);
235 } while (writeSpinCount > 0);
236
237 incompleteWrite(writeSpinCount < 0);
238 }
239
240 @Override
241 protected final Object filterOutboundMessage(Object msg) {
242 if (msg instanceof Buffer) {
243 Buffer buf = (Buffer) msg;
244 if (buf.isDirect()) {
245 return msg;
246 }
247
248 return newDirectBuffer(buf);
249 }
250
251 if (msg instanceof FileRegion) {
252 return msg;
253 }
254
255 throw new UnsupportedOperationException(
256 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
257 }
258
259 protected final void incompleteWrite(boolean setOpWrite) {
260
261 if (setOpWrite) {
262 setOpWrite();
263 } else {
264
265
266
267
268 clearOpWrite();
269
270
271 executor().execute(flushTask);
272 }
273 }
274
275
276
277
278
279
280
281 protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
282
283
284
285
286 protected abstract int doReadBytes(Buffer buf) throws Exception;
287
288
289
290
291
292
293 protected abstract int doWriteBytes(Buffer buf) throws Exception;
294
295 protected final void setOpWrite() {
296 final SelectionKey key = selectionKey();
297
298
299
300 if (key == null || !key.isValid()) {
301 return;
302 }
303 final int interestOps = key.interestOps();
304 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
305 key.interestOps(interestOps | SelectionKey.OP_WRITE);
306 }
307 }
308
309 protected final void clearOpWrite() {
310 final SelectionKey key = selectionKey();
311
312
313
314 if (key == null || !key.isValid()) {
315 return;
316 }
317 final int interestOps = key.interestOps();
318 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
319 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
320 }
321 }
322 }