1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import java.lang.invoke.MethodHandles;
19 import java.lang.invoke.VarHandle;
20 import java.nio.ByteBuffer;
21 import java.nio.ByteOrder;
22 import java.util.StringJoiner;
23
24
25
26
27 final class CompletionQueue {
28 private static final VarHandle INT_HANDLE =
29 MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
30
31
32
33 private static final int CQE_USER_DATA_FIELD = 0;
34 private static final int CQE_RES_FIELD = 8;
35 private static final int CQE_FLAGS_FIELD = 12;
36
37 static final int CQE_SIZE = 16;
38
39
40
41 private final ByteBuffer khead;
42 private final ByteBuffer ktail;
43 private final ByteBuffer completionQueueArray;
44
45 final int ringSize;
46 final long ringAddress;
47 final int ringFd;
48 final int ringEntries;
49 final int ringCapacity;
50
51 private final int ringMask;
52 private int ringHead;
53 private boolean closed;
54
55 CompletionQueue(ByteBuffer kHead, ByteBuffer kTail, int ringMask, int ringEntries,
56 ByteBuffer completionQueueArray, int ringSize, long ringAddress,
57 int ringFd, int ringCapacity) {
58 this.khead = kHead;
59 this.ktail = kTail;
60 this.completionQueueArray = completionQueueArray;
61 this.ringSize = ringSize;
62 this.ringAddress = ringAddress;
63 this.ringFd = ringFd;
64 this.ringCapacity = ringCapacity;
65
66 this.ringEntries = ringEntries;
67 this.ringMask = ringMask;
68 ringHead = (int) INT_HANDLE.getVolatile(kHead, 0);
69 }
70
71 void close() {
72 closed = true;
73 }
74
75
76
77
78
79 boolean hasCompletions() {
80 return !closed && ringHead != (int) INT_HANDLE.getVolatile(ktail, 0);
81 }
82
83 int count() {
84 if (closed) {
85 return 0;
86 }
87 return (int) INT_HANDLE.getVolatile(ktail, 0) - ringHead;
88 }
89
90
91
92
93
94 int process(CompletionCallback callback) {
95 if (closed) {
96 return 0;
97 }
98 int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
99 try {
100 int i = 0;
101 while (ringHead != tail) {
102 int cqePosition = cqeIdx(ringHead, ringMask);
103
104 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
105 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
106 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
107
108 ringHead++;
109
110 i++;
111 if (!callback.handle(res, flags, udata)) {
112
113 break;
114 }
115 if (ringHead == tail) {
116
117
118
119 tail = (int) INT_HANDLE.getVolatile(ktail, 0);
120 }
121 }
122 return i;
123 } finally {
124
125 INT_HANDLE.setRelease(khead, 0, ringHead);
126 }
127 }
128
129 @Override
130 public String toString() {
131 StringJoiner sb = new StringJoiner(", ", "CompletionQueue [", "]");
132 if (closed) {
133 sb.add("closed");
134 } else {
135 int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
136 int head = ringHead;
137 while (head != tail) {
138 int cqePosition = cqeIdx(head++, ringMask);
139 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
140 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
141 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
142
143 sb.add("(res=" + res).add(", flags=" + flags).add(", udata=" + udata).add(")");
144 }
145 }
146 return sb.toString();
147 }
148
149 private static int cqeIdx(int ringHead, int ringMask) {
150 return (ringHead & ringMask) * CQE_SIZE;
151 }
152 }