public class OrderedMemoryAwareThreadPoolExecutor extends MemoryAwareThreadPoolExecutor
MemoryAwareThreadPoolExecutor
which makes sure the events from the
same Channel
are executed sequentially.
NOTE: This thread pool inherits most characteristics of its super
type, so please make sure to refer to MemoryAwareThreadPoolExecutor
to understand how it works basically.
-------------------------------------> Timeline ------------------------------------> Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) ---> \ / X / \ Thread Y: --- Channel B (Event B1) --' '-- Channel A (Event A2) --- Channel A (Event A3) --->As you see, the events from different channels are independent from each other. That is, an event of Channel B will not be blocked by an event of Channel A and vice versa, unless the thread pool is exhausted.
Also, it is guaranteed that the invocation will be made sequentially for the
events from the same channel. For example, the event A2 is never executed
before the event A1 is finished. (Although not recommended, if you want the
events from the same channel to be executed simultaneously, please use
MemoryAwareThreadPoolExecutor
instead.)
However, it is not guaranteed that the invocation will be made by the same thread for the same channel. The events from the same channel can be executed by different threads. For example, the Event A2 is executed by the thread Y while the event A1 was executed by the thread X.
Channel
to maintain event order
OrderedMemoryAwareThreadPoolExecutor
uses a Channel
as a key
that is used for maintaining the event execution order, as explained in the
previous section. Alternatively, you can extend it to change its behavior.
For example, you can change the key to the remote IP of the peer:
public class RemoteAddressBasedOMATPE extendsPlease be very careful of memory leak of the child executor map. You must callOrderedMemoryAwareThreadPoolExecutor
{ ... Constructors ...@Override
protected ConcurrentMap<Object, Executor> newChildExecutorMap() { // The default implementation returns a special ConcurrentMap that // uses identity comparison only (seeIdentityHashMap
). // Because SocketAddress does not work with identity comparison, // we need to employ more generic implementation. return new ConcurrentHashMap<Object, Executor> } protected Object getChildExecutorKey(ChannelEvent
e) { // Use the IP of the remote peer as a key. return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress(); } // Make public so that you can call from anywhere. public boolean removeChildExecutor(Object key) { super.removeChildExecutor(key); } }
removeChildExecutor(Object)
when the life cycle of the key
ends (e.g. all connections from the same IP were closed.) Also, please
keep in mind that the key can appear again after calling removeChildExecutor(Object)
(e.g. a new connection could come in from the same old IP after removal.)
If in doubt, prune the old unused or stall keys from the child executor map
periodically:
RemoteAddressBasedOMATPE executor = ...; on every 3 seconds: for (Iterator<Object> i = executor.getChildExecutorKeySet().iterator; i.hasNext();) { InetAddress ip = (InetAddress) i.next(); if (there is no active connection from 'ip' now && there has been no incoming connection from 'ip' for last 10 minutes) { i.remove(); } }If the expected maximum number of keys is small and deterministic, you could use a weak key map such as ConcurrentWeakHashMap or synchronized
WeakHashMap
instead of managing the life cycle of the
keys by yourself.Modifier and Type | Class and Description |
---|---|
protected class |
OrderedMemoryAwareThreadPoolExecutor.ChildExecutor |
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy
Modifier and Type | Field and Description |
---|---|
protected ConcurrentMap<Object,Executor> |
childExecutors |
Constructor and Description |
---|
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
long maxChannelMemorySize,
long maxTotalMemorySize)
Creates a new instance.
|
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
long maxChannelMemorySize,
long maxTotalMemorySize,
long keepAliveTime,
TimeUnit unit)
Creates a new instance.
|
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
long maxChannelMemorySize,
long maxTotalMemorySize,
long keepAliveTime,
TimeUnit unit,
ObjectSizeEstimator objectSizeEstimator,
ThreadFactory threadFactory)
Creates a new instance.
|
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
long maxChannelMemorySize,
long maxTotalMemorySize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory)
Creates a new instance.
|
Modifier and Type | Method and Description |
---|---|
protected void |
doExecute(Runnable task)
Executes the specified task concurrently while maintaining the event
order.
|
protected Executor |
getChildExecutor(ChannelEvent e) |
protected Object |
getChildExecutorKey(ChannelEvent e) |
protected Set<Object> |
getChildExecutorKeySet() |
protected ConcurrentMap<Object,Executor> |
newChildExecutorMap() |
protected boolean |
removeChildExecutor(Object key) |
protected boolean |
shouldCount(Runnable task)
Returns
true if and only if the specified task should
be counted to limit the global and per-channel memory consumption. |
beforeExecute, decreaseCounter, doUnorderedExecute, execute, getMaxChannelMemorySize, getMaxTotalMemorySize, getNotifyChannelFuturesOnShutdown, getObjectSizeEstimator, increaseCounter, remove, setMaxChannelMemorySize, setMaxTotalMemorySize, setNotifyChannelFuturesOnShutdown, setObjectSizeEstimator, shutdownNow, shutdownNow, terminated
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, shutdown, toString
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
protected final ConcurrentMap<Object,Executor> childExecutors
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
corePoolSize
- the maximum number of active threadsmaxChannelMemorySize
- the maximum total size of the queued events per channel.
Specify 0
to disable.maxTotalMemorySize
- the maximum total size of the queued events for this pool
Specify 0
to disable.public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit)
corePoolSize
- the maximum number of active threadsmaxChannelMemorySize
- the maximum total size of the queued events per channel.
Specify 0
to disable.maxTotalMemorySize
- the maximum total size of the queued events for this pool
Specify 0
to disable.keepAliveTime
- the amount of time for an inactive thread to shut itself downunit
- the TimeUnit
of keepAliveTime
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory)
corePoolSize
- the maximum number of active threadsmaxChannelMemorySize
- the maximum total size of the queued events per channel.
Specify 0
to disable.maxTotalMemorySize
- the maximum total size of the queued events for this pool
Specify 0
to disable.keepAliveTime
- the amount of time for an inactive thread to shut itself downunit
- the TimeUnit
of keepAliveTime
threadFactory
- the ThreadFactory
of this poolpublic OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory)
corePoolSize
- the maximum number of active threadsmaxChannelMemorySize
- the maximum total size of the queued events per channel.
Specify 0
to disable.maxTotalMemorySize
- the maximum total size of the queued events for this pool
Specify 0
to disable.keepAliveTime
- the amount of time for an inactive thread to shut itself downunit
- the TimeUnit
of keepAliveTime
threadFactory
- the ThreadFactory
of this poolobjectSizeEstimator
- the ObjectSizeEstimator
of this poolprotected ConcurrentMap<Object,Executor> newChildExecutorMap()
protected Object getChildExecutorKey(ChannelEvent e)
protected boolean removeChildExecutor(Object key)
protected void doExecute(Runnable task)
doExecute
in class MemoryAwareThreadPoolExecutor
protected Executor getChildExecutor(ChannelEvent e)
protected boolean shouldCount(Runnable task)
MemoryAwareThreadPoolExecutor
true
if and only if the specified task
should
be counted to limit the global and per-channel memory consumption.
To override this method, you must call super.shouldCount()
to
make sure important tasks are not counted.shouldCount
in class MemoryAwareThreadPoolExecutor
Copyright © 2008-2014 The Netty Project. All Rights Reserved.