View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.ReferenceCountUtil;
19  import io.netty.util.concurrent.EventExecutor;
20  import io.netty.util.concurrent.PromiseCombiner;
21  import io.netty.util.internal.ObjectPool;
22  import io.netty.util.internal.ObjectPool.ObjectCreator;
23  import io.netty.util.internal.ObjectUtil;
24  import io.netty.util.internal.SystemPropertyUtil;
25  import io.netty.util.internal.logging.InternalLogger;
26  import io.netty.util.internal.logging.InternalLoggerFactory;
27  
28  /**
29   * A queue of write operations which are pending for later execution. It also updates the
30   * {@linkplain Channel#isWritable() writability} of the associated {@link Channel}, so that
31   * the pending write operations are also considered to determine the writability.
32   */
33  public final class PendingWriteQueue {
34      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
35      // Assuming a 64-bit JVM:
36      //  - 16 bytes object header
37      //  - 4 reference fields
38      //  - 1 long fields
39      private static final int PENDING_WRITE_OVERHEAD =
40              SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
41  
42      private final ChannelOutboundInvoker invoker;
43      private final EventExecutor executor;
44      private final PendingBytesTracker tracker;
45  
46      // head and tail pointers for the linked-list structure. If empty head and tail are null.
47      private PendingWrite head;
48      private PendingWrite tail;
49      private int size;
50      private long bytes;
51  
52      public PendingWriteQueue(ChannelHandlerContext ctx) {
53          tracker = PendingBytesTracker.newTracker(ctx.channel());
54          this.invoker = ctx;
55          this.executor = ctx.executor();
56      }
57  
58      public PendingWriteQueue(Channel channel) {
59          tracker = PendingBytesTracker.newTracker(channel);
60          this.invoker = channel;
61          this.executor = channel.eventLoop();
62      }
63  
64      /**
65       * Returns {@code true} if there are no pending write operations left in this queue.
66       */
67      public boolean isEmpty() {
68          assert executor.inEventLoop();
69          return head == null;
70      }
71  
72      /**
73       * Returns the number of pending write operations.
74       */
75      public int size() {
76          assert executor.inEventLoop();
77          return size;
78      }
79  
80      /**
81       * Returns the total number of bytes that are pending because of pending messages. This is only an estimate so
82       * it should only be treated as a hint.
83       */
84      public long bytes() {
85          assert executor.inEventLoop();
86          return bytes;
87      }
88  
89      private int size(Object msg) {
90          // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
91          // we should add them to the queue and let removeAndFailAll() fail them later.
92          int messageSize = tracker.size(msg);
93          if (messageSize < 0) {
94              // Size may be unknown so just use 0
95              messageSize = 0;
96          }
97          return messageSize + PENDING_WRITE_OVERHEAD;
98      }
99  
100     /**
101      * Add the given {@code msg} and {@link ChannelPromise}.
102      */
103     public void add(Object msg, ChannelPromise promise) {
104         assert executor.inEventLoop();
105         ObjectUtil.checkNotNull(msg, "msg");
106         ObjectUtil.checkNotNull(promise, "promise");
107         // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
108         // we should add them to the queue and let removeAndFailAll() fail them later.
109         int messageSize = size(msg);
110 
111         PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
112         PendingWrite currentTail = tail;
113         if (currentTail == null) {
114             tail = head = write;
115         } else {
116             currentTail.next = write;
117             tail = write;
118         }
119         size ++;
120         bytes += messageSize;
121         tracker.incrementPendingOutboundBytes(write.size);
122     }
123 
124     /**
125      * Remove all pending write operation and performs them via
126      * {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
127      *
128      * @return  {@link ChannelFuture} if something was written and {@code null}
129      *          if the {@link PendingWriteQueue} is empty.
130      */
131     public ChannelFuture removeAndWriteAll() {
132         assert executor.inEventLoop();
133 
134         if (isEmpty()) {
135             return null;
136         }
137 
138         ChannelPromise p = invoker.newPromise();
139         PromiseCombiner combiner = new PromiseCombiner(executor);
140         try {
141             // It is possible for some of the written promises to trigger more writes. The new writes
142             // will "revive" the queue, so we need to write them up until the queue is empty.
143             for (PendingWrite write = head; write != null; write = head) {
144                 head = tail = null;
145                 size = 0;
146                 bytes = 0;
147 
148                 while (write != null) {
149                     PendingWrite next = write.next;
150                     Object msg = write.msg;
151                     ChannelPromise promise = write.promise;
152                     recycle(write, false);
153                     if (!(promise instanceof VoidChannelPromise)) {
154                         combiner.add(promise);
155                     }
156                     invoker.write(msg, promise);
157                     write = next;
158                 }
159             }
160             combiner.finish(p);
161         } catch (Throwable cause) {
162             p.setFailure(cause);
163         }
164         assertEmpty();
165         return p;
166     }
167 
168     /**
169      * Remove all pending write operation and fail them with the given {@link Throwable}. The message will be released
170      * via {@link ReferenceCountUtil#safeRelease(Object)}.
171      */
172     public void removeAndFailAll(Throwable cause) {
173         assert executor.inEventLoop();
174         ObjectUtil.checkNotNull(cause, "cause");
175         // It is possible for some of the failed promises to trigger more writes. The new writes
176         // will "revive" the queue, so we need to clean them up until the queue is empty.
177         for (PendingWrite write = head; write != null; write = head) {
178             head = tail = null;
179             size = 0;
180             bytes = 0;
181             while (write != null) {
182                 PendingWrite next = write.next;
183                 ReferenceCountUtil.safeRelease(write.msg);
184                 ChannelPromise promise = write.promise;
185                 recycle(write, false);
186                 safeFail(promise, cause);
187                 write = next;
188             }
189         }
190         assertEmpty();
191     }
192 
193     /**
194      * Remove a pending write operation and fail it with the given {@link Throwable}. The message will be released via
195      * {@link ReferenceCountUtil#safeRelease(Object)}.
196      */
197     public void removeAndFail(Throwable cause) {
198         assert executor.inEventLoop();
199         ObjectUtil.checkNotNull(cause, "cause");
200 
201         PendingWrite write = head;
202         if (write == null) {
203             return;
204         }
205         ReferenceCountUtil.safeRelease(write.msg);
206         ChannelPromise promise = write.promise;
207         safeFail(promise, cause);
208         recycle(write, true);
209     }
210 
211     private void assertEmpty() {
212         assert tail == null && head == null && size == 0;
213     }
214 
215     /**
216      * Removes a pending write operation and performs it via
217      * {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
218      *
219      * @return  {@link ChannelFuture} if something was written and {@code null}
220      *          if the {@link PendingWriteQueue} is empty.
221      */
222     public ChannelFuture removeAndWrite() {
223         assert executor.inEventLoop();
224         PendingWrite write = head;
225         if (write == null) {
226             return null;
227         }
228         Object msg = write.msg;
229         ChannelPromise promise = write.promise;
230         recycle(write, true);
231         return invoker.write(msg, promise);
232     }
233 
234     /**
235      * Removes a pending write operation and release it's message via {@link ReferenceCountUtil#safeRelease(Object)}.
236      *
237      * @return  {@link ChannelPromise} of the pending write or {@code null} if the queue is empty.
238      *
239      */
240     public ChannelPromise remove() {
241         assert executor.inEventLoop();
242         PendingWrite write = head;
243         if (write == null) {
244             return null;
245         }
246         ChannelPromise promise = write.promise;
247         ReferenceCountUtil.safeRelease(write.msg);
248         recycle(write, true);
249         return promise;
250     }
251 
252     /**
253      * Return the current message or {@code null} if empty.
254      */
255     public Object current() {
256         assert executor.inEventLoop();
257         PendingWrite write = head;
258         if (write == null) {
259             return null;
260         }
261         return write.msg;
262     }
263 
264     private void recycle(PendingWrite write, boolean update) {
265         final PendingWrite next = write.next;
266         final long writeSize = write.size;
267 
268         if (update) {
269             if (next == null) {
270                 // Handled last PendingWrite so rest head and tail
271                 // Guard against re-entrance by directly reset
272                 head = tail = null;
273                 size = 0;
274                 bytes = 0;
275             } else {
276                 head = next;
277                 size --;
278                 bytes -= writeSize;
279                 assert size > 0 && bytes >= 0;
280             }
281         }
282 
283         write.recycle();
284         tracker.decrementPendingOutboundBytes(writeSize);
285     }
286 
287     private static void safeFail(ChannelPromise promise, Throwable cause) {
288         if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
289             logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
290         }
291     }
292 
293     /**
294      * Holds all meta-data and construct the linked-list structure.
295      */
296     static final class PendingWrite {
297         private static final ObjectPool<PendingWrite> RECYCLER = ObjectPool.newPool(new ObjectCreator<PendingWrite>() {
298             @Override
299             public PendingWrite newObject(ObjectPool.Handle<PendingWrite> handle) {
300                 return new PendingWrite(handle);
301             }
302         });
303 
304         private final ObjectPool.Handle<PendingWrite> handle;
305         private PendingWrite next;
306         private long size;
307         private ChannelPromise promise;
308         private Object msg;
309 
310         private PendingWrite(ObjectPool.Handle<PendingWrite> handle) {
311             this.handle = handle;
312         }
313 
314         static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) {
315             PendingWrite write = RECYCLER.get();
316             write.size = size;
317             write.msg = msg;
318             write.promise = promise;
319             return write;
320         }
321 
322         private void recycle() {
323             size = 0;
324             next = null;
325             msg = null;
326             promise = null;
327             handle.recycle(this);
328         }
329     }
330 }