View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.queue;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.ChannelBuffers;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelConfig;
22  import org.jboss.netty.channel.ChannelFuture;
23  import org.jboss.netty.channel.ChannelFutureListener;
24  import org.jboss.netty.channel.ChannelHandlerContext;
25  import org.jboss.netty.channel.ChannelStateEvent;
26  import org.jboss.netty.channel.Channels;
27  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
28  import org.jboss.netty.channel.MessageEvent;
29  import org.jboss.netty.channel.SimpleChannelHandler;
30  import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
31  import org.jboss.netty.util.HashedWheelTimer;
32  
33  import java.io.IOException;
34  import java.nio.channels.ClosedChannelException;
35  import java.util.ArrayList;
36  import java.util.List;
37  import java.util.Queue;
38  import java.util.concurrent.BlockingQueue;
39  import java.util.concurrent.ConcurrentLinkedQueue;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  
42  /**
43   * Emulates buffered write operation.  This handler stores all write requests
44   * into an unbounded {@link Queue} and flushes them to the downstream when
45   * {@link #flush()} method is called.
46   * <p>
47   * Here is an example that demonstrates the usage:
48   * <pre>
49   * BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
50   * ChannelPipeline p = ...;
51   * p.addFirst("buffer", bufferedWriter);
52   *
53   * ...
54   *
55   * Channel ch = ...;
56   *
57   * // msg1, 2, and 3 are stored in the queue of bufferedWriter.
58   * ch.write(msg1);
59   * ch.write(msg2);
60   * ch.write(msg3);
61   *
62   * // and will be flushed on request.
63   * bufferedWriter.flush();
64   * </pre>
65   *
66   * <h3>Auto-flush</h3>
67   * The write request queue is automatically flushed when the associated
68   * {@link Channel} is disconnected or closed.  However, it does not flush the
69   * queue otherwise.  It means you have to call {@link #flush()} before the size
70   * of the queue increases too much.  You can implement your own auto-flush
71   * strategy by extending this handler:
72   * <pre>
73   * public class AutoFlusher extends {@link BufferedWriteHandler} {
74   *
75   *     private final AtomicLong bufferSize = new AtomicLong();
76   *
77   *     {@literal @Override}
78   *     public void writeRequested({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
79   *         super.writeRequested(ctx, e);
80   *
81   *         {@link ChannelBuffer} data = ({@link ChannelBuffer}) e.getMessage();
82   *         int newBufferSize = bufferSize.addAndGet(data.readableBytes());
83   *
84   *         // Flush the queue if it gets larger than 8KiB.
85   *         if (newBufferSize > 8192) {
86   *             flush();
87   *             bufferSize.set(0);
88   *         }
89   *     }
90   * }
91   * </pre>
92   *
93   * <h3>Consolidate on flush</h3>
94   *
95   * If there are two or more write requests in the queue and all their message
96   * type is {@link ChannelBuffer}, they can be merged into a single write request
97   * to save the number of system calls.
98   * <pre>
99   * BEFORE consolidation:            AFTER consolidation:
100  * +-------+-------+-------+        +-------------+
101  * | Req C | Req B | Req A |------\\| Request ABC |
102  * | "789" | "456" | "123" |------//| "123456789" |
103  * +-------+-------+-------+        +-------------+
104  * </pre>
105  * This feature is disabled by default.  You can override the default when you
106  * create this handler or call {@link #flush(boolean)}.  If you specified
107  * {@code true} when you call the constructor, calling {@link #flush()} will
108  * always consolidate the queue.  Otherwise, you have to call
109  * {@link #flush(boolean)} with {@code true} to enable this feature for each
110  * flush.
111  * <p>
112  * The disadvantage of consolidation is that the {@link ChannelFuture} and its
113  * {@link ChannelFutureListener}s associated with the original write requests
114  * might be notified later than when they are actually written out.  They will
115  * always be notified when the consolidated write request is fully written.
116  * <p>
117  * The following example implements the consolidation strategy that reduces
118  * the number of write requests based on the writability of a channel:
119  * <pre>
120  * public class ConsolidatingAutoFlusher extends {@link BufferedWriteHandler} {
121  *
122  *     public ConsolidatingAutoFlusher() {
123  *         // Enable consolidation by default.
124  *         super(true);
125  *     }
126  *
127  *     {@literal @Override}
128  *     public void channelOpen({@link ChannelHandlerContext} ctx, {@link ChannelStateEvent} e) throws Exception {
129  *         {@link ChannelConfig} cfg = e.getChannel().getConfig();
130  *         if (cfg instanceof {@link NioSocketChannelConfig}) {
131  *             // Lower the watermark to increase the chance of consolidation.
132  *             cfg.setWriteBufferLowWaterMark(0);
133  *         }
134  *         super.channelOpen(e);
135  *     }
136  *
137  *     {@literal @Override}
138  *     public void writeRequested({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) throws Exception {
139  *         super.writeRequested(ctx, et);
140  *         if (e.getChannel().isWritable()) {
141  *             flush();
142  *         }
143  *     }
144  *
145  *     {@literal @Override}
146  *     public void channelInterestChanged(
147  *             {@link ChannelHandlerContext} ctx, {@link ChannelStateEvent} e) throws Exception {
148  *         if (e.getChannel().isWritable()) {
149  *             flush();
150  *         }
151  *     }
152  * }
153  * </pre>
154  *
155  * <h3>Prioritized Writes</h3>
156  *
157  * You can implement prioritized writes by specifying an unbounded priority
158  * queue in the constructor of this handler.  It will be required to design
159  * the proper strategy to determine how often {@link #flush()} should be called.
160  * For example, you could call {@link #flush()} periodically, using
161  * {@link HashedWheelTimer} every second.
162  * @apiviz.landmark
163  */
164 public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler {
165 
166     private final Queue<MessageEvent> queue;
167     private final boolean consolidateOnFlush;
168     private volatile ChannelHandlerContext ctx;
169     private final AtomicBoolean flush = new AtomicBoolean(false);
170 
171     /**
172      * Creates a new instance with the default unbounded {@link BlockingQueue}
173      * implementation and without buffer consolidation.
174      */
175     public BufferedWriteHandler() {
176         this(false);
177     }
178 
179     /**
180      * Creates a new instance with the specified thread-safe unbounded
181      * {@link Queue} and without buffer consolidation.  Please note that
182      * specifying a bounded {@link Queue} or a thread-unsafe {@link Queue} will
183      * result in an unspecified behavior.
184      */
185     public BufferedWriteHandler(Queue<MessageEvent> queue) {
186         this(queue, false);
187     }
188 
189     /**
190      * Creates a new instance with {@link ConcurrentLinkedQueue}
191      *
192      * @param consolidateOnFlush
193      *        {@code true} if and only if the buffered write requests are merged
194      *        into a single write request on {@link #flush()}
195      */
196     public BufferedWriteHandler(boolean consolidateOnFlush) {
197         this(new ConcurrentLinkedQueue<MessageEvent>(), consolidateOnFlush);
198     }
199 
200     /**
201      * Creates a new instance with the specified thread-safe unbounded
202      * {@link Queue}.  Please note that specifying a bounded {@link Queue} or
203      * a thread-unsafe {@link Queue} will result in an unspecified behavior.
204      *
205      * @param consolidateOnFlush
206      *        {@code true} if and only if the buffered write requests are merged
207      *        into a single write request on {@link #flush()}
208      */
209     public BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush) {
210         if (queue == null) {
211             throw new NullPointerException("queue");
212         }
213         this.queue = queue;
214         this.consolidateOnFlush = consolidateOnFlush;
215     }
216 
217     public boolean isConsolidateOnFlush() {
218         return consolidateOnFlush;
219     }
220 
221     /**
222      * Returns the queue which stores the write requests.  The default
223      * implementation returns the queue which was specified in the constructor.
224      */
225     protected Queue<MessageEvent> getQueue() {
226         return queue;
227     }
228 
229     /**
230      * Sends the queued write requests to the downstream.
231      */
232     public void flush() {
233         flush(consolidateOnFlush);
234     }
235 
236     /**
237      * Sends the queued write requests to the downstream.
238      *
239      * @param consolidateOnFlush
240      *        {@code true} if and only if the buffered write requests are merged
241      *        into a single write request
242      */
243     public void flush(boolean consolidateOnFlush) {
244         final ChannelHandlerContext ctx = this.ctx;
245         if (ctx == null) {
246             // No write request was made.
247             return;
248         }
249         Channel channel = ctx.getChannel();
250         boolean acquired;
251 
252         // use CAS to see if the have flush already running, if so we don't need to take further actions
253         if (acquired = flush.compareAndSet(false, true)) {
254             final Queue<MessageEvent> queue = getQueue();
255             if (consolidateOnFlush) {
256                 if (queue.isEmpty()) {
257                     flush.set(false);
258                     return;
259                 }
260 
261                 List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
262                 for (;;) {
263                     MessageEvent e = queue.poll();
264                     if (e == null) {
265                         break;
266                     }
267                     if (!(e.getMessage() instanceof ChannelBuffer)) {
268                         if ((pendingWrites = consolidatedWrite(pendingWrites)) == null) {
269                             pendingWrites = new ArrayList<MessageEvent>();
270                         }
271                         ctx.sendDownstream(e);
272                     } else {
273                         pendingWrites.add(e);
274                     }
275                 }
276                 consolidatedWrite(pendingWrites);
277             } else {
278                 for (;;) {
279                     MessageEvent e = queue.poll();
280                     if (e == null) {
281                         break;
282                     }
283                     ctx.sendDownstream(e);
284                 }
285             }
286            flush.set(false);
287        }
288 
289         if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) {
290             flush(consolidateOnFlush);
291         }
292     }
293 
294     private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {
295         final int size = pendingWrites.size();
296         if (size == 1) {
297             ctx.sendDownstream(pendingWrites.remove(0));
298             return pendingWrites;
299         }
300         if (size == 0) {
301             return pendingWrites;
302         }
303 
304         ChannelBuffer[] data = new ChannelBuffer[size];
305         for (int i = 0; i < data.length; i ++) {
306             data[i] = (ChannelBuffer) pendingWrites.get(i).getMessage();
307         }
308 
309         ChannelBuffer composite = ChannelBuffers.wrappedBuffer(data);
310         ChannelFuture future = Channels.future(ctx.getChannel());
311         future.addListener(new ChannelFutureListener() {
312             public void operationComplete(ChannelFuture future)
313                     throws Exception {
314                 if (future.isSuccess()) {
315                     for (MessageEvent e: pendingWrites) {
316                         e.getFuture().setSuccess();
317                     }
318                 } else {
319                     Throwable cause = future.getCause();
320                     for (MessageEvent e: pendingWrites) {
321                         e.getFuture().setFailure(cause);
322                     }
323                 }
324             }
325         });
326 
327         Channels.write(ctx, future, composite);
328         return null;
329     }
330 
331     /**
332      * Stores all write requests to the queue so that they are actually written
333      * on {@link #flush()}.
334      */
335     @Override
336     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
337             throws Exception {
338         if (this.ctx == null) {
339             this.ctx = ctx;
340         } else {
341             assert this.ctx == ctx;
342         }
343 
344         getQueue().add(e);
345     }
346 
347     @Override
348     public void disconnectRequested(ChannelHandlerContext ctx,
349             ChannelStateEvent e) throws Exception {
350         try {
351             flush(consolidateOnFlush);
352         } finally {
353             ctx.sendDownstream(e);
354         }
355     }
356 
357     @Override
358     public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
359             throws Exception {
360         try {
361             flush(consolidateOnFlush);
362         } finally {
363             ctx.sendDownstream(e);
364         }
365     }
366 
367     /**
368      * Fail all buffered writes that are left. See
369      * <a href="https://github.com/netty/netty/issues/308>#308</a> for more details.
370      */
371     @Override
372     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
373         Throwable cause = null;
374         for (;;) {
375             MessageEvent ev = queue.poll();
376 
377             if (ev == null) {
378                 break;
379             }
380 
381             if (cause == null) {
382                 cause = new ClosedChannelException();
383             }
384             ev.getFuture().setFailure(cause);
385         }
386         if (cause != null) {
387             Channels.fireExceptionCaught(ctx.getChannel(), cause);
388         }
389 
390         super.channelClosed(ctx, e);
391     }
392 
393     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
394         // Nothing to do
395     }
396 
397     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
398         // Nothing to do
399     }
400 
401     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
402         // flush a last time before remove the handler
403         flush(consolidateOnFlush);
404     }
405 
406     /**
407      * Fail all buffered writes that are left.
408      * See <a href="https://github.com/netty/netty/issues/308>#308</a> for more details.
409      */
410     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
411         Throwable cause = null;
412         for (;;) {
413             MessageEvent ev = queue.poll();
414 
415             if (ev == null) {
416                 break;
417             }
418 
419             if (cause == null) {
420                 cause = new IOException("Unable to flush message");
421             }
422             ev.getFuture().setFailure(cause);
423         }
424 
425         if (cause != null) {
426             Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
427         }
428     }
429 }