View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.execution;
17  
18  import java.util.IdentityHashMap;
19  import java.util.Queue;
20  import java.util.Set;
21  import java.util.WeakHashMap;
22  import java.util.concurrent.ConcurrentLinkedQueue;
23  import java.util.concurrent.ConcurrentMap;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.ThreadFactory;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  
29  import org.jboss.netty.channel.Channel;
30  import org.jboss.netty.channel.ChannelEvent;
31  import org.jboss.netty.channel.ChannelState;
32  import org.jboss.netty.channel.ChannelStateEvent;
33  import org.jboss.netty.util.ObjectSizeEstimator;
34  import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
35  
36  /**
37   * A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
38   * same {@link Channel} are executed sequentially.
39   * <p>
40   * <b>NOTE</b>: This thread pool inherits most characteristics of its super
41   * type, so please make sure to refer to {@link MemoryAwareThreadPoolExecutor}
42   * to understand how it works basically.
43   *
44   * <h3>Event execution order</h3>
45   *
46   * For example, let's say there are two executor threads that handle the events
47   * from the two channels:
48   * <pre>
49   *           -------------------------------------&gt; Timeline ------------------------------------&gt;
50   *
51   * Thread X: --- Channel A (Event A1) --.   .-- Channel B (Event B2) --- Channel B (Event B3) ---&gt;
52   *                                      \ /
53   *                                       X
54   *                                      / \
55   * Thread Y: --- Channel B (Event B1) --'   '-- Channel A (Event A2) --- Channel A (Event A3) ---&gt;
56   * </pre>
57   * As you see, the events from different channels are independent from each
58   * other.  That is, an event of Channel B will not be blocked by an event of
59   * Channel A and vice versa, unless the thread pool is exhausted.
60   * <p>
61   * Also, it is guaranteed that the invocation will be made sequentially for the
62   * events from the same channel.  For example, the event A2 is never executed
63   * before the event A1 is finished.  (Although not recommended, if you want the
64   * events from the same channel to be executed simultaneously, please use
65   * {@link MemoryAwareThreadPoolExecutor} instead.)
66   * <p>
67   * However, it is not guaranteed that the invocation will be made by the same
68   * thread for the same channel.  The events from the same channel can be
69   * executed by different threads.  For example, the Event A2 is executed by the
70   * thread Y while the event A1 was executed by the thread X.
71   *
72   * <h3>Using a different key other than {@link Channel} to maintain event order</h3>
73   * <p>
74   * {@link OrderedMemoryAwareThreadPoolExecutor} uses a {@link Channel} as a key
75   * that is used for maintaining the event execution order, as explained in the
76   * previous section.  Alternatively, you can extend it to change its behavior.
77   * For example, you can change the key to the remote IP of the peer:
78   *
79   * <pre>
80   * public class RemoteAddressBasedOMATPE extends {@link OrderedMemoryAwareThreadPoolExecutor} {
81   *
82   *     ... Constructors ...
83   *
84   *     {@code @Override}
85   *     protected ConcurrentMap&lt;Object, Executor&gt; newChildExecutorMap() {
86   *         // The default implementation returns a special ConcurrentMap that
87   *         // uses identity comparison only (see {@link IdentityHashMap}).
88   *         // Because SocketAddress does not work with identity comparison,
89   *         // we need to employ more generic implementation.
90   *         return new ConcurrentHashMap&lt;Object, Executor&gt;
91   *     }
92   *
93   *     protected Object getChildExecutorKey({@link ChannelEvent} e) {
94   *         // Use the IP of the remote peer as a key.
95   *         return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress();
96   *     }
97   *
98   *     // Make public so that you can call from anywhere.
99   *     public boolean removeChildExecutor(Object key) {
100  *         super.removeChildExecutor(key);
101  *     }
102  * }
103  * </pre>
104  *
105  * Please be very careful of memory leak of the child executor map.  You must
106  * call {@link #removeChildExecutor(Object)} when the life cycle of the key
107  * ends (e.g. all connections from the same IP were closed.)  Also, please
108  * keep in mind that the key can appear again after calling {@link #removeChildExecutor(Object)}
109  * (e.g. a new connection could come in from the same old IP after removal.)
110  * If in doubt, prune the old unused or stall keys from the child executor map
111  * periodically:
112  *
113  * <pre>
114  * RemoteAddressBasedOMATPE executor = ...;
115  *
116  * on every 3 seconds:
117  *
118  *   for (Iterator&lt;Object&gt; i = executor.getChildExecutorKeySet().iterator; i.hasNext();) {
119  *       InetAddress ip = (InetAddress) i.next();
120  *       if (there is no active connection from 'ip' now &&
121  *           there has been no incoming connection from 'ip' for last 10 minutes) {
122  *           i.remove();
123  *       }
124  *   }
125  * </pre>
126  *
127  * If the expected maximum number of keys is small and deterministic, you could
128  * use a weak key map such as <a href="http://goo.gl/TqGl1">ConcurrentWeakHashMap</a>
129  * or synchronized {@link WeakHashMap} instead of managing the life cycle of the
130  * keys by yourself.
131  *
132  * @apiviz.landmark
133  */
134 public class OrderedMemoryAwareThreadPoolExecutor extends
135         MemoryAwareThreadPoolExecutor {
136 
137     // TODO Make OMATPE focus on the case where Channel is the key.
138     //      Add a new less-efficient TPE that allows custom key.
139 
140     protected final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
141 
142     /**
143      * Creates a new instance.
144      *
145      * @param corePoolSize          the maximum number of active threads
146      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
147      *                              Specify {@code 0} to disable.
148      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
149      *                              Specify {@code 0} to disable.
150      */
151     public OrderedMemoryAwareThreadPoolExecutor(
152             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
153         super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
154     }
155 
156     /**
157      * Creates a new instance.
158      *
159      * @param corePoolSize          the maximum number of active threads
160      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
161      *                              Specify {@code 0} to disable.
162      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
163      *                              Specify {@code 0} to disable.
164      * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
165      * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
166      */
167     public OrderedMemoryAwareThreadPoolExecutor(
168             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
169             long keepAliveTime, TimeUnit unit) {
170         super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
171                 keepAliveTime, unit);
172     }
173 
174     /**
175      * Creates a new instance.
176      *
177      * @param corePoolSize          the maximum number of active threads
178      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
179      *                              Specify {@code 0} to disable.
180      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
181      *                              Specify {@code 0} to disable.
182      * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
183      * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
184      * @param threadFactory         the {@link ThreadFactory} of this pool
185      */
186     public OrderedMemoryAwareThreadPoolExecutor(
187             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
188             long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
189         super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
190                 keepAliveTime, unit, threadFactory);
191     }
192 
193     /**
194      * Creates a new instance.
195      *
196      * @param corePoolSize          the maximum number of active threads
197      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
198      *                              Specify {@code 0} to disable.
199      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
200      *                              Specify {@code 0} to disable.
201      * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
202      * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
203      * @param threadFactory         the {@link ThreadFactory} of this pool
204      * @param objectSizeEstimator   the {@link ObjectSizeEstimator} of this pool
205      */
206     public OrderedMemoryAwareThreadPoolExecutor(
207             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
208             long keepAliveTime, TimeUnit unit,
209             ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
210         super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
211                 keepAliveTime, unit, objectSizeEstimator, threadFactory);
212     }
213 
214     protected ConcurrentMap<Object, Executor> newChildExecutorMap() {
215         return new ConcurrentIdentityWeakKeyHashMap<Object, Executor>();
216     }
217 
218     protected Object getChildExecutorKey(ChannelEvent e) {
219         return e.getChannel();
220     }
221 
222     protected Set<Object> getChildExecutorKeySet() {
223         return childExecutors.keySet();
224     }
225 
226     protected boolean removeChildExecutor(Object key) {
227         // FIXME: Succeed only when there is no task in the ChildExecutor's queue.
228         //        Note that it will need locking which might slow down task submission.
229         return childExecutors.remove(key) != null;
230     }
231 
232     /**
233      * Executes the specified task concurrently while maintaining the event
234      * order.
235      */
236     @Override
237     protected void doExecute(Runnable task) {
238         if (!(task instanceof ChannelEventRunnable)) {
239             doUnorderedExecute(task);
240         } else {
241             ChannelEventRunnable r = (ChannelEventRunnable) task;
242             getChildExecutor(r.getEvent()).execute(task);
243         }
244     }
245 
246     protected Executor getChildExecutor(ChannelEvent e) {
247         Object key = getChildExecutorKey(e);
248         Executor executor = childExecutors.get(key);
249         if (executor == null) {
250             executor = new ChildExecutor();
251             Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
252             if (oldExecutor != null) {
253                 executor = oldExecutor;
254             }
255         }
256 
257         // Remove the entry when the channel closes.
258         if (e instanceof ChannelStateEvent) {
259             Channel channel = e.getChannel();
260             ChannelStateEvent se = (ChannelStateEvent) e;
261             if (se.getState() == ChannelState.OPEN &&
262                 !channel.isOpen()) {
263                 removeChildExecutor(key);
264             }
265         }
266         return executor;
267     }
268 
269     @Override
270     protected boolean shouldCount(Runnable task) {
271         if (task instanceof ChildExecutor) {
272             return false;
273         }
274 
275         return super.shouldCount(task);
276     }
277 
278     void onAfterExecute(Runnable r, Throwable t) {
279         afterExecute(r, t);
280     }
281 
282     protected final class ChildExecutor implements Executor, Runnable {
283         private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>();
284         private final AtomicBoolean isRunning = new AtomicBoolean();
285 
286         public void execute(Runnable command) {
287             // TODO: What todo if the add return false ?
288             tasks.add(command);
289 
290 
291             if (!isRunning.get()) {
292                 doUnorderedExecute(this);
293             }
294         }
295 
296         public void run() {
297             boolean acquired = false;
298 
299             // check if its already running by using CAS. If so just return here. So in the worst case the thread
300             // is executed and do nothing
301             if (isRunning.compareAndSet(false, true)) {
302                 acquired = true;
303                 try {
304                     Thread thread = Thread.currentThread();
305                     for (;;) {
306                         final Runnable task = tasks.poll();
307                         // if the task is null we should exit the loop
308                         if (task == null) {
309                             break;
310                         }
311 
312                         boolean ran = false;
313                         beforeExecute(thread, task);
314                         try {
315                             task.run();
316                             ran = true;
317                             onAfterExecute(task, null);
318                         } catch (RuntimeException e) {
319                             if (!ran) {
320                                 onAfterExecute(task, e);
321                             }
322                             throw e;
323                         }
324                     }
325                 } finally {
326                     // set it back to not running
327                     isRunning.set(false);
328                 }
329 
330                 if (acquired && !isRunning.get() && tasks.peek() != null) {
331                     doUnorderedExecute(this);
332                 }
333 
334             }
335         }
336     }
337 }