1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import java.lang.invoke.MethodHandles;
19 import java.lang.invoke.VarHandle;
20 import java.nio.ByteBuffer;
21 import java.nio.ByteOrder;
22 import java.util.StringJoiner;
23
24
25
26
27 final class CompletionQueue {
28 private static final VarHandle INT_HANDLE =
29 MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
30
31
32
33 private static final int CQE_USER_DATA_FIELD = 0;
34 private static final int CQE_RES_FIELD = 8;
35 private static final int CQE_FLAGS_FIELD = 12;
36
37 static final int CQE_SIZE = 16;
38
39
40
41 private final ByteBuffer khead;
42 private final ByteBuffer ktail;
43 private final ByteBuffer completionQueueArray;
44
45 final int ringSize;
46 final long ringAddress;
47 final int ringFd;
48 final int ringEntries;
49 final int ringCapacity;
50
51 private final int ringMask;
52 private int ringHead;
53 private boolean closed;
54
55 CompletionQueue(ByteBuffer kHead, ByteBuffer kTail, int ringMask, int ringEntries,
56 ByteBuffer completionQueueArray, int ringSize, long ringAddress,
57 int ringFd, int ringCapacity) {
58 this.khead = kHead;
59 this.ktail = kTail;
60 this.completionQueueArray = completionQueueArray;
61 this.ringSize = ringSize;
62 this.ringAddress = ringAddress;
63 this.ringFd = ringFd;
64 this.ringCapacity = ringCapacity;
65
66 this.ringEntries = ringEntries;
67 this.ringMask = ringMask;
68 ringHead = (int) INT_HANDLE.getVolatile(kHead, 0);
69 }
70
71 void close() {
72 closed = true;
73 }
74
75
76
77
78
79 boolean hasCompletions() {
80 return !closed && ringHead != (int) INT_HANDLE.getVolatile(ktail, 0);
81 }
82
83 int count() {
84 if (closed) {
85 return 0;
86 }
87 return (int) INT_HANDLE.getVolatile(ktail, 0) - ringHead;
88 }
89
90
91
92
93
94 int process(CompletionCallback callback) {
95 if (closed) {
96 return 0;
97 }
98 int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
99 try {
100 int i = 0;
101 while (ringHead != tail) {
102 int cqePosition = cqeIdx(ringHead, ringMask);
103
104 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
105 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
106 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
107
108 ringHead++;
109
110 i++;
111 if (!callback.handle(res, flags, udata)) {
112
113 break;
114 }
115 }
116 return i;
117 } finally {
118
119 INT_HANDLE.setRelease(khead, 0, ringHead);
120 }
121 }
122
123 @Override
124 public String toString() {
125 StringJoiner sb = new StringJoiner(", ", "CompletionQueue [", "]");
126 if (closed) {
127 sb.add("closed");
128 } else {
129 int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
130 int head = ringHead;
131 while (head != tail) {
132 int cqePosition = cqeIdx(head++, ringMask);
133 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
134 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
135 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
136
137 sb.add("(res=" + res).add(", flags=" + flags).add(", udata=" + udata).add(")");
138 }
139 }
140 return sb.toString();
141 }
142
143 private static int cqeIdx(int ringHead, int ringMask) {
144 return (ringHead & ringMask) * CQE_SIZE;
145 }
146 }