View Javadoc
1   /*
2    * Copyright 2026 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import java.util.concurrent.atomic.AtomicLong;
19  
/**
 * Open-addressing hash table with linear probing that maps {@code long} tokens to a small payload
 * consisting of a registration id, an op byte and a user-data value.
 *
 * <p>Keys and payloads are kept in parallel primitive arrays. Only the {@code tokens} array defines whether
 * a slot is in use: {@code 0} marks a slot that was never used, {@code 1} marks a released slot
 * (tombstone), and any negative value is a live token. Tokens handed out by {@link #nextToken()} always
 * have bit 63 set, so they can never collide with the two sentinel values.
 *
 * <p>The table length is always a power of two. It is rebuilt (dropping tombstones, and doubling when the
 * number of live entries exceeds the load limit) from {@link #registerNormal(long, int, byte, long)} and
 * {@link #release(int)}. A slot index returned by {@link #findSlot(long)} is therefore only valid until the
 * next call to one of those two methods.
 *
 * <p>Only the sequence counter behind {@link #nextToken()} is atomic; the table arrays and counters are
 * plain fields and the table operations perform no synchronization.
 */
final class PendingOpMap {
    private static final float LOAD_FACTOR = 0.5f;
    // Fibonacci hashing spreads the monotonically increasing token sequence across the table.
    private static final long HASH_MULTIPLIER = 0x9E3779B97F4A7C15L;
    private static final long EMPTY = 0;
    private static final long TOMBSTONE = 1;
    // Largest power of two that is a valid array length.
    private static final int MAX_CAPACITY = 1 << 30;

    private long[] tokens;
    private int[] registrationIds;
    private byte[] ops;
    private long[] userDatas;
    private int mask;
    private int maxSize;
    private int size;
    private int tombstones;
    private final AtomicLong nextSequence = new AtomicLong(3);

    /**
     * Creates an empty table.
     *
     * @param initialCapacity requested number of slots; rounded up to the next power of two (at least 1)
     *                        because index computation and probing rely on a power-of-two table length
     * @throws IllegalArgumentException if {@code initialCapacity} is negative or larger than {@code 2^30}
     */
    PendingOpMap(int initialCapacity) {
        int capacity = tableSizeFor(initialCapacity);
        tokens = new long[capacity];
        registrationIds = new int[capacity];
        ops = new byte[capacity];
        userDatas = new long[capacity];
        mask = capacity - 1;
        maxSize = calcMaxSize(capacity);
    }

    /**
     * Returns a fresh token built from the next value of the internal sequence.
     *
     * @return a negative token, see {@link #token(long)}
     * @throws IllegalStateException if the sequence counter has overflowed
     */
    long nextToken() {
        long sequence = nextSequence.getAndIncrement();
        if (sequence <= 0) {
            // Monotonic sequence starting at 3; ~29k years to exhaust positive long space at 10M/s,
            // so overflow is purely theoretical.
            throw new IllegalStateException("slow path sequence overflow");
        }
        return token(sequence);
    }

    /**
     * Stores a new entry under {@code token}. The method does not check whether the token is already
     * present. It may rebuild the table, which invalidates previously obtained slot indexes.
     *
     * @param token          key of the entry; must be negative (bit 63 set)
     * @param registrationId registration id to store
     * @param op             op byte to store
     * @param userData       user-data value to store
     * @throws IllegalArgumentException if {@code token} is not negative; such a value would either alias a
     *                                  sentinel or be dropped by the next rebuild
     * @throws IllegalStateException    if the table would have to grow beyond its maximum capacity
     */
    void registerNormal(long token, int registrationId, byte op, long userData) {
        if (token >= 0) {
            throw new IllegalArgumentException("token must be negative (bit 63 set): " + token);
        }
        for (;;) {
            int startIndex = hashIndex(token, mask);
            int index = startIndex;
            int firstTombstone = -1;
            for (;;) {
                long existing = tokens[index];
                if (existing == EMPTY) {
                    // Prefer the first tombstone seen on the probe path so chains stay short.
                    insertAt(firstTombstone == -1 ? index : firstTombstone, token, registrationId, op, userData);
                    return;
                }
                if (existing == TOMBSTONE && firstTombstone == -1) {
                    firstTombstone = index;
                }
                if ((index = probeNext(index)) == startIndex) {
                    // Wrapped around without finding an empty slot: grow and retry from scratch.
                    rehash(expandCapacity(tokens.length));
                    break;
                }
            }
        }
    }

    /**
     * Looks up the slot that currently holds {@code token}.
     *
     * @param token the token to search for
     * @return the slot index, or {@code -1} if the token is not present; non-negative values are never
     *         live tokens and always yield {@code -1}
     */
    int findSlot(long token) {
        if (token >= 0) {
            // 0 and 1 are the EMPTY/TOMBSTONE sentinels; matching them would return a dead slot.
            return -1;
        }
        int startIndex = hashIndex(token, mask);
        int index = startIndex;
        for (;;) {
            long existing = tokens[index];
            if (existing == EMPTY) {
                return -1;
            }
            if (existing == token) {
                return index;
            }
            if ((index = probeNext(index)) == startIndex) {
                return -1;
            }
        }
    }

    /**
     * Returns the registration id stored in {@code slot}. The value is only meaningful for a live slot.
     */
    int registrationId(int slot) {
        return registrationIds[slot];
    }

    /**
     * Returns the op byte stored in {@code slot}. The value is only meaningful for a live slot.
     */
    byte op(int slot) {
        return ops[slot];
    }

    /**
     * Returns the user-data value stored in {@code slot}. The value is only meaningful for a live slot.
     */
    long userData(int slot) {
        return userDatas[slot];
    }

    /**
     * Removes the entry held in {@code slot} by turning the slot into a tombstone. Releasing a slot that
     * holds no live token is ignored so the size and tombstone counters cannot be corrupted. The table is
     * rebuilt at the same capacity once tombstones outnumber the remaining live entries.
     *
     * @param slot a slot index obtained from {@link #findSlot(long)}
     */
    void release(int slot) {
        if (tokens[slot] >= 0) {
            // Already released or never occupied.
            return;
        }
        // Only tokens define slot liveness. Payload values are ignored for tombstones and are
        // either overwritten on slot reuse or discarded when the table is rehashed.
        tokens[slot] = TOMBSTONE;
        size--;
        tombstones++;

        if (size != 0 && tombstones > size) {
            rehash(tokens.length);
        }
    }

    /**
     * Writes the entry into {@code index} (an empty or tombstone slot), updates the counters and rebuilds
     * the table when live entries plus tombstones exceed the load limit.
     */
    private void insertAt(int index, long token, int registrationId, byte op, long userData) {
        if (tokens[index] == TOMBSTONE) {
            tombstones--;
        }
        tokens[index] = token;
        registrationIds[index] = registrationId;
        ops[index] = op;
        userDatas[index] = userData;
        size++;
        if (size + tombstones > maxSize) {
            // Grow only when live entries alone exceed the limit; otherwise just purge tombstones.
            rehash(size > maxSize ? expandCapacity(tokens.length) : tokens.length);
        }
    }

    /**
     * Rebuilds the table with {@code newCapacity} slots, copying live entries and dropping tombstones.
     */
    private void rehash(int newCapacity) {
        long[] oldTokens = tokens;
        int[] oldRegistrationIds = registrationIds;
        byte[] oldOps = ops;
        long[] oldUserDatas = userDatas;

        tokens = new long[newCapacity];
        registrationIds = new int[newCapacity];
        ops = new byte[newCapacity];
        userDatas = new long[newCapacity];
        mask = newCapacity - 1;
        maxSize = calcMaxSize(newCapacity);
        size = 0;
        tombstones = 0;

        for (int i = 0; i < oldTokens.length; i++) {
            long token = oldTokens[i];
            // Only live tokens (negative values) are moved to the new arrays.
            if (token < 0) {
                insertDuringRehash(token, oldRegistrationIds[i], oldOps[i], oldUserDatas[i]);
            }
        }
    }

    /**
     * Inserts into a table that contains no tombstones, so the first empty slot on the probe path is
     * always the right place.
     */
    private void insertDuringRehash(long token, int registrationId, byte op, long userData) {
        int index = hashIndex(token, mask);
        for (;;) {
            if (tokens[index] == EMPTY) {
                tokens[index] = token;
                registrationIds[index] = registrationId;
                ops[index] = op;
                userDatas[index] = userData;
                size++;
                return;
            }
            index = probeNext(index);
        }
    }

    /**
     * Slow-path and internal tokens are negative so they can be distinguished from packed fast-path userdata
     * by checking bit 63.
     *
     * @param sequence the sequence value to embed
     * @return {@code sequence} with bit 63 set
     */
    static long token(long sequence) {
        // We intentionally do not handle sequence wrap-around here.
        // `nextSequence` would need to reach Long.MAX_VALUE and overflow,
        // which is considered practically impossible in this context.
        return Long.MIN_VALUE | sequence;
    }

    /**
     * Returns the sequence value embedded in {@code token}, i.e. the token with bit 63 cleared.
     */
    static long tokenSequence(long token) {
        return token & Long.MAX_VALUE;
    }

    /**
     * Maps {@code key} to a table index by taking the top {@code log2(capacity)} bits of the multiplied key.
     */
    private static int hashIndex(long key, int mask) {
        if (mask == 0) {
            // Single-slot table: the shift distance would be 64, which Java reduces to 0.
            return 0;
        }
        return (int) (key * HASH_MULTIPLIER >>> Long.numberOfLeadingZeros(mask));
    }

    private int probeNext(int index) {
        return index + 1 & mask;
    }

    /**
     * Returns the load limit for {@code capacity}; it is always below the capacity so at least one slot
     * stays empty and probe loops terminate.
     */
    private static int calcMaxSize(int capacity) {
        return Math.min(capacity - 1, (int) (capacity * LOAD_FACTOR));
    }

    private static int expandCapacity(int capacity) {
        int newCapacity = capacity << 1;
        if (newCapacity <= 0) {
            throw new IllegalStateException("slow path table overflow");
        }
        return newCapacity;
    }

    /**
     * Rounds {@code requested} up to the next power of two, with a minimum of 1.
     */
    private static int tableSizeFor(int requested) {
        if (requested < 0 || requested > MAX_CAPACITY) {
            throw new IllegalArgumentException(
                    "initialCapacity: " + requested + " (expected: 0-" + MAX_CAPACITY + ')');
        }
        if (requested <= 1) {
            return 1;
        }
        return Integer.highestOneBit(requested - 1) << 1;
    }
}