View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.Recycler;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.concurrent.PromiseCombiner;
21  import io.netty.util.internal.SystemPropertyUtil;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  
25  /**
26   * A queue of write operations which are pending for later execution. It also updates the
27   * {@linkplain Channel#isWritable() writability} of the associated {@link Channel}, so that
28   * the pending write operations are also considered to determine the writability.
29   */
30  public final class PendingWriteQueue {
31      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
32      // Assuming a 64-bit JVM:
33      //  - 16 bytes object header
34      //  - 4 reference fields
35      //  - 1 long fields
36      private static final int PENDING_WRITE_OVERHEAD =
37              SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
38  
39      private final ChannelHandlerContext ctx;
40      private final ChannelOutboundBuffer buffer;
41      private final MessageSizeEstimator.Handle estimatorHandle;
42  
43      // head and tail pointers for the linked-list structure. If empty head and tail are null.
44      private PendingWrite head;
45      private PendingWrite tail;
46      private int size;
47      private long bytes;
48  
49      public PendingWriteQueue(ChannelHandlerContext ctx) {
50          if (ctx == null) {
51              throw new NullPointerException("ctx");
52          }
53          this.ctx = ctx;
54          buffer = ctx.channel().unsafe().outboundBuffer();
55          estimatorHandle = ctx.channel().config().getMessageSizeEstimator().newHandle();
56      }
57  
58      /**
59       * Returns {@code true} if there are no pending write operations left in this queue.
60       */
61      public boolean isEmpty() {
62          assert ctx.executor().inEventLoop();
63          return head == null;
64      }
65  
66      /**
67       * Returns the number of pending write operations.
68       */
69      public int size() {
70          assert ctx.executor().inEventLoop();
71          return size;
72      }
73  
74      /**
75       * Returns the total number of bytes that are pending because of pending messages. This is only an estimate so
76       * it should only be treated as a hint.
77       */
78      public long bytes() {
79          assert ctx.executor().inEventLoop();
80          return bytes;
81      }
82  
83      private int size(Object msg) {
84          // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
85          // we should add them to the queue and let removeAndFailAll() fail them later.
86          int messageSize = estimatorHandle.size(msg);
87          if (messageSize < 0) {
88              // Size may be unknown so just use 0
89              messageSize = 0;
90          }
91          return messageSize + PENDING_WRITE_OVERHEAD;
92      }
93  
94      /**
95       * Add the given {@code msg} and {@link ChannelPromise}.
96       */
97      public void add(Object msg, ChannelPromise promise) {
98          assert ctx.executor().inEventLoop();
99          if (msg == null) {
100             throw new NullPointerException("msg");
101         }
102         if (promise == null) {
103             throw new NullPointerException("promise");
104         }
105         // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
106         // we should add them to the queue and let removeAndFailAll() fail them later.
107         int messageSize = size(msg);
108 
109         PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
110         PendingWrite currentTail = tail;
111         if (currentTail == null) {
112             tail = head = write;
113         } else {
114             currentTail.next = write;
115             tail = write;
116         }
117         size ++;
118         bytes += messageSize;
119         // We need to guard against null as channel.unsafe().outboundBuffer() may returned null
120         // if the channel was already closed when constructing the PendingWriteQueue.
121         // See https://github.com/netty/netty/issues/3967
122         if (buffer != null) {
123             buffer.incrementPendingOutboundBytes(write.size);
124         }
125     }
126 
127     /**
128      * Remove all pending write operation and performs them via
129      * {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
130      *
131      * @return  {@link ChannelFuture} if something was written and {@code null}
132      *          if the {@link PendingWriteQueue} is empty.
133      */
134     public ChannelFuture removeAndWriteAll() {
135         assert ctx.executor().inEventLoop();
136 
137         if (isEmpty()) {
138             return null;
139         }
140 
141         ChannelPromise p = ctx.newPromise();
142         PromiseCombiner combiner = new PromiseCombiner();
143         try {
144             // It is possible for some of the written promises to trigger more writes. The new writes
145             // will "revive" the queue, so we need to write them up until the queue is empty.
146             for (PendingWrite write = head; write != null; write = head) {
147                 head = tail = null;
148                 size = 0;
149                 bytes = 0;
150 
151                 while (write != null) {
152                     PendingWrite next = write.next;
153                     Object msg = write.msg;
154                     ChannelPromise promise = write.promise;
155                     recycle(write, false);
156                     combiner.add(promise);
157                     ctx.write(msg, promise);
158                     write = next;
159                 }
160             }
161             combiner.finish(p);
162         } catch (Throwable cause) {
163             p.setFailure(cause);
164         }
165         assertEmpty();
166         return p;
167     }
168 
169     /**
170      * Remove all pending write operation and fail them with the given {@link Throwable}. The message will be released
171      * via {@link ReferenceCountUtil#safeRelease(Object)}.
172      */
173     public void removeAndFailAll(Throwable cause) {
174         assert ctx.executor().inEventLoop();
175         if (cause == null) {
176             throw new NullPointerException("cause");
177         }
178         // It is possible for some of the failed promises to trigger more writes. The new writes
179         // will "revive" the queue, so we need to clean them up until the queue is empty.
180         for (PendingWrite write = head; write != null; write = head) {
181             head = tail = null;
182             size = 0;
183             bytes = 0;
184             while (write != null) {
185                 PendingWrite next = write.next;
186                 ReferenceCountUtil.safeRelease(write.msg);
187                 ChannelPromise promise = write.promise;
188                 recycle(write, false);
189                 safeFail(promise, cause);
190                 write = next;
191             }
192         }
193         assertEmpty();
194     }
195 
196     /**
197      * Remove a pending write operation and fail it with the given {@link Throwable}. The message will be released via
198      * {@link ReferenceCountUtil#safeRelease(Object)}.
199      */
200     public void removeAndFail(Throwable cause) {
201         assert ctx.executor().inEventLoop();
202         if (cause == null) {
203             throw new NullPointerException("cause");
204         }
205         PendingWrite write = head;
206 
207         if (write == null) {
208             return;
209         }
210         ReferenceCountUtil.safeRelease(write.msg);
211         ChannelPromise promise = write.promise;
212         safeFail(promise, cause);
213         recycle(write, true);
214     }
215 
216     private void assertEmpty() {
217         assert tail == null && head == null && size == 0;
218     }
219 
220     /**
221      * Removes a pending write operation and performs it via
222      * {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
223      *
224      * @return  {@link ChannelFuture} if something was written and {@code null}
225      *          if the {@link PendingWriteQueue} is empty.
226      */
227     public ChannelFuture removeAndWrite() {
228         assert ctx.executor().inEventLoop();
229         PendingWrite write = head;
230         if (write == null) {
231             return null;
232         }
233         Object msg = write.msg;
234         ChannelPromise promise = write.promise;
235         recycle(write, true);
236         return ctx.write(msg, promise);
237     }
238 
239     /**
240      * Removes a pending write operation and release it's message via {@link ReferenceCountUtil#safeRelease(Object)}.
241      *
242      * @return  {@link ChannelPromise} of the pending write or {@code null} if the queue is empty.
243      *
244      */
245     public ChannelPromise remove() {
246         assert ctx.executor().inEventLoop();
247         PendingWrite write = head;
248         if (write == null) {
249             return null;
250         }
251         ChannelPromise promise = write.promise;
252         ReferenceCountUtil.safeRelease(write.msg);
253         recycle(write, true);
254         return promise;
255     }
256 
257     /**
258      * Return the current message or {@code null} if empty.
259      */
260     public Object current() {
261         assert ctx.executor().inEventLoop();
262         PendingWrite write = head;
263         if (write == null) {
264             return null;
265         }
266         return write.msg;
267     }
268 
269     private void recycle(PendingWrite write, boolean update) {
270         final PendingWrite next = write.next;
271         final long writeSize = write.size;
272 
273         if (update) {
274             if (next == null) {
275                 // Handled last PendingWrite so rest head and tail
276                 // Guard against re-entrance by directly reset
277                 head = tail = null;
278                 size = 0;
279                 bytes = 0;
280             } else {
281                 head = next;
282                 size --;
283                 bytes -= writeSize;
284                 assert size > 0 && bytes >= 0;
285             }
286         }
287 
288         write.recycle();
289         // We need to guard against null as channel.unsafe().outboundBuffer() may returned null
290         // if the channel was already closed when constructing the PendingWriteQueue.
291         // See https://github.com/netty/netty/issues/3967
292         if (buffer != null) {
293             buffer.decrementPendingOutboundBytes(writeSize);
294         }
295     }
296 
297     private static void safeFail(ChannelPromise promise, Throwable cause) {
298         if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
299             logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
300         }
301     }
302 
303     /**
304      * Holds all meta-data and construct the linked-list structure.
305      */
306     static final class PendingWrite {
307         private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
308             @Override
309             protected PendingWrite newObject(Handle handle) {
310                 return new PendingWrite(handle);
311             }
312         };
313 
314         private final Recycler.Handle handle;
315         private PendingWrite next;
316         private long size;
317         private ChannelPromise promise;
318         private Object msg;
319 
320         private PendingWrite(Recycler.Handle handle) {
321             this.handle = handle;
322         }
323 
324         static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) {
325             PendingWrite write = RECYCLER.get();
326             write.size = size;
327             write.msg = msg;
328             write.promise = promise;
329             return write;
330         }
331 
332         private void recycle() {
333             size = 0;
334             next = null;
335             msg = null;
336             promise = null;
337             RECYCLER.recycle(this, handle);
338         }
339     }
340 }