1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5 * "License"); you may not use this file except in compliance with the License. You may obtain a
6 * copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
13 * the License.
14 */
15 package org.jboss.netty.handler.execution;
16
17 import org.jboss.netty.channel.ChannelEvent;
18 import org.jboss.netty.channel.ChannelState;
19 import org.jboss.netty.channel.ChannelStateEvent;
20 import org.jboss.netty.util.ObjectSizeEstimator;
21 import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
22
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
27
28 /**
29 * This is a <b>fair</b> alternative of {@link OrderedMemoryAwareThreadPoolExecutor} .
30 *
31 * <h3>Unfair of {@link OrderedMemoryAwareThreadPoolExecutor}</h3>
32 * The task executed in {@link OrderedMemoryAwareThreadPoolExecutor} is unfair in some situations.
33 * For example, let's say there is only one executor thread that handle the events from the two channels, and events
34 * are submitted in sequence:
35 * <pre>
36 * Channel A (Event A1) , Channel B (Event B), Channel A (Event A2) , ... , Channel A (Event An)
37 * </pre>
38 * Then the events maybe executed in this unfair order:
39 * <pre>
40 * ----------------------------------------> Timeline -------------------------------->
41 * Channel A (Event A1) , Channel A (Event A2) , ... , Channel A (Event An), Channel B (Event B)
42 * </pre>
43 * As we see above, Channel B (Event B) maybe executed unfairly late.
44 * Even more, if there are too much events come in Channel A, and one-by-one closely, then Channel B (Event B) would be
45 * waiting for a long while and become "hungry".
46 *
47 * <h3>Fair of FairOrderedMemoryAwareThreadPoolExecutor</h3>
48 * In the same case above ( one executor thread and two channels ) , this implement will guarantee execution order as:
49 * <pre>
50 * ----------------------------------------> Timeline -------------------------------->
51 * Channel A (Event A1) , Channel B (Event B), Channel A (Event A2) , ... , Channel A (Event An),
52 * </pre>
53 *
54 * <b>NOTE</b>: For convenience the case above use <b>one single executor thread</b>, but the fair mechanism is suitable
55 * for <b>multiple executor threads</b> situations.
56 */
57 public class FairOrderedMemoryAwareThreadPoolExecutor extends MemoryAwareThreadPoolExecutor {
58
59 // end sign
60 private final EventTask end = new EventTask(null);
61
62 private final AtomicReferenceFieldUpdater<EventTask, EventTask> fieldUpdater =
63 AtomicReferenceFieldUpdater.newUpdater(EventTask.class, EventTask.class, "next");
64
65 protected final ConcurrentMap<Object, EventTask> map = newMap();
66
67 /**
68 * Creates a new instance.
69 *
70 * @param corePoolSize the maximum number of active threads
71 * @param maxChannelMemorySize the maximum total size of the queued events per channel. Specify {@code 0} to
72 * disable.
73 * @param maxTotalMemorySize the maximum total size of the queued events for this pool Specify {@code 0} to
74 * disable.
75 * @noinspection unused
76 */
77 public FairOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize,
78 long maxTotalMemorySize) {
79 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
80 }
81
82 /**
83 * Creates a new instance.
84 *
85 * @param corePoolSize the maximum number of active threads
86 * @param maxChannelMemorySize the maximum total size of the queued events per channel. Specify {@code 0} to
87 * disable.
88 * @param maxTotalMemorySize the maximum total size of the queued events for this pool Specify {@code 0} to
89 * disable.
90 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
91 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
92 * @noinspection unused
93 */
94 public FairOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize,
95 long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) {
96 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit);
97 }
98
99 /**
100 * Creates a new instance.
101 *
102 * @param corePoolSize the maximum number of active threads
103 * @param maxChannelMemorySize the maximum total size of the queued events per channel. Specify {@code 0} to
104 * disable.
105 * @param maxTotalMemorySize the maximum total size of the queued events for this pool Specify {@code 0} to
106 * disable.
107 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
108 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
109 * @param threadFactory the {@link ThreadFactory} of this pool
110 * @noinspection unused
111 */
112 public FairOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize,
113 long maxTotalMemorySize, long keepAliveTime, TimeUnit unit,
114 ThreadFactory threadFactory) {
115 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit,
116 threadFactory);
117 }
118
119 /**
120 * Creates a new instance.
121 *
122 * @param corePoolSize the maximum number of active threads
123 * @param maxChannelMemorySize the maximum total size of the queued events per channel. Specify {@code 0} to
124 * disable.
125 * @param maxTotalMemorySize the maximum total size of the queued events for this pool Specify {@code 0} to
126 * disable.
127 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
128 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
129 * @param threadFactory the {@link ThreadFactory} of this pool
130 * @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool
131 * @noinspection unused
132 */
133 public FairOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize,
134 long maxTotalMemorySize, long keepAliveTime, TimeUnit unit,
135 ObjectSizeEstimator objectSizeEstimator,
136 ThreadFactory threadFactory) {
137 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit,
138 objectSizeEstimator, threadFactory);
139 }
140
141 /** @noinspection WeakerAccess*/
142 protected ConcurrentMap<Object, EventTask> newMap() {
143 return new ConcurrentIdentityWeakKeyHashMap<Object, EventTask>();
144 }
145
146 /**
147 * Executes the specified task concurrently while maintaining the event order.
148 */
149 @Override
150 protected void doExecute(Runnable task) {
151 if (task instanceof ChannelEventRunnable) {
152 ChannelEventRunnable eventRunnable = (ChannelEventRunnable) task;
153 EventTask newEventTask = new EventTask(eventRunnable);
154 /*
155 * e.g. Three event "Channel A (Event A1)","Channel A (Event A2)","Channel A (Event A3)" are
156 * submitted in sequence, then key "Channel A" is refer to the value of "Event A3", and there
157 * is a linked list: "Event A3" -> "Event A2" -> "Event A1" ( linked by the field "next" in
158 * EventTask )
159 */
160 Object key = getKey(eventRunnable.getEvent());
161 EventTask previousEventTask = map.put(key, newEventTask);
162 // Remove the entry when the channel closes.
163 removeIfClosed(eventRunnable, key);
164 // try to setup "previousEventTask -> newEventTask"
165 // if success, then "newEventTask" will be invoke by "previousEventTask"
166 if (previousEventTask != null) {
167 if (compareAndSetNext(previousEventTask, null, newEventTask)) {
168 return;
169 }
170 }
171 // Two situation here:
172 // 1. "newEventTask" is the header of linked list
173 // 2. the "previousEventTask.next" is already END
174 // At these two situations above, just execute "newEventTask" immediately
175 doUnorderedExecute(newEventTask);
176 } else {
177 doUnorderedExecute(task);
178 }
179 }
180
181 private void removeIfClosed(ChannelEventRunnable eventRunnable, Object key) {
182 ChannelEvent event = eventRunnable.getEvent();
183 if (event instanceof ChannelStateEvent) {
184 ChannelStateEvent se = (ChannelStateEvent) event;
185 if (se.getState() == ChannelState.OPEN && !event.getChannel().isOpen()) {
186 removeKey(key);
187 }
188 }
189 }
190
191 /**
192 * call removeKey(Object key) when the life cycle of the key ends, such as when the channel is closed
193 */
194 protected boolean removeKey(Object key) {
195 return map.remove(key) != null;
196 }
197
198 protected Object getKey(ChannelEvent e) {
199 return e.getChannel();
200 }
201
202 @Override
203 protected boolean shouldCount(Runnable task) {
204 return !(task instanceof EventTask) && super.shouldCount(task);
205 }
206
207 protected final boolean compareAndSetNext(EventTask eventTask, EventTask expect, EventTask update) {
208 // because the "next" field is modified by method "doExecute()" and
209 // method "EventTask.run()", so use CAS for thread-safe
210 return fieldUpdater.compareAndSet(eventTask, expect, update);
211 }
212
213 protected final class EventTask implements Runnable {
214 /** @noinspection unused*/
215 volatile EventTask next;
216 private final ChannelEventRunnable runnable;
217
218 EventTask(ChannelEventRunnable runnable) {
219 this.runnable = runnable;
220 }
221
222 public void run() {
223 try {
224 runnable.run();
225 } finally {
226 // if "next" is not null, then trigger "next" to execute;
227 // else if "next" is null, set "next" to END, means end this linked list
228 if (!compareAndSetNext(this, null, end)) {
229 doUnorderedExecute(next);
230 }
231 }
232 }
233 }
234 }