1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import java.util.concurrent.atomic.AtomicLong;
19
20 final class PendingOpMap {
21 private static final float LOAD_FACTOR = 0.5f;
22
23 private static final long HASH_MULTIPLIER = 0x9E3779B97F4A7C15L;
24 private static final long EMPTY = 0;
25 private static final long TOMBSTONE = 1;
26
27 private long[] tokens;
28 private int[] registrationIds;
29 private byte[] ops;
30 private long[] userDatas;
31 private int mask;
32 private int maxSize;
33 private int size;
34 private int tombstones;
35 private final AtomicLong nextSequence = new AtomicLong(3);
36
37 PendingOpMap(int initialCapacity) {
38 tokens = new long[initialCapacity];
39 registrationIds = new int[initialCapacity];
40 ops = new byte[initialCapacity];
41 userDatas = new long[initialCapacity];
42 mask = initialCapacity - 1;
43 maxSize = calcMaxSize(initialCapacity);
44 }
45
46 long nextToken() {
47 long sequence = nextSequence.getAndIncrement();
48 if (sequence <= 0) {
49
50
51 throw new IllegalStateException("slow path sequence overflow");
52 }
53 return token(sequence);
54 }
55
56 void registerNormal(long token, int registrationId, byte op, long userData) {
57 for (;;) {
58 int startIndex = hashIndex(token, mask);
59 int index = startIndex;
60 int firstTombstone = -1;
61 for (;;) {
62 long existing = tokens[index];
63 if (existing == EMPTY) {
64 insertAt(firstTombstone == -1 ? index : firstTombstone, token, registrationId, op, userData);
65 return;
66 }
67 if (existing == TOMBSTONE && firstTombstone == -1) {
68 firstTombstone = index;
69 }
70 if ((index = probeNext(index)) == startIndex) {
71 rehash(expandCapacity(tokens.length));
72 break;
73 }
74 }
75 }
76 }
77
78 int findSlot(long token) {
79 int startIndex = hashIndex(token, mask);
80 int index = startIndex;
81 for (;;) {
82 long existing = tokens[index];
83 if (existing == EMPTY) {
84 return -1;
85 }
86 if (existing == token) {
87 return index;
88 }
89 if ((index = probeNext(index)) == startIndex) {
90 return -1;
91 }
92 }
93 }
94
95 int registrationId(int slot) {
96 return registrationIds[slot];
97 }
98
99 byte op(int slot) {
100 return ops[slot];
101 }
102
103 long userData(int slot) {
104 return userDatas[slot];
105 }
106
107 void release(int slot) {
108
109
110 tokens[slot] = TOMBSTONE;
111 size--;
112 tombstones++;
113
114 if (size != 0 && tombstones > size) {
115 rehash(tokens.length);
116 }
117 }
118
119 private void insertAt(int index, long token, int registrationId, byte op, long userData) {
120 if (tokens[index] == TOMBSTONE) {
121 tombstones--;
122 }
123 tokens[index] = token;
124 registrationIds[index] = registrationId;
125 ops[index] = op;
126 userDatas[index] = userData;
127 size++;
128 if (size + tombstones > maxSize) {
129 rehash(size > maxSize ? expandCapacity(tokens.length) : tokens.length);
130 }
131 }
132
133 private void rehash(int newCapacity) {
134 long[] oldTokens = tokens;
135 int[] oldRegistrationIds = registrationIds;
136 byte[] oldOps = ops;
137 long[] oldUserDatas = userDatas;
138
139 tokens = new long[newCapacity];
140 registrationIds = new int[newCapacity];
141 ops = new byte[newCapacity];
142 userDatas = new long[newCapacity];
143 mask = newCapacity - 1;
144 maxSize = calcMaxSize(newCapacity);
145 size = 0;
146 tombstones = 0;
147
148 for (int i = 0; i < oldTokens.length; i++) {
149 long token = oldTokens[i];
150
151 if (token < 0) {
152 insertDuringRehash(token, oldRegistrationIds[i], oldOps[i], oldUserDatas[i]);
153 }
154 }
155 }
156
157 private void insertDuringRehash(long token, int registrationId, byte op, long userData) {
158 int index = hashIndex(token, mask);
159 for (;;) {
160 if (tokens[index] == EMPTY) {
161 tokens[index] = token;
162 registrationIds[index] = registrationId;
163 ops[index] = op;
164 userDatas[index] = userData;
165 size++;
166 return;
167 }
168 index = probeNext(index);
169 }
170 }
171
172
173
174
175
176 static long token(long sequence) {
177
178
179
180 return Long.MIN_VALUE | sequence;
181 }
182
183 static long tokenSequence(long token) {
184 return token & Long.MAX_VALUE;
185 }
186
187 private static int hashIndex(long key, int mask) {
188 if (mask == 0) {
189 return 0;
190 }
191 return (int) (key * HASH_MULTIPLIER >>> Long.numberOfLeadingZeros(mask));
192 }
193
194 private int probeNext(int index) {
195 return index + 1 & mask;
196 }
197
198 private static int calcMaxSize(int capacity) {
199 return Math.min(capacity - 1, (int) (capacity * LOAD_FACTOR));
200 }
201
202 private static int expandCapacity(int capacity) {
203 int newCapacity = capacity << 1;
204 if (newCapacity <= 0) {
205 throw new IllegalStateException("slow path table overflow");
206 }
207 return newCapacity;
208 }
209 }