View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel;
17  
18  import io.netty5.util.Resource;
19  import io.netty5.util.concurrent.EventExecutor;
20  import io.netty5.util.concurrent.Future;
21  import io.netty5.util.concurrent.Promise;
22  import io.netty5.util.concurrent.PromiseCombiner;
23  import io.netty5.util.internal.ObjectPool;
24  import io.netty5.util.internal.SilentDispose;
25  import io.netty5.util.internal.SystemPropertyUtil;
26  import io.netty5.util.internal.logging.InternalLogger;
27  import io.netty5.util.internal.logging.InternalLoggerFactory;
28  
29  import java.util.Objects;
30  import java.util.function.Function;
31  
32  import static java.util.Objects.requireNonNull;
33  
34  /**
35   * A queue of write operations which are pending for later execution.
36   */
37  public final class PendingWriteQueue {
38      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
39      // Assuming a 64-bit JVM:
40      //  - 16 bytes object header
41      //  - 4 reference fields
42      //  - 1 long fields
43      private static final int PENDING_WRITE_OVERHEAD =
44              SystemPropertyUtil.getInt("io.netty5.transport.pendingWriteSizeOverhead", 64);
45  
46      private final EventExecutor executor;
47      private final MessageSizeEstimator.Handle sizeEstimatorHandle;
48  
49      // head and tail pointers for the linked-list structure. If empty head and tail are null.
50      private PendingWrite head;
51      private PendingWrite tail;
52      private int size;
53      private long bytes;
54  
55      public PendingWriteQueue(EventExecutor executor, MessageSizeEstimator.Handle handle) {
56          this.executor = Objects.requireNonNull(executor, "executor");
57          this.sizeEstimatorHandle = Objects.requireNonNull(handle, "handle");
58      }
59  
60      /**
61       * Returns {@code true} if there are no pending write operations left in this queue.
62       */
63      public boolean isEmpty() {
64          assert executor.inEventLoop();
65          return head == null;
66      }
67  
68      /**
69       * Returns the number of pending write operations.
70       */
71      public int size() {
72          assert executor.inEventLoop();
73          return size;
74      }
75  
76      /**
77       * Returns the total number of bytes that are pending because of pending messages. This is only an estimate so
78       * it should only be treated as a hint.
79       */
80      public long bytes() {
81          assert executor.inEventLoop();
82          return bytes;
83      }
84  
85      private int size(Object msg) {
86          // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
87          // we should add them to the queue and let removeAndFailAll() fail them later.
88          int messageSize = sizeEstimatorHandle.size(msg);
89          if (messageSize < 0) {
90              // Size may be unknown so just use 0
91              messageSize = 0;
92          }
93          return messageSize + PENDING_WRITE_OVERHEAD;
94      }
95  
96      /**
97       * Add the given {@code msg} and {@link Promise}.
98       */
99      public void add(Object msg, Promise<Void> promise) {
100         assert executor.inEventLoop();
101         requireNonNull(msg, "msg");
102         requireNonNull(promise, "promise");
103         // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
104         // we should add them to the queue and let removeAndFailAll() fail them later.
105         int messageSize = size(msg);
106 
107         PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
108         PendingWrite currentTail = tail;
109         if (currentTail == null) {
110             tail = head = write;
111         } else {
112             currentTail.next = write;
113             tail = write;
114         }
115         size ++;
116         bytes += messageSize;
117     }
118 
119     /**
120      * Remove all pending write operation and performs them via
121      * {@link Function#apply(Object)}.
122      *
123      * @return  {@link Future} if something was transferred and {@code null}
124      *          if the {@link PendingWriteQueue} is empty.
125      */
126     public Future<Void> removeAndTransferAll(Function<Object, Future<Void>> transferFunc) {
127         assert executor.inEventLoop();
128 
129         if (isEmpty()) {
130             return null;
131         }
132 
133         Promise<Void> p = executor.newPromise();
134         PromiseCombiner combiner = new PromiseCombiner(executor);
135         try {
136             // It is possible for some of the written promises to trigger more writes. The new writes
137             // will "revive" the queue, so we need to write them up until the queue is empty.
138             for (PendingWrite write = head; write != null; write = head) {
139                 head = tail = null;
140                 size = 0;
141                 bytes = 0;
142 
143                 while (write != null) {
144                     PendingWrite next = write.next;
145                     Object msg = write.msg;
146                     Promise<Void> promise = write.promise;
147                     recycle(write, false);
148                     transferFunc.apply(msg).cascadeTo(promise);
149                     write = next;
150                 }
151             }
152             combiner.finish(p);
153         } catch (Throwable cause) {
154             p.setFailure(cause);
155         }
156         assertEmpty();
157         return p.asFuture();
158     }
159 
160     /**
161      * Remove all pending write operation and fail them with the given {@link Throwable}. The message will be released
162      * via {@link Resource#dispose(Object)}.
163      */
164     public void removeAndFailAll(Throwable cause) {
165         assert executor.inEventLoop();
166         requireNonNull(cause, "cause");
167         // It is possible for some of the failed promises to trigger more writes. The new writes
168         // will "revive" the queue, so we need to clean them up until the queue is empty.
169         for (PendingWrite write = head; write != null; write = head) {
170             head = tail = null;
171             size = 0;
172             bytes = 0;
173             while (write != null) {
174                 PendingWrite next = write.next;
175                 SilentDispose.dispose(write.msg, logger);
176                 Promise<Void> promise = write.promise;
177                 recycle(write, false);
178                 safeFail(promise, cause);
179                 write = next;
180             }
181         }
182         assertEmpty();
183     }
184 
185     /**
186      * Remove a pending write operation and fail it with the given {@link Throwable}. The message will be released via
187      * {@link Resource#dispose(Object)}.
188      */
189     public void removeAndFail(Throwable cause) {
190         assert executor.inEventLoop();
191         requireNonNull(cause, "cause");
192         PendingWrite write = head;
193 
194         if (write == null) {
195             return;
196         }
197         SilentDispose.dispose(write.msg, logger);
198         Promise<Void> promise = write.promise;
199         safeFail(promise, cause);
200         recycle(write, true);
201     }
202 
203     private void assertEmpty() {
204         assert tail == null && head == null && size == 0;
205     }
206 
207     /**
208      * Removes a pending write operation and performs it via
209      * {@link Function#apply(Object)}.
210      *
211      * @return  {@link Future} if something was transfered and {@code null}
212      *          if the {@link PendingWriteQueue} is empty.
213      */
214     public Future<Void> removeAndTransfer(Function<Object, Future<Void>> transferFunc) {
215         assert executor.inEventLoop();
216         PendingWrite write = head;
217         if (write == null) {
218             return null;
219         }
220         Object msg = write.msg;
221         Promise<Void> promise = write.promise;
222         recycle(write, true);
223 
224         Future<Void> future = transferFunc.apply(msg);
225         future.cascadeTo(promise);
226         return future;
227     }
228 
229     /**
230      * Removes a pending write operation and release its message via {@link Resource#dispose(Object)}.
231      *
232      * @return  {@link Promise} of the pending write or {@code null} if the queue is empty.
233      *
234      */
235     public Promise<Void> remove() {
236         assert executor.inEventLoop();
237         PendingWrite write = head;
238         if (write == null) {
239             return null;
240         }
241         Promise<Void> promise = write.promise;
242         SilentDispose.dispose(write.msg, logger);
243         recycle(write, true);
244         return promise;
245     }
246 
247     /**
248      * Return the current message or {@code null} if empty.
249      */
250     public Object current() {
251         assert executor.inEventLoop();
252         PendingWrite write = head;
253         if (write == null) {
254             return null;
255         }
256         return write.msg;
257     }
258 
259     private void recycle(PendingWrite write, boolean update) {
260         final PendingWrite next = write.next;
261         final long writeSize = write.size;
262 
263         if (update) {
264             if (next == null) {
265                 // Handled last PendingWrite so rest head and tail
266                 // Guard against re-entrance by directly reset
267                 head = tail = null;
268                 size = 0;
269                 bytes = 0;
270             } else {
271                 head = next;
272                 size --;
273                 bytes -= writeSize;
274                 assert size > 0 && bytes >= 0;
275             }
276         }
277 
278         write.recycle();
279     }
280 
281     private static void safeFail(Promise<Void> promise, Throwable cause) {
282         if (!promise.tryFailure(cause)) {
283             logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
284         }
285     }
286 
287     /**
288      * Holds all meta-data and construct the linked-list structure.
289      */
290     static final class PendingWrite {
291         private static final ObjectPool<PendingWrite> RECYCLER = ObjectPool.newPool(PendingWrite::new);
292 
293         private final ObjectPool.Handle<PendingWrite> handle;
294         private PendingWrite next;
295         private long size;
296         private Promise<Void> promise;
297         private Object msg;
298 
299         private PendingWrite(ObjectPool.Handle<PendingWrite> handle) {
300             this.handle = handle;
301         }
302 
303         static PendingWrite newInstance(Object msg, int size, Promise<Void> promise) {
304             PendingWrite write = RECYCLER.get();
305             write.size = size;
306             write.msg = msg;
307             write.promise = promise;
308             return write;
309         }
310 
311         private void recycle() {
312             size = 0;
313             next = null;
314             msg = null;
315             promise = null;
316             handle.recycle(this);
317         }
318     }
319 }