1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.execution;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelEvent;
20 import org.jboss.netty.channel.ChannelState;
21 import org.jboss.netty.channel.ChannelStateEvent;
22 import org.jboss.netty.util.ObjectSizeEstimator;
23 import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
24
25 import java.util.IdentityHashMap;
26 import java.util.Queue;
27 import java.util.Set;
28 import java.util.WeakHashMap;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicBoolean;
35
36 /**
37 * A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
38 * same {@link Channel} are executed sequentially.
39 * <p>
40 * <b>NOTE</b>: This thread pool inherits most characteristics of its super
41 * type, so please make sure to refer to {@link MemoryAwareThreadPoolExecutor}
42 * to understand how it works basically.
43 *
44 * <h3>Event execution order</h3>
45 *
46 * For example, let's say there are two executor threads that handle the events
47 * from the two channels:
48 * <pre>
49 * -------------------------------------> Timeline ------------------------------------>
50 *
51 * Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) --->
52 * \ /
53 * X
54 * / \
55 * Thread Y: --- Channel B (Event B1) --' '-- Channel A (Event A2) --- Channel A (Event A3) --->
56 * </pre>
57 * As you see, the events from different channels are independent from each
58 * other. That is, an event of Channel B will not be blocked by an event of
59 * Channel A and vice versa, unless the thread pool is exhausted.
60 * <p>
61 * Also, it is guaranteed that the invocation will be made sequentially for the
62 * events from the same channel. For example, the event A2 is never executed
63 * before the event A1 is finished. (Although not recommended, if you want the
64 * events from the same channel to be executed simultaneously, please use
65 * {@link MemoryAwareThreadPoolExecutor} instead.)
66 * <p>
67 * However, it is not guaranteed that the invocation will be made by the same
68 * thread for the same channel. The events from the same channel can be
69 * executed by different threads. For example, the Event A2 is executed by the
70 * thread Y while the event A1 was executed by the thread X.
71 *
72 * <h3>Using a different key other than {@link Channel} to maintain event order</h3>
73 * <p>
74 * {@link OrderedMemoryAwareThreadPoolExecutor} uses a {@link Channel} as a key
75 * that is used for maintaining the event execution order, as explained in the
76 * previous section. Alternatively, you can extend it to change its behavior.
77 * For example, you can change the key to the remote IP of the peer:
78 *
79 * <pre>
80 * public class RemoteAddressBasedOMATPE extends {@link OrderedMemoryAwareThreadPoolExecutor} {
81 *
82 * ... Constructors ...
83 *
84 * {@code @Override}
85 * protected ConcurrentMap<Object, Executor> newChildExecutorMap() {
86 * // The default implementation returns a special ConcurrentMap that
87 * // uses identity comparison only (see {@link IdentityHashMap}).
88 * // Because SocketAddress does not work with identity comparison,
89 * // we need to employ more generic implementation.
90 * return new ConcurrentHashMap<Object, Executor>
91 * }
92 *
93 * protected Object getChildExecutorKey({@link ChannelEvent} e) {
94 * // Use the IP of the remote peer as a key.
95 * return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress();
96 * }
97 *
98 * // Make public so that you can call from anywhere.
99 * public boolean removeChildExecutor(Object key) {
100 * super.removeChildExecutor(key);
101 * }
102 * }
103 * </pre>
104 *
105 * Please be very careful of memory leak of the child executor map. You must
106 * call {@link #removeChildExecutor(Object)} when the life cycle of the key
107 * ends (e.g. all connections from the same IP were closed.) Also, please
108 * keep in mind that the key can appear again after calling {@link #removeChildExecutor(Object)}
109 * (e.g. a new connection could come in from the same old IP after removal.)
110 * If in doubt, prune the old unused or stall keys from the child executor map
111 * periodically:
112 *
113 * <pre>
114 * RemoteAddressBasedOMATPE executor = ...;
115 *
116 * on every 3 seconds:
117 *
118 * for (Iterator<Object> i = executor.getChildExecutorKeySet().iterator; i.hasNext();) {
119 * InetAddress ip = (InetAddress) i.next();
120 * if (there is no active connection from 'ip' now &&
121 * there has been no incoming connection from 'ip' for last 10 minutes) {
122 * i.remove();
123 * }
124 * }
125 * </pre>
126 *
127 * If the expected maximum number of keys is small and deterministic, you could
128 * use a weak key map such as <a href="http://goo.gl/TqGl1">ConcurrentWeakHashMap</a>
129 * or synchronized {@link WeakHashMap} instead of managing the life cycle of the
130 * keys by yourself.
131 *
132 * @apiviz.landmark
133 */
134 public class OrderedMemoryAwareThreadPoolExecutor extends
135 MemoryAwareThreadPoolExecutor {
136
137 // TODO Make OMATPE focus on the case where Channel is the key.
138 // Add a new less-efficient TPE that allows custom key.
139
140 protected final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
141
142 /**
143 * Creates a new instance.
144 *
145 * @param corePoolSize the maximum number of active threads
146 * @param maxChannelMemorySize the maximum total size of the queued events per channel.
147 * Specify {@code 0} to disable.
148 * @param maxTotalMemorySize the maximum total size of the queued events for this pool
149 * Specify {@code 0} to disable.
150 */
151 public OrderedMemoryAwareThreadPoolExecutor(
152 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
153 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
154 }
155
156 /**
157 * Creates a new instance.
158 *
159 * @param corePoolSize the maximum number of active threads
160 * @param maxChannelMemorySize the maximum total size of the queued events per channel.
161 * Specify {@code 0} to disable.
162 * @param maxTotalMemorySize the maximum total size of the queued events for this pool
163 * Specify {@code 0} to disable.
164 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
165 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
166 */
167 public OrderedMemoryAwareThreadPoolExecutor(
168 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
169 long keepAliveTime, TimeUnit unit) {
170 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
171 keepAliveTime, unit);
172 }
173
174 /**
175 * Creates a new instance.
176 *
177 * @param corePoolSize the maximum number of active threads
178 * @param maxChannelMemorySize the maximum total size of the queued events per channel.
179 * Specify {@code 0} to disable.
180 * @param maxTotalMemorySize the maximum total size of the queued events for this pool
181 * Specify {@code 0} to disable.
182 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
183 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
184 * @param threadFactory the {@link ThreadFactory} of this pool
185 */
186 public OrderedMemoryAwareThreadPoolExecutor(
187 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
188 long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
189 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
190 keepAliveTime, unit, threadFactory);
191 }
192
193 /**
194 * Creates a new instance.
195 *
196 * @param corePoolSize the maximum number of active threads
197 * @param maxChannelMemorySize the maximum total size of the queued events per channel.
198 * Specify {@code 0} to disable.
199 * @param maxTotalMemorySize the maximum total size of the queued events for this pool
200 * Specify {@code 0} to disable.
201 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
202 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
203 * @param threadFactory the {@link ThreadFactory} of this pool
204 * @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool
205 */
206 public OrderedMemoryAwareThreadPoolExecutor(
207 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
208 long keepAliveTime, TimeUnit unit,
209 ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
210 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
211 keepAliveTime, unit, objectSizeEstimator, threadFactory);
212 }
213
214 protected ConcurrentMap<Object, Executor> newChildExecutorMap() {
215 return new ConcurrentIdentityWeakKeyHashMap<Object, Executor>();
216 }
217
218 protected Object getChildExecutorKey(ChannelEvent e) {
219 return e.getChannel();
220 }
221
222 protected Set<Object> getChildExecutorKeySet() {
223 return childExecutors.keySet();
224 }
225
226 protected boolean removeChildExecutor(Object key) {
227 // FIXME: Succeed only when there is no task in the ChildExecutor's queue.
228 // Note that it will need locking which might slow down task submission.
229 return childExecutors.remove(key) != null;
230 }
231
232 /**
233 * Executes the specified task concurrently while maintaining the event
234 * order.
235 */
236 @Override
237 protected void doExecute(Runnable task) {
238 if (!(task instanceof ChannelEventRunnable)) {
239 doUnorderedExecute(task);
240 } else {
241 ChannelEventRunnable r = (ChannelEventRunnable) task;
242 getChildExecutor(r.getEvent()).execute(task);
243 }
244 }
245
246 protected Executor getChildExecutor(ChannelEvent e) {
247 Object key = getChildExecutorKey(e);
248 Executor executor = childExecutors.get(key);
249 if (executor == null) {
250 executor = new ChildExecutor();
251 Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
252 if (oldExecutor != null) {
253 executor = oldExecutor;
254 }
255 }
256
257 // Remove the entry when the channel closes.
258 if (e instanceof ChannelStateEvent) {
259 Channel channel = e.getChannel();
260 ChannelStateEvent se = (ChannelStateEvent) e;
261 if (se.getState() == ChannelState.OPEN &&
262 !channel.isOpen()) {
263 removeChildExecutor(key);
264 }
265 }
266 return executor;
267 }
268
269 @Override
270 protected boolean shouldCount(Runnable task) {
271 if (task instanceof ChildExecutor) {
272 return false;
273 }
274
275 return super.shouldCount(task);
276 }
277
278 void onAfterExecute(Runnable r, Throwable t) {
279 afterExecute(r, t);
280 }
281
282 protected final class ChildExecutor implements Executor, Runnable {
283 private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>();
284 private final AtomicBoolean isRunning = new AtomicBoolean();
285
286 public void execute(Runnable command) {
287 // TODO: What todo if the add return false ?
288 tasks.add(command);
289
290 if (!isRunning.get()) {
291 doUnorderedExecute(this);
292 }
293 }
294
295 public void run() {
296 boolean acquired;
297
298 // check if its already running by using CAS. If so just return here. So in the worst case the thread
299 // is executed and do nothing
300 if (isRunning.compareAndSet(false, true)) {
301 acquired = true;
302 try {
303 Thread thread = Thread.currentThread();
304 for (;;) {
305 final Runnable task = tasks.poll();
306 // if the task is null we should exit the loop
307 if (task == null) {
308 break;
309 }
310
311 boolean ran = false;
312 beforeExecute(thread, task);
313 try {
314 task.run();
315 ran = true;
316 onAfterExecute(task, null);
317 } catch (RuntimeException e) {
318 if (!ran) {
319 onAfterExecute(task, e);
320 }
321 throw e;
322 }
323 }
324 } finally {
325 // set it back to not running
326 isRunning.set(false);
327 }
328
329 if (acquired && !isRunning.get() && tasks.peek() != null) {
330 doUnorderedExecute(this);
331 }
332 }
333 }
334 }
335 }