1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultFileRegion;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.socket.DuplexChannel;
29 import io.netty.channel.unix.IovArray;
30 import io.netty.channel.unix.Limits;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.io.IOException;
35 import java.net.SocketAddress;
36
37 import static io.netty.channel.unix.Errors.ioResult;
38
39 abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
40 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
41 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
42
43
44 private byte writeOpCode;
45
46
47 private long writeId;
48 private long readId;
49
/**
 * Creates a stream channel; all arguments are forwarded unchanged to the superclass constructor.
 *
 * @param parent the parent channel handed to the superclass
 * @param socket the socket handed to the superclass
 * @param active the active flag handed to the superclass
 */
AbstractIoUringStreamChannel(final Channel parent, final LinuxSocket socket, final boolean active) {
    super(parent, socket, active);
}
53
/**
 * Creates a stream channel; all arguments are forwarded unchanged to the superclass constructor.
 *
 * @param parent the parent channel handed to the superclass
 * @param socket the socket handed to the superclass
 * @param remote the remote address handed to the superclass
 */
AbstractIoUringStreamChannel(final Channel parent, final LinuxSocket socket, final SocketAddress remote) {
    super(parent, socket, remote);
}
57
58 @Override
59 public ChannelMetadata metadata() {
60 return METADATA;
61 }
62
63 @Override
64 protected final AbstractUringUnsafe newUnsafe() {
65 return new IoUringStreamUnsafe();
66 }
67
68 @Override
69 public final ChannelFuture shutdown() {
70 return shutdown(newPromise());
71 }
72
73 @Override
74 public final ChannelFuture shutdown(final ChannelPromise promise) {
75 ChannelFuture shutdownOutputFuture = shutdownOutput();
76 if (shutdownOutputFuture.isDone()) {
77 shutdownOutputDone(shutdownOutputFuture, promise);
78 } else {
79 shutdownOutputFuture.addListener(new ChannelFutureListener() {
80 @Override
81 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
82 shutdownOutputDone(shutdownOutputFuture, promise);
83 }
84 });
85 }
86 return promise;
87 }
88
89 @Override
90 protected final void doShutdownOutput() throws Exception {
91 socket.shutdown(false, true);
92 }
93
94 private void shutdownInput0(final ChannelPromise promise) {
95 try {
96 socket.shutdown(true, false);
97 promise.setSuccess();
98 } catch (Throwable cause) {
99 promise.setFailure(cause);
100 }
101 }
102
103 @Override
104 public final boolean isOutputShutdown() {
105 return socket.isOutputShutdown();
106 }
107
108 @Override
109 public final boolean isInputShutdown() {
110 return socket.isInputShutdown();
111 }
112
113 @Override
114 public final boolean isShutdown() {
115 return socket.isShutdown();
116 }
117
118 @Override
119 public final ChannelFuture shutdownOutput() {
120 return shutdownOutput(newPromise());
121 }
122
123 @Override
124 public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
125 EventLoop loop = eventLoop();
126 if (loop.inEventLoop()) {
127 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
128 } else {
129 loop.execute(new Runnable() {
130 @Override
131 public void run() {
132 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
133 }
134 });
135 }
136
137 return promise;
138 }
139
140 @Override
141 public final ChannelFuture shutdownInput() {
142 return shutdownInput(newPromise());
143 }
144
145 @Override
146 public final ChannelFuture shutdownInput(final ChannelPromise promise) {
147 EventLoop loop = eventLoop();
148 if (loop.inEventLoop()) {
149 shutdownInput0(promise);
150 } else {
151 loop.execute(new Runnable() {
152 @Override
153 public void run() {
154 shutdownInput0(promise);
155 }
156 });
157 }
158 return promise;
159 }
160
161 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
162 ChannelFuture shutdownInputFuture = shutdownInput();
163 if (shutdownInputFuture.isDone()) {
164 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
165 } else {
166 shutdownInputFuture.addListener(new ChannelFutureListener() {
167 @Override
168 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
169 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
170 }
171 });
172 }
173 }
174
175 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
176 ChannelFuture shutdownInputFuture,
177 ChannelPromise promise) {
178 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
179 Throwable shutdownInputCause = shutdownInputFuture.cause();
180 if (shutdownOutputCause != null) {
181 if (shutdownInputCause != null) {
182 logger.info("Exception suppressed because a previous exception occurred.",
183 shutdownInputCause);
184 }
185 promise.setFailure(shutdownOutputCause);
186 } else if (shutdownInputCause != null) {
187 promise.setFailure(shutdownInputCause);
188 } else {
189 promise.setSuccess();
190 }
191 }
192
193 @Override
194 protected final void doRegister(ChannelPromise promise) {
195 super.doRegister(promise);
196 promise.addListener(f -> {
197 if (f.isSuccess()) {
198 if (active) {
199
200 schedulePollRdHup();
201 }
202 }
203 });
204 }
205
206 @Override
207 protected Object filterOutboundMessage(Object msg) {
208
209
210 if (IoUring.isIOUringSpliceSupported() && msg instanceof DefaultFileRegion) {
211 return new IoUringFileRegion((DefaultFileRegion) msg);
212 }
213
214 return super.filterOutboundMessage(msg);
215 }
216
217 private final class IoUringStreamUnsafe extends AbstractUringUnsafe {
218
219 private ByteBuf readBuffer;
220 private IovArray iovArray;
221
222 @Override
223 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
224 assert iovArray == null;
225 assert writeId == 0;
226 int numElements = Math.min(in.size(), Limits.IOV_MAX);
227 ByteBuf iovArrayBuffer = alloc().directBuffer(numElements * IovArray.IOV_SIZE);
228 iovArray = new IovArray(iovArrayBuffer);
229 try {
230 int offset = iovArray.count();
231 in.forEachFlushedMessage(iovArray);
232
233 int fd = fd().intValue();
234 IoUringIoRegistration registration = registration();
235 IoUringIoOps ops = IoUringIoOps.newWritev(fd, flags((byte) 0), 0, iovArray.memoryAddress(offset),
236 iovArray.count() - offset, nextOpsId());
237 byte opCode = ops.opcode();
238 writeId = registration.submit(ops);
239 writeOpCode = opCode;
240 if (writeId == 0) {
241 iovArray.release();
242 iovArray = null;
243 return 0;
244 }
245 } catch (Exception e) {
246 iovArray.release();
247 iovArray = null;
248
249
250 scheduleWriteSingle(in.current());
251 }
252 return 1;
253 }
254
255 @Override
256 protected int scheduleWriteSingle(Object msg) {
257 assert iovArray == null;
258 assert writeId == 0;
259
260 int fd = fd().intValue();
261 IoUringIoRegistration registration = registration();
262 final IoUringIoOps ops;
263 if (msg instanceof IoUringFileRegion) {
264 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
265 try {
266 fileRegion.open();
267 } catch (IOException e) {
268 this.handleWriteError(e);
269 return 0;
270 }
271 ops = fileRegion.splice(fd);
272 } else {
273 ByteBuf buf = (ByteBuf) msg;
274 ops = IoUringIoOps.newWrite(fd, flags((byte) 0), 0,
275 buf.memoryAddress() + buf.readerIndex(), buf.readableBytes(), nextOpsId());
276 }
277
278 byte opCode = ops.opcode();
279 writeId = registration.submit(ops);
280 writeOpCode = opCode;
281 if (writeId == 0) {
282 return 0;
283 }
284 return 1;
285 }
286
287 @Override
288 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
289 assert readBuffer == null;
290 assert readId == 0;
291
292 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
293 ByteBuf byteBuf = allocHandle.allocate(alloc());
294 try {
295 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
296 int fd = fd().intValue();
297 IoUringIoRegistration registration = registration();
298
299
300
301
302 final short ioPrio;
303
304
305
306
307
308
309
310 final int recvFlags;
311
312 if (first) {
313
314
315 ioPrio = socketIsEmpty && IoUring.isIOUringCqeFSockNonEmptySupported() ?
316 Native.IORING_RECVSEND_POLL_FIRST : 0;
317 recvFlags = 0;
318 } else {
319 ioPrio = 0;
320 recvFlags = Native.MSG_DONTWAIT;
321 }
322 IoUringIoOps ops = IoUringIoOps.newRecv(fd, flags((byte) 0), ioPrio, recvFlags,
323 byteBuf.memoryAddress() + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
324 readId = registration.submit(ops);
325 if (readId == 0) {
326 return 0;
327 }
328 readBuffer = byteBuf;
329 byteBuf = null;
330 return 1;
331 } finally {
332 if (byteBuf != null) {
333 byteBuf.release();
334 }
335 }
336 }
337
338 @Override
339 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
340 assert readId != 0;
341 readId = 0;
342 boolean allDataRead = false;
343
344 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
345 final ChannelPipeline pipeline = pipeline();
346 ByteBuf byteBuf = this.readBuffer;
347 this.readBuffer = null;
348 assert byteBuf != null;
349
350 try {
351 if (res < 0) {
352 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
353 byteBuf.release();
354 return;
355 }
356
357
358 allocHandle.lastBytesRead(ioResult("io_uring read", res));
359 } else if (res > 0) {
360 byteBuf.writerIndex(byteBuf.writerIndex() + res);
361 allocHandle.lastBytesRead(res);
362 } else {
363
364 allocHandle.lastBytesRead(-1);
365 }
366 if (allocHandle.lastBytesRead() <= 0) {
367
368 byteBuf.release();
369 byteBuf = null;
370 allDataRead = allocHandle.lastBytesRead() < 0;
371 if (allDataRead) {
372
373 shutdownInput(true);
374 }
375 allocHandle.readComplete();
376 pipeline.fireChannelReadComplete();
377 return;
378 }
379
380 allocHandle.incMessagesRead(1);
381 pipeline.fireChannelRead(byteBuf);
382 byteBuf = null;
383 if (allocHandle.continueReading() && !socketIsEmpty(flags)) {
384
385 scheduleRead(false);
386 } else {
387
388 allocHandle.readComplete();
389 pipeline.fireChannelReadComplete();
390 }
391 } catch (Throwable t) {
392 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
393 }
394 }
395
396 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
397 Throwable cause, boolean allDataRead,
398 IoUringRecvByteAllocatorHandle allocHandle) {
399 if (byteBuf != null) {
400 if (byteBuf.isReadable()) {
401 pipeline.fireChannelRead(byteBuf);
402 } else {
403 byteBuf.release();
404 }
405 }
406 allocHandle.readComplete();
407 pipeline.fireChannelReadComplete();
408 pipeline.fireExceptionCaught(cause);
409 if (allDataRead || cause instanceof IOException) {
410 shutdownInput(true);
411 }
412 }
413
414 @Override
415 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
416 assert writeId != 0;
417 writeId = 0;
418 writeOpCode = 0;
419
420 ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
421 Object current = channelOutboundBuffer.current();
422 if (current instanceof IoUringFileRegion) {
423 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
424 try {
425 int result = res >= 0 ? res : ioResult("io_uring splice", res);
426 if (result == 0 && fileRegion.count() > 0) {
427 validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
428 return false;
429 }
430 int progress = fileRegion.handleResult(result, data);
431 if (progress == -1) {
432
433 channelOutboundBuffer.remove();
434 } else if (progress > 0) {
435 channelOutboundBuffer.progress(progress);
436 }
437 } catch (Throwable cause) {
438 handleWriteError(cause);
439 }
440 return true;
441 }
442
443 IovArray iovArray = this.iovArray;
444 if (iovArray != null) {
445 this.iovArray = null;
446 iovArray.release();
447 }
448 if (res >= 0) {
449 channelOutboundBuffer.removeBytes(res);
450 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
451 return true;
452 } else {
453 try {
454 if (ioResult("io_uring write", res) == 0) {
455 return false;
456 }
457 } catch (Throwable cause) {
458 handleWriteError(cause);
459 }
460 }
461 return true;
462 }
463
464 @Override
465 protected void freeResourcesNow(IoUringIoRegistration reg) {
466 super.freeResourcesNow(reg);
467 assert readBuffer == null;
468 }
469 }
470
471 @Override
472 protected final void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
473 if (readId != 0) {
474
475 assert numOutstandingReads == 1;
476 int fd = fd().intValue();
477 IoUringIoOps ops = IoUringIoOps.newAsyncCancel(fd, flags((byte) 0), readId, Native.IORING_OP_RECV);
478 registration.submit(ops);
479 } else {
480 assert numOutstandingReads == 0;
481 }
482 }
483
484 @Override
485 protected final void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
486 if (writeId != 0) {
487
488
489 assert numOutstandingWrites == 1;
490 assert writeOpCode != 0;
491 int fd = fd().intValue();
492 registration.submit(IoUringIoOps.newAsyncCancel(fd, flags((byte) 0), writeId, writeOpCode));
493 } else {
494 assert numOutstandingWrites == 0;
495 }
496 }
497
498 @Override
499 protected boolean socketIsEmpty(int flags) {
500 return IoUring.isIOUringCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
501 }
502 }