1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.util.internal.MathUtil;
19
20
21
22
23 final class CompletionBuffer {
24 private final CompletionCallback callback = this::add;
25
26 private final long[] array;
27 private final int capacity;
28 private final int mask;
29 private final long tombstone;
30 private int size;
31 private int head;
32 private int tail = -1;
33
34 CompletionBuffer(int numCompletions, long tombstone) {
35 capacity = MathUtil.findNextPositivePowerOfTwo(numCompletions * 2);
36 array = new long[capacity];
37 mask = capacity - 1;
38 for (int i = 0; i < capacity; i += 2) {
39 array[i] = tombstone;
40 }
41 this.tombstone = tombstone;
42 }
43
44 private boolean add(int res, int flags, long udata) {
45 if (udata == tombstone) {
46 throw new IllegalStateException("udata can't be the same as the tombstone");
47 }
48
49 array[combinedIdx(tail + 1)] = (((long) res) << 32) | (flags & 0xffffffffL);
50 array[udataIdx(tail + 1)] = udata;
51
52 tail += 2;
53 size += 2;
54 return size < capacity;
55 }
56
57
58
59
60
61
62
63 boolean drain(CompletionQueue queue) {
64 if (size == capacity) {
65
66 return false;
67 }
68 queue.process(callback);
69 return !queue.hasCompletions();
70 }
71
72
73
74
75
76
77
78 int processNow(CompletionCallback callback) {
79 int i = 0;
80
81 boolean processing = true;
82 do {
83 if (size == 0) {
84 break;
85 }
86 long combined = array[combinedIdx(head)];
87 long udata = array[udataIdx(head)];
88
89 head += 2;
90 size -= 2;
91
92 if (udata != tombstone) {
93 processing = handle(callback, combined, udata);
94 i++;
95 }
96 } while (processing);
97 return i;
98 }
99
100 private int combinedIdx(int idx) {
101 return idx & mask;
102 }
103
104 private int udataIdx(int idx) {
105 return (idx + 1) & mask;
106 }
107
108 private static boolean handle(CompletionCallback callback, long combined, long udata) {
109 int res = (int) (combined >> 32);
110 int flags = (int) combined;
111 return callback.handle(res, flags, udata);
112 }
113 }