1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import java.lang.invoke.MethodHandles;
19 import java.lang.invoke.VarHandle;
20 import java.nio.ByteBuffer;
21 import java.nio.ByteOrder;
22 import java.util.StringJoiner;
23
24
25
26
27 final class CompletionQueue {
28 private static final VarHandle INT_HANDLE =
29 MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
30
31
32
33 private static final int CQE_USER_DATA_FIELD = 0;
34 private static final int CQE_RES_FIELD = 8;
35 private static final int CQE_FLAGS_FIELD = 12;
36
37
38
39 private final ByteBuffer khead;
40 private final ByteBuffer ktail;
41 private final ByteBuffer kflags;
42 private final ByteBuffer completionQueueArray;
43 private final ByteBuffer[] extraCqeData;
44
45 final int ringSize;
46 final long ringAddress;
47 final int ringFd;
48 final int ringEntries;
49 final int ringCapacity;
50 private final int cqeLength;
51
52 private final int ringMask;
53 private int ringHead;
54 private boolean closed;
55
56 CompletionQueue(ByteBuffer kHead, ByteBuffer kTail, int ringMask, int ringEntries, ByteBuffer kflags,
57 ByteBuffer completionQueueArray, int ringSize, long ringAddress,
58 int ringFd, int ringCapacity, int cqeLength) {
59 this.khead = kHead;
60 this.ktail = kTail;
61 this.completionQueueArray = completionQueueArray;
62 this.ringSize = ringSize;
63 this.ringAddress = ringAddress;
64 this.ringFd = ringFd;
65 this.ringCapacity = ringCapacity;
66 this.cqeLength = cqeLength;
67 this.ringEntries = ringEntries;
68 this.kflags = kflags;
69 this.ringMask = ringMask;
70 ringHead = (int) INT_HANDLE.getVolatile(kHead, 0);
71
72 if (cqeLength == Native.CQE32_SIZE) {
73
74
75 this.extraCqeData = new ByteBuffer[ringEntries];
76 for (int i = 0; i < ringEntries; i++) {
77 int position = i * cqeLength + Native.CQE_SIZE;
78 completionQueueArray.position(position).limit(position + Native.CQE_SIZE);
79 extraCqeData[i] = completionQueueArray.slice();
80 completionQueueArray.clear();
81 }
82 completionQueueArray.clear();
83 } else {
84 assert cqeLength == Native.CQE_SIZE;
85 this.extraCqeData = null;
86 }
87 }
88
89 void close() {
90 closed = true;
91 }
92
93 int flags() {
94 if (closed) {
95 return 0;
96 }
97
98 return (int) INT_HANDLE.getOpaque(kflags, 0);
99 }
100
101
102
103
104
105 boolean hasCompletions() {
106 return !closed && ringHead != (int) INT_HANDLE.getVolatile(ktail, 0);
107 }
108
109 int count() {
110 if (closed) {
111 return 0;
112 }
113 return (int) INT_HANDLE.getVolatile(ktail, 0) - ringHead;
114 }
115
116
117
118
119
120 int process(CompletionCallback callback) {
121 if (closed) {
122 return 0;
123 }
124 int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
125 try {
126 int i = 0;
127 while (ringHead != tail) {
128 int cqeIdx = cqeIdx(ringHead, ringMask);
129 int cqePosition = cqeIdx * cqeLength;
130
131 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
132 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
133 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
134
135 ringHead++;
136
137 i++;
138 if (!callback.handle(res, flags, udata, extraCqeData(cqeIdx))) {
139
140 break;
141 }
142 if (ringHead == tail) {
143
144
145
146 tail = (int) INT_HANDLE.getVolatile(ktail, 0);
147 }
148 }
149 return i;
150 } finally {
151
152 INT_HANDLE.setRelease(khead, 0, ringHead);
153 }
154 }
155
156 private ByteBuffer extraCqeData(int cqeIdx) {
157 if (extraCqeData == null) {
158 return null;
159 }
160 ByteBuffer buffer = extraCqeData[cqeIdx];
161 buffer.clear();
162 return buffer;
163 }
164
165 @Override
166 public String toString() {
167 StringJoiner sb = new StringJoiner(", ", "CompletionQueue [", "]");
168 if (closed) {
169 sb.add("closed");
170 } else {
171 int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
172 int head = ringHead;
173 while (head != tail) {
174 int cqePosition = cqeIdx(head++, ringMask) * cqeLength;
175 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
176 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
177 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
178
179 sb.add("(res=" + res).add(", flags=" + flags).add(", udata=" + udata).add(")");
180 }
181 }
182 return sb.toString();
183 }
184
185 private static int cqeIdx(int ringHead, int ringMask) {
186 return ringHead & ringMask;
187 }
188 }