1 /* 2 * Copyright 2012 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.execution; 17 18 import org.jboss.netty.channel.Channel; 19 import org.jboss.netty.channel.ChannelEvent; 20 import org.jboss.netty.channel.ChannelState; 21 import org.jboss.netty.channel.ChannelStateEvent; 22 import org.jboss.netty.util.ObjectSizeEstimator; 23 import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap; 24 25 import java.util.IdentityHashMap; 26 import java.util.Queue; 27 import java.util.Set; 28 import java.util.WeakHashMap; 29 import java.util.concurrent.ConcurrentLinkedQueue; 30 import java.util.concurrent.ConcurrentMap; 31 import java.util.concurrent.Executor; 32 import java.util.concurrent.ThreadFactory; 33 import java.util.concurrent.TimeUnit; 34 import java.util.concurrent.atomic.AtomicBoolean; 35 36 /** 37 * A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the 38 * same {@link Channel} are executed sequentially. 39 * <p> 40 * <b>NOTE</b>: This thread pool inherits most characteristics of its super 41 * type, so please make sure to refer to {@link MemoryAwareThreadPoolExecutor} 42 * to understand how it works basically. 43 * 44 * <h3>Event execution order</h3> 45 * 46 * For example, let's say there are two executor threads that handle the events 47 * from the two channels: 48 * <pre> 49 * -------------------------------------> Timeline ------------------------------------> 50 * 51 * Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) ---> 52 * \ / 53 * X 54 * / \ 55 * Thread Y: --- Channel B (Event B1) --' '-- Channel A (Event A2) --- Channel A (Event A3) ---> 56 * </pre> 57 * As you see, the events from different channels are independent from each 58 * other. That is, an event of Channel B will not be blocked by an event of 59 * Channel A and vice versa, unless the thread pool is exhausted. 60 * <p> 61 * Also, it is guaranteed that the invocation will be made sequentially for the 62 * events from the same channel. For example, the event A2 is never executed 63 * before the event A1 is finished. (Although not recommended, if you want the 64 * events from the same channel to be executed simultaneously, please use 65 * {@link MemoryAwareThreadPoolExecutor} instead.) 66 * <p> 67 * However, it is not guaranteed that the invocation will be made by the same 68 * thread for the same channel. The events from the same channel can be 69 * executed by different threads. For example, the Event A2 is executed by the 70 * thread Y while the event A1 was executed by the thread X. 71 * 72 * <h3>Using a different key other than {@link Channel} to maintain event order</h3> 73 * <p> 74 * {@link OrderedMemoryAwareThreadPoolExecutor} uses a {@link Channel} as a key 75 * that is used for maintaining the event execution order, as explained in the 76 * previous section. Alternatively, you can extend it to change its behavior. 77 * For example, you can change the key to the remote IP of the peer: 78 * 79 * <pre> 80 * public class RemoteAddressBasedOMATPE extends {@link OrderedMemoryAwareThreadPoolExecutor} { 81 * 82 * ... Constructors ... 83 * 84 * {@code @Override} 85 * protected ConcurrentMap<Object, Executor> newChildExecutorMap() { 86 * // The default implementation returns a special ConcurrentMap that 87 * // uses identity comparison only (see {@link IdentityHashMap}). 88 * // Because SocketAddress does not work with identity comparison, 89 * // we need to employ more generic implementation. 90 * return new ConcurrentHashMap<Object, Executor> 91 * } 92 * 93 * protected Object getChildExecutorKey({@link ChannelEvent} e) { 94 * // Use the IP of the remote peer as a key. 95 * return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress(); 96 * } 97 * 98 * // Make public so that you can call from anywhere. 99 * public boolean removeChildExecutor(Object key) { 100 * super.removeChildExecutor(key); 101 * } 102 * } 103 * </pre> 104 * 105 * Please be very careful of memory leak of the child executor map. You must 106 * call {@link #removeChildExecutor(Object)} when the life cycle of the key 107 * ends (e.g. all connections from the same IP were closed.) Also, please 108 * keep in mind that the key can appear again after calling {@link #removeChildExecutor(Object)} 109 * (e.g. a new connection could come in from the same old IP after removal.) 110 * If in doubt, prune the old unused or stall keys from the child executor map 111 * periodically: 112 * 113 * <pre> 114 * RemoteAddressBasedOMATPE executor = ...; 115 * 116 * on every 3 seconds: 117 * 118 * for (Iterator<Object> i = executor.getChildExecutorKeySet().iterator; i.hasNext();) { 119 * InetAddress ip = (InetAddress) i.next(); 120 * if (there is no active connection from 'ip' now && 121 * there has been no incoming connection from 'ip' for last 10 minutes) { 122 * i.remove(); 123 * } 124 * } 125 * </pre> 126 * 127 * If the expected maximum number of keys is small and deterministic, you could 128 * use a weak key map such as <a href="http://netty.io/s/cwhashmap">ConcurrentWeakHashMap</a> 129 * or synchronized {@link WeakHashMap} instead of managing the life cycle of the 130 * keys by yourself. 131 * 132 * @apiviz.landmark 133 */ 134 public class OrderedMemoryAwareThreadPoolExecutor extends 135 MemoryAwareThreadPoolExecutor { 136 137 // TODO Make OMATPE focus on the case where Channel is the key. 138 // Add a new less-efficient TPE that allows custom key. 139 140 protected final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap(); 141 142 /** 143 * Creates a new instance. 144 * 145 * @param corePoolSize the maximum number of active threads 146 * @param maxChannelMemorySize the maximum total size of the queued events per channel. 147 * Specify {@code 0} to disable. 148 * @param maxTotalMemorySize the maximum total size of the queued events for this pool 149 * Specify {@code 0} to disable. 150 */ 151 public OrderedMemoryAwareThreadPoolExecutor( 152 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) { 153 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize); 154 } 155 156 /** 157 * Creates a new instance. 158 * 159 * @param corePoolSize the maximum number of active threads 160 * @param maxChannelMemorySize the maximum total size of the queued events per channel. 161 * Specify {@code 0} to disable. 162 * @param maxTotalMemorySize the maximum total size of the queued events for this pool 163 * Specify {@code 0} to disable. 164 * @param keepAliveTime the amount of time for an inactive thread to shut itself down 165 * @param unit the {@link TimeUnit} of {@code keepAliveTime} 166 */ 167 public OrderedMemoryAwareThreadPoolExecutor( 168 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, 169 long keepAliveTime, TimeUnit unit) { 170 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 171 keepAliveTime, unit); 172 } 173 174 /** 175 * Creates a new instance. 176 * 177 * @param corePoolSize the maximum number of active threads 178 * @param maxChannelMemorySize the maximum total size of the queued events per channel. 179 * Specify {@code 0} to disable. 180 * @param maxTotalMemorySize the maximum total size of the queued events for this pool 181 * Specify {@code 0} to disable. 182 * @param keepAliveTime the amount of time for an inactive thread to shut itself down 183 * @param unit the {@link TimeUnit} of {@code keepAliveTime} 184 * @param threadFactory the {@link ThreadFactory} of this pool 185 */ 186 public OrderedMemoryAwareThreadPoolExecutor( 187 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, 188 long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { 189 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 190 keepAliveTime, unit, threadFactory); 191 } 192 193 /** 194 * Creates a new instance. 195 * 196 * @param corePoolSize the maximum number of active threads 197 * @param maxChannelMemorySize the maximum total size of the queued events per channel. 198 * Specify {@code 0} to disable. 199 * @param maxTotalMemorySize the maximum total size of the queued events for this pool 200 * Specify {@code 0} to disable. 201 * @param keepAliveTime the amount of time for an inactive thread to shut itself down 202 * @param unit the {@link TimeUnit} of {@code keepAliveTime} 203 * @param threadFactory the {@link ThreadFactory} of this pool 204 * @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool 205 */ 206 public OrderedMemoryAwareThreadPoolExecutor( 207 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, 208 long keepAliveTime, TimeUnit unit, 209 ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { 210 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 211 keepAliveTime, unit, objectSizeEstimator, threadFactory); 212 } 213 214 protected ConcurrentMap<Object, Executor> newChildExecutorMap() { 215 return new ConcurrentIdentityWeakKeyHashMap<Object, Executor>(); 216 } 217 218 protected Object getChildExecutorKey(ChannelEvent e) { 219 return e.getChannel(); 220 } 221 222 protected Set<Object> getChildExecutorKeySet() { 223 return childExecutors.keySet(); 224 } 225 226 protected boolean removeChildExecutor(Object key) { 227 // FIXME: Succeed only when there is no task in the ChildExecutor's queue. 228 // Note that it will need locking which might slow down task submission. 229 return childExecutors.remove(key) != null; 230 } 231 232 /** 233 * Executes the specified task concurrently while maintaining the event 234 * order. 235 */ 236 @Override 237 protected void doExecute(Runnable task) { 238 if (!(task instanceof ChannelEventRunnable)) { 239 doUnorderedExecute(task); 240 } else { 241 ChannelEventRunnable r = (ChannelEventRunnable) task; 242 getChildExecutor(r.getEvent()).execute(task); 243 } 244 } 245 246 protected Executor getChildExecutor(ChannelEvent e) { 247 Object key = getChildExecutorKey(e); 248 Executor executor = childExecutors.get(key); 249 if (executor == null) { 250 executor = new ChildExecutor(); 251 Executor oldExecutor = childExecutors.putIfAbsent(key, executor); 252 if (oldExecutor != null) { 253 executor = oldExecutor; 254 } 255 } 256 257 // Remove the entry when the channel closes. 258 if (e instanceof ChannelStateEvent) { 259 Channel channel = e.getChannel(); 260 ChannelStateEvent se = (ChannelStateEvent) e; 261 if (se.getState() == ChannelState.OPEN && 262 !channel.isOpen()) { 263 removeChildExecutor(key); 264 } 265 } 266 return executor; 267 } 268 269 @Override 270 protected boolean shouldCount(Runnable task) { 271 if (task instanceof ChildExecutor) { 272 return false; 273 } 274 275 return super.shouldCount(task); 276 } 277 278 void onAfterExecute(Runnable r, Throwable t) { 279 afterExecute(r, t); 280 } 281 282 protected final class ChildExecutor implements Executor, Runnable { 283 private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>(); 284 private final AtomicBoolean isRunning = new AtomicBoolean(); 285 286 public void execute(Runnable command) { 287 // TODO: What todo if the add return false ? 288 tasks.add(command); 289 290 if (!isRunning.get()) { 291 doUnorderedExecute(this); 292 } 293 } 294 295 public void run() { 296 boolean acquired; 297 298 // check if its already running by using CAS. If so just return here. So in the worst case the thread 299 // is executed and do nothing 300 if (isRunning.compareAndSet(false, true)) { 301 acquired = true; 302 try { 303 Thread thread = Thread.currentThread(); 304 for (;;) { 305 final Runnable task = tasks.poll(); 306 // if the task is null we should exit the loop 307 if (task == null) { 308 break; 309 } 310 311 boolean ran = false; 312 beforeExecute(thread, task); 313 try { 314 task.run(); 315 ran = true; 316 onAfterExecute(task, null); 317 } catch (RuntimeException e) { 318 if (!ran) { 319 onAfterExecute(task, e); 320 } 321 throw e; 322 } 323 } 324 } finally { 325 // set it back to not running 326 isRunning.set(false); 327 } 328 329 if (acquired && !isRunning.get() && tasks.peek() != null) { 330 doUnorderedExecute(this); 331 } 332 } 333 } 334 } 335 }