1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultFileRegion;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.IoRegistration;
29 import io.netty.channel.socket.DuplexChannel;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.channel.unix.Limits;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.io.IOException;
36 import java.net.SocketAddress;
37
38 import static io.netty.channel.unix.Errors.ioResult;
39
40 abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
41 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
42 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
43
44
45 byte writeOpCode;
46
47 long writeId;
48 byte readOpCode;
49 long readId;
50
51
52 private IoUringBufferRing bufferRing;
53
54 AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
55 super(parent, socket, active);
56 }
57
58 AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
59 super(parent, socket, remote);
60 }
61
62 @Override
63 public ChannelMetadata metadata() {
64 return METADATA;
65 }
66
67 @Override
68 protected AbstractUringUnsafe newUnsafe() {
69 return new IoUringStreamUnsafe();
70 }
71
72 @Override
73 public final ChannelFuture shutdown() {
74 return shutdown(newPromise());
75 }
76
77 @Override
78 public final ChannelFuture shutdown(final ChannelPromise promise) {
79 ChannelFuture shutdownOutputFuture = shutdownOutput();
80 if (shutdownOutputFuture.isDone()) {
81 shutdownOutputDone(shutdownOutputFuture, promise);
82 } else {
83 shutdownOutputFuture.addListener(new ChannelFutureListener() {
84 @Override
85 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
86 shutdownOutputDone(shutdownOutputFuture, promise);
87 }
88 });
89 }
90 return promise;
91 }
92
93 @Override
94 protected final void doShutdownOutput() throws Exception {
95 socket.shutdown(false, true);
96 }
97
98 private void shutdownInput0(final ChannelPromise promise) {
99 try {
100 socket.shutdown(true, false);
101 promise.setSuccess();
102 } catch (Throwable cause) {
103 promise.setFailure(cause);
104 }
105 }
106
107 @Override
108 public final boolean isOutputShutdown() {
109 return socket.isOutputShutdown();
110 }
111
112 @Override
113 public final boolean isInputShutdown() {
114 return socket.isInputShutdown();
115 }
116
117 @Override
118 public final boolean isShutdown() {
119 return socket.isShutdown();
120 }
121
122 @Override
123 public final ChannelFuture shutdownOutput() {
124 return shutdownOutput(newPromise());
125 }
126
127 @Override
128 public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
129 EventLoop loop = eventLoop();
130 if (loop.inEventLoop()) {
131 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
132 } else {
133 loop.execute(new Runnable() {
134 @Override
135 public void run() {
136 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
137 }
138 });
139 }
140
141 return promise;
142 }
143
144 @Override
145 public final ChannelFuture shutdownInput() {
146 return shutdownInput(newPromise());
147 }
148
149 @Override
150 public final ChannelFuture shutdownInput(final ChannelPromise promise) {
151 EventLoop loop = eventLoop();
152 if (loop.inEventLoop()) {
153 shutdownInput0(promise);
154 } else {
155 loop.execute(new Runnable() {
156 @Override
157 public void run() {
158 shutdownInput0(promise);
159 }
160 });
161 }
162 return promise;
163 }
164
165 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
166 ChannelFuture shutdownInputFuture = shutdownInput();
167 if (shutdownInputFuture.isDone()) {
168 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
169 } else {
170 shutdownInputFuture.addListener(new ChannelFutureListener() {
171 @Override
172 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
173 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
174 }
175 });
176 }
177 }
178
179 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
180 ChannelFuture shutdownInputFuture,
181 ChannelPromise promise) {
182 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
183 Throwable shutdownInputCause = shutdownInputFuture.cause();
184 if (shutdownOutputCause != null) {
185 if (shutdownInputCause != null) {
186 logger.info("Exception suppressed because a previous exception occurred.",
187 shutdownInputCause);
188 }
189 promise.setFailure(shutdownOutputCause);
190 } else if (shutdownInputCause != null) {
191 promise.setFailure(shutdownInputCause);
192 } else {
193 promise.setSuccess();
194 }
195 }
196
197 @Override
198 protected final void doRegister(ChannelPromise promise) {
199 ChannelPromise registerPromise = this.newPromise();
200
201 registerPromise.addListener(f -> {
202 if (f.isSuccess()) {
203 try {
204 short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
205 if (bgid >= 0) {
206 final IoUringIoHandler ioUringIoHandler = registration().attachment();
207 bufferRing = ioUringIoHandler.findBufferRing(bgid);
208 }
209 if (active) {
210
211 schedulePollRdHup();
212 }
213 } finally {
214 promise.setSuccess();
215 }
216 } else {
217 promise.setFailure(f.cause());
218 }
219 });
220
221 super.doRegister(registerPromise);
222 }
223
224 @Override
225 protected Object filterOutboundMessage(Object msg) {
226
227
228 if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
229 return new IoUringFileRegion((DefaultFileRegion) msg);
230 }
231
232 return super.filterOutboundMessage(msg);
233 }
234
235 protected class IoUringStreamUnsafe extends AbstractUringUnsafe {
236
237 private ByteBuf readBuffer;
238
239 @Override
240 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
241 assert writeId == 0;
242 int numElements = Math.min(in.size(), Limits.IOV_MAX);
243 IoUringIoHandler handler = registration().attachment();
244 IovArray iovArray = handler.iovArray();
245 try {
246 int offset = iovArray.count();
247 in.forEachFlushedMessage(iovArray);
248
249 int fd = fd().intValue();
250 IoRegistration registration = registration();
251 IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArray.memoryAddress(offset),
252 iovArray.count() - offset, nextOpsId());
253 byte opCode = ops.opcode();
254 writeId = registration.submit(ops);
255 writeOpCode = opCode;
256 if (writeId == 0) {
257 return 0;
258 }
259 } catch (Exception e) {
260
261 scheduleWriteSingle(in.current());
262 }
263 return 1;
264 }
265
266 @Override
267 protected int scheduleWriteSingle(Object msg) {
268 assert writeId == 0;
269
270 int fd = fd().intValue();
271 IoRegistration registration = registration();
272 final IoUringIoOps ops;
273 if (msg instanceof IoUringFileRegion) {
274 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
275 try {
276 fileRegion.open();
277 } catch (IOException e) {
278 this.handleWriteError(e);
279 return 0;
280 }
281 ops = fileRegion.splice(fd);
282 } else {
283 ByteBuf buf = (ByteBuf) msg;
284 ops = IoUringIoOps.newWrite(fd, (byte) 0, 0,
285 IoUring.memoryAddress(buf) + buf.readerIndex(), buf.readableBytes(), nextOpsId());
286 }
287
288 byte opCode = ops.opcode();
289 writeId = registration.submit(ops);
290 writeOpCode = opCode;
291 if (writeId == 0) {
292 return 0;
293 }
294 return 1;
295 }
296
297 private int calculateRecvFlags(boolean first) {
298
299
300
301
302
303
304 if (first) {
305 return 0;
306 }
307 return Native.MSG_DONTWAIT;
308 }
309
310 private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
311
312
313
314 if (first) {
315
316
317 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
318 Native.IORING_RECVSEND_POLL_FIRST : 0;
319 }
320 return 0;
321 }
322
323 @Override
324 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
325 assert readBuffer == null;
326 assert readId == 0 : readId;
327 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
328
329 if (bufferRing != null && bufferRing.isUsable()) {
330 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
331 }
332
333
334 ByteBuf byteBuf = allocHandle.allocate(alloc());
335 try {
336 int fd = fd().intValue();
337 IoRegistration registration = registration();
338 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
339 int recvFlags = calculateRecvFlags(first);
340
341 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
342 IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
343 readId = registration.submit(ops);
344 readOpCode = Native.IORING_OP_RECV;
345 if (readId == 0) {
346 return 0;
347 }
348 readBuffer = byteBuf;
349 byteBuf = null;
350 return 1;
351 } finally {
352 if (byteBuf != null) {
353 byteBuf.release();
354 }
355 }
356 }
357
358 private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
359 short bgId = bufferRing.bufferGroupId();
360 try {
361 boolean multishot = IoUring.isRecvMultishotEnabled();
362 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
363 short ioPrio;
364 final int recvFlags;
365 if (multishot) {
366 ioPrio = Native.IORING_RECV_MULTISHOT;
367 recvFlags = 0;
368 } else {
369
370
371 ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
372 recvFlags = calculateRecvFlags(first);
373 }
374 if (IoUring.isRecvsendBundleEnabled()) {
375
376
377 ioPrio |= Native.IORING_RECVSEND_BUNDLE;
378 }
379 IoRegistration registration = registration();
380 int fd = fd().intValue();
381 IoUringIoOps ops = IoUringIoOps.newRecv(
382 fd, flags, ioPrio, recvFlags, 0,
383 0, nextOpsId(), bgId
384 );
385 readId = registration.submit(ops);
386 readOpCode = Native.IORING_OP_RECV;
387 if (readId == 0) {
388 return 0;
389 }
390 if (multishot) {
391
392 return -1;
393 }
394 return 1;
395 } catch (IllegalArgumentException illegalArgumentException) {
396 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
397 return 0;
398 }
399 }
400
401 @Override
402 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
403 ByteBuf byteBuf = readBuffer;
404 readBuffer = null;
405 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
406 readId = 0;
407
408
409 if (byteBuf != null) {
410
411 byteBuf.release();
412 }
413 return;
414 }
415 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
416 boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
417 short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
418 boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;
419
420 boolean empty = socketIsEmpty(flags);
421 if (rearm) {
422
423 readId = 0;
424 }
425
426 boolean allDataRead = false;
427
428 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
429 final ChannelPipeline pipeline = pipeline();
430
431 try {
432 if (res < 0) {
433 if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
434
435
436
437 pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
438
439
440 bufferRing.expand();
441
442
443
444
445 scheduleRead(allocHandle.isFirstRead());
446 return;
447 }
448
449
450
451 allocHandle.lastBytesRead(ioResult("io_uring read", res));
452 } else if (res > 0) {
453 if (useBufferRing) {
454
455
456
457
458
459 int read = res;
460 for (;;) {
461 int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
462 byteBuf = bufferRing.useBuffer(bid, read, more);
463 read -= byteBuf.readableBytes();
464 allocHandle.attemptedBytesRead(attemptedBytesRead);
465 allocHandle.lastBytesRead(byteBuf.readableBytes());
466
467 assert read >= 0;
468 if (read == 0) {
469
470
471 break;
472 }
473 allocHandle.incMessagesRead(1);
474 pipeline.fireChannelRead(byteBuf);
475 byteBuf = null;
476 bid = bufferRing.nextBid(bid);
477 if (!allocHandle.continueReading()) {
478
479 allocHandle.readComplete();
480 pipeline.fireChannelReadComplete();
481 allocHandle.reset(config());
482 }
483 }
484 } else {
485 int attemptedBytesRead = byteBuf.writableBytes();
486 byteBuf.writerIndex(byteBuf.writerIndex() + res);
487 allocHandle.attemptedBytesRead(attemptedBytesRead);
488 allocHandle.lastBytesRead(res);
489 }
490 } else {
491
492 allocHandle.lastBytesRead(-1);
493 }
494 if (allocHandle.lastBytesRead() <= 0) {
495
496 if (byteBuf != null) {
497
498 byteBuf.release();
499 byteBuf = null;
500 }
501 allDataRead = allocHandle.lastBytesRead() < 0;
502 if (allDataRead) {
503
504 shutdownInput(true);
505 }
506 allocHandle.readComplete();
507 pipeline.fireChannelReadComplete();
508 return;
509 }
510
511 allocHandle.incMessagesRead(1);
512 pipeline.fireChannelRead(byteBuf);
513 byteBuf = null;
514 scheduleNextRead(pipeline, allocHandle, rearm, empty);
515 } catch (Throwable t) {
516 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
517 }
518 }
519
520 private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
521 boolean rearm, boolean empty) {
522 if (allocHandle.continueReading() && !empty) {
523 if (rearm) {
524
525
526 scheduleRead(false);
527 }
528 } else {
529
530 allocHandle.readComplete();
531 pipeline.fireChannelReadComplete();
532 }
533 }
534
535 protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
536 Throwable cause, boolean allDataRead,
537 IoUringRecvByteAllocatorHandle allocHandle) {
538 if (byteBuf != null) {
539 if (byteBuf.isReadable()) {
540 pipeline.fireChannelRead(byteBuf);
541 } else {
542 byteBuf.release();
543 }
544 }
545 allocHandle.readComplete();
546 pipeline.fireChannelReadComplete();
547 pipeline.fireExceptionCaught(cause);
548 if (allDataRead || cause instanceof IOException) {
549 shutdownInput(true);
550 }
551 }
552
553 @Override
554 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
555 writeId = 0;
556 writeOpCode = 0;
557
558 ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
559 Object current = channelOutboundBuffer.current();
560 if (current instanceof IoUringFileRegion) {
561 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
562 try {
563 int result = res >= 0 ? res : ioResult("io_uring splice", res);
564 if (result == 0 && fileRegion.count() > 0) {
565 validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
566 return false;
567 }
568 int progress = fileRegion.handleResult(result, data);
569 if (progress == -1) {
570
571 channelOutboundBuffer.remove();
572 } else if (progress > 0) {
573 channelOutboundBuffer.progress(progress);
574 }
575 } catch (Throwable cause) {
576 handleWriteError(cause);
577 }
578 return true;
579 }
580
581 if (res >= 0) {
582 channelOutboundBuffer.removeBytes(res);
583 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
584 return true;
585 } else {
586 try {
587 if (ioResult("io_uring write", res) == 0) {
588 return false;
589 }
590 } catch (Throwable cause) {
591 handleWriteError(cause);
592 }
593 }
594 return true;
595 }
596
597 @Override
598 protected void freeResourcesNow(IoRegistration reg) {
599 super.freeResourcesNow(reg);
600 assert readBuffer == null;
601 }
602 }
603
604 @Override
605 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
606 if (readId != 0) {
607
608 assert numOutstandingReads == 1 || numOutstandingReads == -1;
609 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
610 long id = registration.submit(ops);
611 assert id != 0;
612 readId = 0;
613 }
614 }
615
616 @Override
617 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
618 if (writeId != 0) {
619
620
621 assert numOutstandingWrites == 1;
622 assert writeOpCode != 0;
623 long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
624 assert id != 0;
625 writeId = 0;
626 }
627 }
628
629 @Override
630 protected boolean socketIsEmpty(int flags) {
631 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
632 }
633
634 @Override
635 protected void submitAndRunNow() {
636 if (writeId != 0) {
637
638
639 ((IoUringIoHandler) registration().attachment()).submitAndRunNow(writeId);
640 }
641 super.submitAndRunNow();
642 }
643
644 @Override
645 boolean isPollInFirst() {
646 return bufferRing == null || !bufferRing.isUsable();
647 }
648 }