1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel;
17
18 import io.netty5.util.Resource;
19 import io.netty5.util.concurrent.EventExecutor;
20 import io.netty5.util.concurrent.Future;
21 import io.netty5.util.concurrent.Promise;
22 import io.netty5.util.concurrent.PromiseCombiner;
23 import io.netty5.util.internal.ObjectPool;
24 import io.netty5.util.internal.SilentDispose;
25 import io.netty5.util.internal.SystemPropertyUtil;
26 import io.netty5.util.internal.logging.InternalLogger;
27 import io.netty5.util.internal.logging.InternalLoggerFactory;
28
29 import java.util.Objects;
30 import java.util.function.Function;
31
32 import static java.util.Objects.requireNonNull;
33
34
35
36
37 public final class PendingWriteQueue {
38 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
39
40
41
42
43 private static final int PENDING_WRITE_OVERHEAD =
44 SystemPropertyUtil.getInt("io.netty5.transport.pendingWriteSizeOverhead", 64);
45
46 private final EventExecutor executor;
47 private final MessageSizeEstimator.Handle sizeEstimatorHandle;
48
49
50 private PendingWrite head;
51 private PendingWrite tail;
52 private int size;
53 private long bytes;
54
55 public PendingWriteQueue(EventExecutor executor, MessageSizeEstimator.Handle handle) {
56 this.executor = Objects.requireNonNull(executor, "executor");
57 this.sizeEstimatorHandle = Objects.requireNonNull(handle, "handle");
58 }
59
60
61
62
63 public boolean isEmpty() {
64 assert executor.inEventLoop();
65 return head == null;
66 }
67
68
69
70
71 public int size() {
72 assert executor.inEventLoop();
73 return size;
74 }
75
76
77
78
79
80 public long bytes() {
81 assert executor.inEventLoop();
82 return bytes;
83 }
84
85 private int size(Object msg) {
86
87
88 int messageSize = sizeEstimatorHandle.size(msg);
89 if (messageSize < 0) {
90
91 messageSize = 0;
92 }
93 return messageSize + PENDING_WRITE_OVERHEAD;
94 }
95
96
97
98
99 public void add(Object msg, Promise<Void> promise) {
100 assert executor.inEventLoop();
101 requireNonNull(msg, "msg");
102 requireNonNull(promise, "promise");
103
104
105 int messageSize = size(msg);
106
107 PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
108 PendingWrite currentTail = tail;
109 if (currentTail == null) {
110 tail = head = write;
111 } else {
112 currentTail.next = write;
113 tail = write;
114 }
115 size ++;
116 bytes += messageSize;
117 }
118
119
120
121
122
123
124
125
126 public Future<Void> removeAndTransferAll(Function<Object, Future<Void>> transferFunc) {
127 assert executor.inEventLoop();
128
129 if (isEmpty()) {
130 return null;
131 }
132
133 Promise<Void> p = executor.newPromise();
134 PromiseCombiner combiner = new PromiseCombiner(executor);
135 try {
136
137
138 for (PendingWrite write = head; write != null; write = head) {
139 head = tail = null;
140 size = 0;
141 bytes = 0;
142
143 while (write != null) {
144 PendingWrite next = write.next;
145 Object msg = write.msg;
146 Promise<Void> promise = write.promise;
147 recycle(write, false);
148 transferFunc.apply(msg).cascadeTo(promise);
149 write = next;
150 }
151 }
152 combiner.finish(p);
153 } catch (Throwable cause) {
154 p.setFailure(cause);
155 }
156 assertEmpty();
157 return p.asFuture();
158 }
159
160
161
162
163
164 public void removeAndFailAll(Throwable cause) {
165 assert executor.inEventLoop();
166 requireNonNull(cause, "cause");
167
168
169 for (PendingWrite write = head; write != null; write = head) {
170 head = tail = null;
171 size = 0;
172 bytes = 0;
173 while (write != null) {
174 PendingWrite next = write.next;
175 SilentDispose.dispose(write.msg, logger);
176 Promise<Void> promise = write.promise;
177 recycle(write, false);
178 safeFail(promise, cause);
179 write = next;
180 }
181 }
182 assertEmpty();
183 }
184
185
186
187
188
189 public void removeAndFail(Throwable cause) {
190 assert executor.inEventLoop();
191 requireNonNull(cause, "cause");
192 PendingWrite write = head;
193
194 if (write == null) {
195 return;
196 }
197 SilentDispose.dispose(write.msg, logger);
198 Promise<Void> promise = write.promise;
199 safeFail(promise, cause);
200 recycle(write, true);
201 }
202
203 private void assertEmpty() {
204 assert tail == null && head == null && size == 0;
205 }
206
207
208
209
210
211
212
213
214 public Future<Void> removeAndTransfer(Function<Object, Future<Void>> transferFunc) {
215 assert executor.inEventLoop();
216 PendingWrite write = head;
217 if (write == null) {
218 return null;
219 }
220 Object msg = write.msg;
221 Promise<Void> promise = write.promise;
222 recycle(write, true);
223
224 Future<Void> future = transferFunc.apply(msg);
225 future.cascadeTo(promise);
226 return future;
227 }
228
229
230
231
232
233
234
235 public Promise<Void> remove() {
236 assert executor.inEventLoop();
237 PendingWrite write = head;
238 if (write == null) {
239 return null;
240 }
241 Promise<Void> promise = write.promise;
242 SilentDispose.dispose(write.msg, logger);
243 recycle(write, true);
244 return promise;
245 }
246
247
248
249
250 public Object current() {
251 assert executor.inEventLoop();
252 PendingWrite write = head;
253 if (write == null) {
254 return null;
255 }
256 return write.msg;
257 }
258
259 private void recycle(PendingWrite write, boolean update) {
260 final PendingWrite next = write.next;
261 final long writeSize = write.size;
262
263 if (update) {
264 if (next == null) {
265
266
267 head = tail = null;
268 size = 0;
269 bytes = 0;
270 } else {
271 head = next;
272 size --;
273 bytes -= writeSize;
274 assert size > 0 && bytes >= 0;
275 }
276 }
277
278 write.recycle();
279 }
280
281 private static void safeFail(Promise<Void> promise, Throwable cause) {
282 if (!promise.tryFailure(cause)) {
283 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
284 }
285 }
286
287
288
289
290 static final class PendingWrite {
291 private static final ObjectPool<PendingWrite> RECYCLER = ObjectPool.newPool(PendingWrite::new);
292
293 private final ObjectPool.Handle<PendingWrite> handle;
294 private PendingWrite next;
295 private long size;
296 private Promise<Void> promise;
297 private Object msg;
298
299 private PendingWrite(ObjectPool.Handle<PendingWrite> handle) {
300 this.handle = handle;
301 }
302
303 static PendingWrite newInstance(Object msg, int size, Promise<Void> promise) {
304 PendingWrite write = RECYCLER.get();
305 write.size = size;
306 write.msg = msg;
307 write.promise = promise;
308 return write;
309 }
310
311 private void recycle() {
312 size = 0;
313 next = null;
314 msg = null;
315 promise = null;
316 handle.recycle(this);
317 }
318 }
319 }