1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  package io.netty5.channel;
16  
17  import io.netty5.buffer.api.Buffer;
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.buffer.api.CompositeBuffer;
20  import io.netty5.util.concurrent.Future;
21  import io.netty5.util.concurrent.FutureListener;
22  import io.netty5.util.concurrent.Promise;
23  import io.netty5.util.internal.SilentDispose;
24  import io.netty5.util.internal.UnstableApi;
25  import io.netty5.util.internal.logging.InternalLogger;
26  import io.netty5.util.internal.logging.InternalLoggerFactory;
27  
28  import java.util.ArrayDeque;
29  import java.util.List;
30  
31  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
32  import static java.util.Objects.requireNonNull;
33  
34  @SuppressWarnings("unchecked")
35  @UnstableApi
36  public abstract class AbstractCoalescingBufferQueue {
37      private static final InternalLogger logger = InternalLoggerFactory.getInstance(
38              AbstractCoalescingBufferQueue.class);
39      private final ArrayDeque<Object> bufAndListenerPairs;
40      private int readableBytes;
41  
42      
43  
44  
45  
46  
47      protected AbstractCoalescingBufferQueue(int initSize) {
48          bufAndListenerPairs = new ArrayDeque<>(initSize);
49      }
50  
51      
52  
53  
54  
55  
56  
57      public final void addFirst(Buffer buf, Promise<Void> promise) {
58          addFirst(buf, f -> f.cascadeTo(promise));
59      }
60  
61      private void addFirst(Buffer buf, FutureListener<Void> listener) {
62          if (listener != null) {
63              bufAndListenerPairs.addFirst(listener);
64          }
65          bufAndListenerPairs.addFirst(buf);
66          incrementReadableBytes(buf.readableBytes());
67      }
68  
69      
70  
71  
72      public final void add(Buffer buf) {
73          add(buf, (FutureListener<Void>) null);
74      }
75  
76      
77  
78  
79  
80  
81  
82      public final void add(Buffer buf, Promise<Void> promise) {
83          
84          
85          add(buf, f -> f.cascadeTo(promise));
86      }
87  
88      
89  
90  
91  
92  
93  
94      public final void add(Buffer buf, FutureListener<Void> listener) {
95          
96          
97          bufAndListenerPairs.add(buf);
98          if (listener != null) {
99              bufAndListenerPairs.add(listener);
100         }
101         incrementReadableBytes(buf.readableBytes());
102     }
103 
104     
105 
106 
107 
108 
109     public final Buffer removeFirst(Promise<Void> aggregatePromise) {
110         Object entry = bufAndListenerPairs.poll();
111         if (entry == null) {
112             return null;
113         }
114         assert entry instanceof Buffer;
115         Buffer result = (Buffer) entry;
116 
117         decrementReadableBytes(result.readableBytes());
118 
119         entry = bufAndListenerPairs.peek();
120         if (entry instanceof FutureListener) {
121             aggregatePromise.asFuture().addListener((FutureListener<Void>) entry);
122             bufAndListenerPairs.poll();
123         }
124         return result;
125     }
126 
127     
128 
129 
130 
131 
132 
133 
134 
135 
136 
137 
138     public final Buffer remove(BufferAllocator alloc, int bytes, Promise<Void> aggregatePromise) {
139         checkPositiveOrZero(bytes, "bytes");
140         requireNonNull(aggregatePromise, "aggregatePromise");
141 
142         
143         if (bufAndListenerPairs.isEmpty()) {
144             assert readableBytes == 0;
145             return removeEmptyValue();
146         }
147         bytes = Math.min(bytes, readableBytes);
148 
149         Buffer toReturn = null;
150         Buffer entryBuffer = null;
151         int originalBytes = bytes;
152         try {
153             for (;;) {
154                 Object entry = bufAndListenerPairs.poll();
155                 if (entry == null) {
156                     break;
157                 }
158                 if (entry instanceof FutureListener) {
159                     aggregatePromise.asFuture().addListener((FutureListener<Void>) entry);
160                     continue;
161                 }
162                 entryBuffer = (Buffer) entry;
163                 if (entryBuffer.readableBytes() > bytes) {
164                     
165                     bufAndListenerPairs.addFirst(entryBuffer);
166                     if (bytes > 0) {
167                         
168                         entryBuffer = entryBuffer.readSplit(bytes);
169                         toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
170                                                     : compose(alloc, toReturn, entryBuffer);
171                         bytes = 0;
172                     }
173                     break;
174                 }
175                 bytes -= entryBuffer.readableBytes();
176                 toReturn = toReturn == null ? composeFirst(alloc, entryBuffer)
177                                             : compose(alloc, toReturn, entryBuffer);
178                 entryBuffer = null;
179             }
180         } catch (Throwable cause) {
181             SilentDispose.dispose(entryBuffer, logger);
182             SilentDispose.dispose(toReturn, logger);
183             aggregatePromise.setFailure(cause);
184             throw cause;
185         }
186         decrementReadableBytes(originalBytes - bytes);
187         return toReturn;
188     }
189 
190     
191 
192 
193     public final int readableBytes() {
194         return readableBytes;
195     }
196 
197     
198 
199 
200     public final boolean isEmpty() {
201         return bufAndListenerPairs.isEmpty();
202     }
203 
204     
205 
206 
207     public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
208         releaseAndCompleteAll(invoker.newFailedFuture(cause));
209     }
210 
211     
212 
213 
214 
215     public final void copyTo(AbstractCoalescingBufferQueue dest) {
216         dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
217         dest.incrementReadableBytes(readableBytes);
218     }
219 
220     
221 
222 
223 
224     public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
225         Throwable pending = null;
226         Buffer previousBuf = null;
227         for (;;) {
228             Object entry = bufAndListenerPairs.poll();
229             try {
230                 if (entry == null) {
231                     if (previousBuf != null) {
232                         decrementReadableBytes(previousBuf.readableBytes());
233                         
234                         
235                         ctx.write(previousBuf)
236                            .addListener(ctx.channel(), ChannelFutureListeners.FIRE_EXCEPTION_ON_FAILURE);
237                     }
238                     break;
239                 }
240 
241                 if (entry instanceof Buffer) {
242                     if (previousBuf != null) {
243                         decrementReadableBytes(previousBuf.readableBytes());
244                         
245                         
246                         ctx.write(previousBuf)
247                            .addListener(ctx.channel(), ChannelFutureListeners.FIRE_EXCEPTION_ON_FAILURE);
248                     }
249                     previousBuf = (Buffer) entry;
250                 } else if (entry instanceof Promise) {
251                     decrementReadableBytes(previousBuf.readableBytes());
252                     ctx.write(previousBuf).cascadeTo((Promise<? super Void>) entry);
253                     previousBuf = null;
254                 } else {
255                     decrementReadableBytes(previousBuf.readableBytes());
256                     ctx.write(previousBuf).addListener((FutureListener<Void>) entry);
257                     previousBuf = null;
258                 }
259             } catch (Throwable t) {
260                 if (pending == null) {
261                     pending = t;
262                 } else {
263                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
264                 }
265             }
266         }
267         if (pending != null) {
268             throw new IllegalStateException(pending);
269         }
270     }
271 
272     @Override
273     public String toString() {
274         return "bytes: " + readableBytes + " buffers: " + (size() >> 1);
275     }
276 
277     
278 
279 
280     protected abstract Buffer compose(BufferAllocator alloc, Buffer cumulation, Buffer next);
281 
282     
283 
284 
285     protected final Buffer composeIntoComposite(BufferAllocator alloc, Buffer cumulation, Buffer next) {
286         
287         
288         return alloc.compose(List.of(cumulation.send(), next.send()));
289     }
290 
291     
292 
293 
294 
295 
296 
297 
298 
299     protected final Buffer copyAndCompose(BufferAllocator alloc, Buffer cumulation, Buffer next, int minIncrement) {
300         try (cumulation; next) {
301             int sum = cumulation.readableBytes() + Math.max(minIncrement, next.readableBytes());
302             return alloc.allocate(sum).writeBytes(cumulation).writeBytes(next);
303         }
304     }
305 
306     
307 
308 
309 
310     protected Buffer composeFirst(BufferAllocator allocator, Buffer first) {
311         return first;
312     }
313 
314     
315 
316 
317 
318     protected abstract Buffer removeEmptyValue();
319 
320     
321 
322 
323 
324     protected final int size() {
325         return bufAndListenerPairs.size();
326     }
327 
328     private void releaseAndCompleteAll(Future<Void> future) {
329         Throwable pending = null;
330         for (;;) {
331             Object entry = bufAndListenerPairs.poll();
332             if (entry == null) {
333                 break;
334             }
335             try {
336                 if (entry instanceof Buffer) {
337                     Buffer buffer = (Buffer) entry;
338                     decrementReadableBytes(buffer.readableBytes());
339                     SilentDispose.dispose(buffer, logger);
340                 } else {
341                     ((FutureListener<Void>) entry).operationComplete(future);
342                 }
343             } catch (Throwable t) {
344                 if (pending == null) {
345                     pending = t;
346                 } else {
347                     logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
348                 }
349             }
350         }
351         if (pending != null) {
352             throw new IllegalStateException(pending);
353         }
354     }
355 
356     private void incrementReadableBytes(int increment) {
357         int nextReadableBytes = readableBytes + increment;
358         if (nextReadableBytes < readableBytes) {
359             throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
360         }
361         readableBytes = nextReadableBytes;
362     }
363 
364     private void decrementReadableBytes(int decrement) {
365         readableBytes -= decrement;
366         assert readableBytes >= 0;
367     }
368 }