View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.AbstractReferenceCountedByteBuf;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.concurrent.EventExecutor;
21  import io.netty.util.concurrent.PromiseCombiner;
22  import io.netty.util.internal.ObjectPool;
23  import io.netty.util.internal.ObjectPool.ObjectCreator;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.SystemPropertyUtil;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  /**
30   * A queue of write operations which are pending for later execution. It also updates the
31   * {@linkplain Channel#isWritable() writability} of the associated {@link Channel}, so that
32   * the pending write operations are also considered to determine the writability.
33   */
34  public final class PendingWriteQueue {
35      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
36      // Assuming a 64-bit JVM:
37      //  - 16 bytes object header
38      //  - 4 reference fields
39      //  - 1 long fields
40      private static final int PENDING_WRITE_OVERHEAD =
41              SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
42  
43      private final ChannelOutboundInvoker invoker;
44      private final EventExecutor executor;
45      private final PendingBytesTracker tracker;
46  
47      // head and tail pointers for the linked-list structure. If empty head and tail are null.
48      private PendingWrite head;
49      private PendingWrite tail;
50      private int size;
51      private long bytes;
52  
53      public PendingWriteQueue(ChannelHandlerContext ctx) {
54          tracker = PendingBytesTracker.newTracker(ctx.channel());
55          this.invoker = ctx;
56          this.executor = ctx.executor();
57      }
58  
59      public PendingWriteQueue(Channel channel) {
60          tracker = PendingBytesTracker.newTracker(channel);
61          this.invoker = channel;
62          this.executor = channel.eventLoop();
63      }
64  
65      /**
66       * Returns {@code true} if there are no pending write operations left in this queue.
67       */
68      public boolean isEmpty() {
69          assert executor.inEventLoop();
70          return head == null;
71      }
72  
73      /**
74       * Returns the number of pending write operations.
75       */
76      public int size() {
77          assert executor.inEventLoop();
78          return size;
79      }
80  
81      /**
82       * Returns the total number of bytes that are pending because of pending messages. This is only an estimate so
83       * it should only be treated as a hint.
84       */
85      public long bytes() {
86          assert executor.inEventLoop();
87          return bytes;
88      }
89  
90      private int size(Object msg) {
91          // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
92          // we should add them to the queue and let removeAndFailAll() fail them later.
93          int messageSize = tracker.size(msg);
94          if (messageSize < 0) {
95              // Size may be unknown so just use 0
96              messageSize = 0;
97          }
98          return messageSize + PENDING_WRITE_OVERHEAD;
99      }
100 
101     /**
102      * Add the given {@code msg} and {@link ChannelPromise}.
103      */
104     public void add(Object msg, ChannelPromise promise) {
105         assert executor.inEventLoop();
106         ObjectUtil.checkNotNull(msg, "msg");
107         ObjectUtil.checkNotNull(promise, "promise");
108         // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
109         // we should add them to the queue and let removeAndFailAll() fail them later.
110         int messageSize = size(msg);
111 
112         PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
113         PendingWrite currentTail = tail;
114         if (currentTail == null) {
115             tail = head = write;
116         } else {
117             currentTail.next = write;
118             tail = write;
119         }
120         size ++;
121         bytes += messageSize;
122         tracker.incrementPendingOutboundBytes(write.size);
123         // Touch the message to make it easier to debug buffer leaks.
124 
125         // this save both checking against the ReferenceCounted interface
126         // and makes better use of virtual calls vs interface ones
127         if (msg instanceof AbstractReferenceCountedByteBuf) {
128             ((AbstractReferenceCountedByteBuf) msg).touch();
129         } else {
130             ReferenceCountUtil.touch(msg);
131         }
132     }
133 
134     /**
135      * Remove all pending write operation and performs them via
136      * {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
137      *
138      * @return  {@link ChannelFuture} if something was written and {@code null}
139      *          if the {@link PendingWriteQueue} is empty.
140      */
141     public ChannelFuture removeAndWriteAll() {
142         assert executor.inEventLoop();
143 
144         if (isEmpty()) {
145             return null;
146         }
147 
148         ChannelPromise p = invoker.newPromise();
149         PromiseCombiner combiner = new PromiseCombiner(executor);
150         try {
151             // It is possible for some of the written promises to trigger more writes. The new writes
152             // will "revive" the queue, so we need to write them up until the queue is empty.
153             for (PendingWrite write = head; write != null; write = head) {
154                 head = tail = null;
155                 size = 0;
156                 bytes = 0;
157 
158                 while (write != null) {
159                     PendingWrite next = write.next;
160                     Object msg = write.msg;
161                     ChannelPromise promise = write.promise;
162                     recycle(write, false);
163                     if (!(promise instanceof VoidChannelPromise)) {
164                         combiner.add(promise);
165                     }
166                     invoker.write(msg, promise);
167                     write = next;
168                 }
169             }
170             combiner.finish(p);
171         } catch (Throwable cause) {
172             p.setFailure(cause);
173         }
174         assertEmpty();
175         return p;
176     }
177 
178     /**
179      * Remove all pending write operation and fail them with the given {@link Throwable}. The message will be released
180      * via {@link ReferenceCountUtil#safeRelease(Object)}.
181      */
182     public void removeAndFailAll(Throwable cause) {
183         assert executor.inEventLoop();
184         ObjectUtil.checkNotNull(cause, "cause");
185         // It is possible for some of the failed promises to trigger more writes. The new writes
186         // will "revive" the queue, so we need to clean them up until the queue is empty.
187         for (PendingWrite write = head; write != null; write = head) {
188             head = tail = null;
189             size = 0;
190             bytes = 0;
191             while (write != null) {
192                 PendingWrite next = write.next;
193                 ReferenceCountUtil.safeRelease(write.msg);
194                 ChannelPromise promise = write.promise;
195                 recycle(write, false);
196                 safeFail(promise, cause);
197                 write = next;
198             }
199         }
200         assertEmpty();
201     }
202 
203     /**
204      * Remove a pending write operation and fail it with the given {@link Throwable}. The message will be released via
205      * {@link ReferenceCountUtil#safeRelease(Object)}.
206      */
207     public void removeAndFail(Throwable cause) {
208         assert executor.inEventLoop();
209         ObjectUtil.checkNotNull(cause, "cause");
210 
211         PendingWrite write = head;
212         if (write == null) {
213             return;
214         }
215         ReferenceCountUtil.safeRelease(write.msg);
216         ChannelPromise promise = write.promise;
217         safeFail(promise, cause);
218         recycle(write, true);
219     }
220 
221     private void assertEmpty() {
222         assert tail == null && head == null && size == 0;
223     }
224 
225     /**
226      * Removes a pending write operation and performs it via
227      * {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
228      *
229      * @return  {@link ChannelFuture} if something was written and {@code null}
230      *          if the {@link PendingWriteQueue} is empty.
231      */
232     public ChannelFuture removeAndWrite() {
233         assert executor.inEventLoop();
234         PendingWrite write = head;
235         if (write == null) {
236             return null;
237         }
238         Object msg = write.msg;
239         ChannelPromise promise = write.promise;
240         recycle(write, true);
241         return invoker.write(msg, promise);
242     }
243 
244     /**
245      * Removes a pending write operation and release it's message via {@link ReferenceCountUtil#safeRelease(Object)}.
246      *
247      * @return  {@link ChannelPromise} of the pending write or {@code null} if the queue is empty.
248      *
249      */
250     public ChannelPromise remove() {
251         assert executor.inEventLoop();
252         PendingWrite write = head;
253         if (write == null) {
254             return null;
255         }
256         ChannelPromise promise = write.promise;
257         ReferenceCountUtil.safeRelease(write.msg);
258         recycle(write, true);
259         return promise;
260     }
261 
262     /**
263      * Return the current message or {@code null} if empty.
264      */
265     public Object current() {
266         assert executor.inEventLoop();
267         PendingWrite write = head;
268         if (write == null) {
269             return null;
270         }
271         return write.msg;
272     }
273 
274     private void recycle(PendingWrite write, boolean update) {
275         final PendingWrite next = write.next;
276         final long writeSize = write.size;
277 
278         if (update) {
279             if (next == null) {
280                 // Handled last PendingWrite so rest head and tail
281                 // Guard against re-entrance by directly reset
282                 head = tail = null;
283                 size = 0;
284                 bytes = 0;
285             } else {
286                 head = next;
287                 size --;
288                 bytes -= writeSize;
289                 assert size > 0 && bytes >= 0;
290             }
291         }
292 
293         write.recycle();
294         tracker.decrementPendingOutboundBytes(writeSize);
295     }
296 
297     private static void safeFail(ChannelPromise promise, Throwable cause) {
298         if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
299             logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
300         }
301     }
302 
303     /**
304      * Holds all meta-data and construct the linked-list structure.
305      */
306     static final class PendingWrite {
307         private static final ObjectPool<PendingWrite> RECYCLER = ObjectPool.newPool(new ObjectCreator<PendingWrite>() {
308             @Override
309             public PendingWrite newObject(ObjectPool.Handle<PendingWrite> handle) {
310                 return new PendingWrite(handle);
311             }
312         });
313 
314         private final ObjectPool.Handle<PendingWrite> handle;
315         private PendingWrite next;
316         private long size;
317         private ChannelPromise promise;
318         private Object msg;
319 
320         private PendingWrite(ObjectPool.Handle<PendingWrite> handle) {
321             this.handle = handle;
322         }
323 
324         static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) {
325             PendingWrite write = RECYCLER.get();
326             write.size = size;
327             write.msg = msg;
328             write.promise = promise;
329             return write;
330         }
331 
332         private void recycle() {
333             size = 0;
334             next = null;
335             msg = null;
336             promise = null;
337             handle.recycle(this);
338         }
339     }
340 }