1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import java.lang.invoke.MethodHandles;
19 import java.lang.invoke.VarHandle;
20 import java.nio.ByteBuffer;
21 import java.nio.ByteOrder;
22 import java.util.StringJoiner;
23
24
25
26
27 final class CompletionQueue {
28 private static final VarHandle INT_HANDLE =
29 MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
30
31
32
33 private static final int CQE_USER_DATA_FIELD = 0;
34 private static final int CQE_RES_FIELD = 8;
35 private static final int CQE_FLAGS_FIELD = 12;
36
37
38
39 private final ByteBuffer khead;
40 private final ByteBuffer ktail;
41 private final ByteBuffer kflags;
42 private final ByteBuffer completionQueueArray;
43 private final ByteBuffer[] extraCqeData;
44
45 final int ringSize;
46 final long ringAddress;
47 final int ringFd;
48 final int ringEntries;
49 final int ringCapacity;
50 private final int cqeLength;
51
52 private final int ringMask;
53 private int ringHead;
54 private boolean closed;
55
56 CompletionQueue(ByteBuffer kHead, ByteBuffer kTail, int ringMask, int ringEntries, ByteBuffer kflags,
57 ByteBuffer completionQueueArray, int ringSize, long ringAddress,
58 int ringFd, int ringCapacity, int cqeLength, boolean extraCqeDataNeeded) {
59 this.khead = kHead;
60 this.ktail = kTail;
61 this.completionQueueArray = completionQueueArray;
62 this.ringSize = ringSize;
63 this.ringAddress = ringAddress;
64 this.ringFd = ringFd;
65 this.ringCapacity = ringCapacity;
66 this.cqeLength = cqeLength;
67 this.ringEntries = ringEntries;
68 this.kflags = kflags;
69 this.ringMask = ringMask;
70 ringHead = (int) INT_HANDLE.getVolatile(kHead, 0);
71
72 if (extraCqeDataNeeded) {
73
74
75
76 this.extraCqeData = new ByteBuffer[ringEntries];
77 for (int i = 0; i < ringEntries; i++) {
78 int position = i * cqeLength;
79 completionQueueArray.position(position).limit(position + Native.CQE_SIZE);
80 extraCqeData[i] = completionQueueArray.slice();
81 completionQueueArray.clear();
82 }
83 } else {
84 this.extraCqeData = null;
85 }
86 }
87
88 void close() {
89 closed = true;
90 }
91
92 int flags() {
93 if (closed) {
94 return 0;
95 }
96
97 return (int) INT_HANDLE.getOpaque(kflags, 0);
98 }
99
100
101
102
103
104 boolean hasCompletions() {
105 return !closed && ringHead != (int) INT_HANDLE.getVolatile(ktail, 0);
106 }
107
108 int count() {
109 if (closed) {
110 return 0;
111 }
112 return (int) INT_HANDLE.getVolatile(ktail, 0) - ringHead;
113 }
114
115
116
117
118
119 int process(CompletionCallback callback) {
120 if (closed) {
121 return 0;
122 }
123 int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
124 try {
125 int i = 0;
126 while (ringHead != tail) {
127 int cqeIdx = cqeIdx(ringHead, ringMask);
128 int cqePosition = cqeIdx * cqeLength;
129
130 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
131 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
132 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
133
134 ringHead++;
135 final ByteBuffer extraCqeData;
136 if ((flags & Native.IORING_CQE_F_32) != 0) {
137 extraCqeData = extraCqeData(cqeIdx + 1);
138
139 ringHead++;
140 } else if (cqeLength == Native.CQE32_SIZE) {
141 extraCqeData = extraCqeData(cqeIdx + 1);
142 } else {
143 extraCqeData = null;
144 }
145
146 if ((flags & Native.IORING_CQE_F_SKIP) == 0) {
147 i++;
148
149 callback.handle(res, flags, udata, extraCqeData);
150 }
151
152 if (ringHead == tail) {
153
154
155
156 tail = (int) INT_HANDLE.getVolatile(ktail, 0);
157 }
158 }
159 return i;
160 } finally {
161
162 INT_HANDLE.setRelease(khead, 0, ringHead);
163 }
164 }
165
166 private ByteBuffer extraCqeData(int cqeIdx) {
167 if (extraCqeData == null) {
168 return null;
169 }
170 ByteBuffer buffer = extraCqeData[cqeIdx];
171 buffer.clear();
172 return buffer;
173 }
174
175 @Override
176 public String toString() {
177 StringJoiner sb = new StringJoiner(", ", "CompletionQueue [", "]");
178 if (closed) {
179 sb.add("closed");
180 } else {
181 int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
182 int head = ringHead;
183 while (head != tail) {
184 int cqePosition = cqeIdx(head++, ringMask) * cqeLength;
185 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
186 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
187 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
188 if ((flags & Native.IORING_CQE_F_32) != 0) {
189
190 head++;
191 }
192 sb.add("(res=" + res).add(", flags=" + flags).add(", udata=" + udata).add(")");
193 }
194 }
195 return sb.toString();
196 }
197
198 private static int cqeIdx(int ringHead, int ringMask) {
199 return ringHead & ringMask;
200 }
201 }