1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.util.internal.PlatformDependent;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import java.util.StringJoiner;
23 import java.util.function.IntSupplier;
24
25 import static io.netty.channel.uring.UserData.decode;
26
27
28
29
30 final class CompletionQueue implements IntSupplier {
31 private static final InternalLogger logger = InternalLoggerFactory.getInstance(CompletionQueue.class);
32
33
34
35 private static final int CQE_USER_DATA_FIELD = 0;
36 private static final int CQE_RES_FIELD = 8;
37 private static final int CQE_FLAGS_FIELD = 12;
38
39 private static final long CQE_SIZE = 16;
40
41
42 private final long kHeadAddress;
43 private final long kTailAddress;
44
45 private final long completionQueueArrayAddress;
46
47 final int ringSize;
48 final long ringAddress;
49 final int ringFd;
50
51 private final int ringEntries;
52 private final int ringMask;
53 private int ringHead;
54
55 CompletionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
56 long kOverflowAddress, long completionQueueArrayAddress, int ringSize, long ringAddress,
57 int ringFd) {
58 this.kHeadAddress = kHeadAddress;
59 this.kTailAddress = kTailAddress;
60 this.completionQueueArrayAddress = completionQueueArrayAddress;
61 this.ringSize = ringSize;
62 this.ringAddress = ringAddress;
63 this.ringFd = ringFd;
64
65 ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
66 ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
67 ringHead = PlatformDependent.getIntVolatile(kHeadAddress);
68 }
69
70
71
72
73
74 boolean hasCompletions() {
75 return ringHead != PlatformDependent.getIntVolatile(kTailAddress);
76 }
77
78 @Override
79 public int getAsInt() {
80 return count();
81 }
82
83 int count() {
84 return PlatformDependent.getIntVolatile(kTailAddress) - ringHead;
85 }
86
87
88
89
90
91 int process(CompletionCallback callback) {
92 int tail = PlatformDependent.getIntVolatile(kTailAddress);
93 int i = 0;
94 boolean isTraceEnabled = logger.isTraceEnabled();
95 while (ringHead != tail) {
96 long cqeAddress = completionQueueArrayAddress + (ringHead & ringMask) * CQE_SIZE;
97
98 long udata = PlatformDependent.getLong(cqeAddress + CQE_USER_DATA_FIELD);
99 int res = PlatformDependent.getInt(cqeAddress + CQE_RES_FIELD);
100 int flags = PlatformDependent.getInt(cqeAddress + CQE_FLAGS_FIELD);
101
102
103 ringHead++;
104 PlatformDependent.putIntOrdered(kHeadAddress, ringHead);
105
106 i++;
107
108 if (isTraceEnabled) {
109 logger.trace("completed(ring {}): {}(id={}, res={})",
110 ringFd, Native.opToStr(UserData.decodeOp(udata)), UserData.decodeId(udata), res);
111 }
112 try {
113 decode(res, flags, udata, callback);
114 } catch (Error e) {
115 throw e;
116 } catch (Throwable throwable) {
117 handleLoopException(throwable);
118 }
119 }
120 return i;
121 }
122
123 private static void handleLoopException(Throwable throwable) {
124 logger.warn("Unexpected exception in the IO event loop.", throwable);
125
126
127
128 try {
129 Thread.sleep(100);
130 } catch (InterruptedException ignore) {
131 }
132 }
133
134 @Override
135 public String toString() {
136 StringJoiner sb = new StringJoiner(", ", "CompletionQueue [", "]");
137 int tail = PlatformDependent.getIntVolatile(kTailAddress);
138 int head = ringHead;
139 while (head != tail) {
140 long cqeAddress = completionQueueArrayAddress + (ringHead & ringMask) * CQE_SIZE;
141 long udata = PlatformDependent.getLong(cqeAddress + CQE_USER_DATA_FIELD);
142 int res = PlatformDependent.getInt(cqeAddress + CQE_RES_FIELD);
143 sb.add(Native.opToStr(UserData.decodeOp(udata)) + "(id=" + UserData.decodeId(udata) + ",res=" + res + ')');
144 head++;
145 }
146 return sb.toString();
147 }
148
149
150
151
152 void ioUringWaitCqe() {
153 int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
154 if (logger.isTraceEnabled()) {
155 logger.trace("completed(ring {}): {}", ringFd, this);
156 }
157 if (ret < 0) {
158 throw new RuntimeException("ioUringEnter syscall returned " + ret);
159 }
160 }
161 }