
/*
 * Copyright 2015 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.jboss.netty.handler.execution;

import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.util.ObjectSizeEstimator;
import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
 * A <b>fair</b> alternative to {@link OrderedMemoryAwareThreadPoolExecutor}.
 *
 * <h3>Unfairness of {@link OrderedMemoryAwareThreadPoolExecutor}</h3>
 * Task execution in {@link OrderedMemoryAwareThreadPoolExecutor} can be unfair in some situations.
 * For example, say a single executor thread handles the events from two channels, and the events are
 * submitted in this sequence:
 * <pre>
 *           Channel A (Event A1), Channel B (Event B), Channel A (Event A2), ..., Channel A (Event An)
 * </pre>
 * Then the events may be executed in this unfair order:
 * <pre>
 *          ----------------------------------------&gt; Timeline --------------------------------&gt;
 *           Channel A (Event A1), Channel A (Event A2), ..., Channel A (Event An), Channel B (Event B)
 * </pre>
 * As shown above, Channel B (Event B) may be executed unfairly late.
 * Worse, if many events arrive on Channel A in quick succession, Channel B (Event B) will wait a long
 * while and become "hungry".
 *
 * <h3>Fairness of FairOrderedMemoryAwareThreadPoolExecutor</h3>
 * In the same case as above (one executor thread and two channels), this implementation guarantees the
 * execution order:
 * <pre>
 *          ----------------------------------------&gt; Timeline --------------------------------&gt;
 *           Channel A (Event A1), Channel B (Event B), Channel A (Event A2), ..., Channel A (Event An)
 * </pre>
 *
 * <b>NOTE</b>: For convenience, the case above uses a <b>single executor thread</b>, but the fair
 * mechanism also applies when there are <b>multiple executor threads</b>.
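 * <p>
 * A minimal usage sketch, assuming the usual Netty 3 setup where this executor is wrapped in an
 * {@link ExecutionHandler} that is added to the {@link org.jboss.netty.channel.ChannelPipeline}
 * (the pool sizes and the business-logic handler below are illustrative only):
 * <pre>
 * ExecutionHandler executionHandler = new ExecutionHandler(
 *         new FairOrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
 * pipeline.addLast("executor", executionHandler);
 * pipeline.addLast("handler", new MyBusinessLogicHandler());
 * </pre>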
 */
public class FairOrderedMemoryAwareThreadPoolExecutor extends MemoryAwareThreadPoolExecutor {

    // Marks the end of a linked list of EventTasks.
    private final EventTask end = new EventTask(null);

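    // Atomically updates the volatile "next" field of EventTask; a shared field updater is used
    // rather than allocating an AtomicReference per task.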
    private final AtomicReferenceFieldUpdater<EventTask, EventTask> fieldUpdater =
            AtomicReferenceFieldUpdater.newUpdater(EventTask.class, EventTask.class, "next");

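    // Maps each key (the channel, by default) to the most recently submitted EventTask for that key.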
    protected final ConcurrentMap<Object, EventTask> map = newMap();

    /**
     * Creates a new instance.
     *
     * @param corePoolSize the maximum number of active threads
     * @param maxChannelMemorySize the maximum total size of the queued events per channel. Specify {@code 0} to
     * disable.
     * @param maxTotalMemorySize the maximum total size of the queued events for this pool. Specify {@code 0} to
     * disable.
     * @noinspection unused
     */
    public FairOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize,
                                                    long maxTotalMemorySize) {
        super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
    }

    /**
     * Creates a new instance.
     *
     * @param corePoolSize the maximum number of active threads
     * @param maxChannelMemorySize the maximum total size of the queued events per channel. Specify {@code 0} to
     * disable.
     * @param maxTotalMemorySize the maximum total size of the queued events for this pool. Specify {@code 0} to
     * disable.
     * @param keepAliveTime the amount of time for an inactive thread to shut itself down
     * @param unit the {@link TimeUnit} of {@code keepAliveTime}
     * @noinspection unused
     */
    public FairOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize,
                                                    long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) {
        super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit);
    }

    /**
     * Creates a new instance.
     *
     * @param corePoolSize the maximum number of active threads
     * @param maxChannelMemorySize the maximum total size of the queued events per channel. Specify {@code 0} to
     * disable.
     * @param maxTotalMemorySize the maximum total size of the queued events for this pool. Specify {@code 0} to
     * disable.
     * @param keepAliveTime the amount of time for an inactive thread to shut itself down
     * @param unit the {@link TimeUnit} of {@code keepAliveTime}
     * @param threadFactory the {@link ThreadFactory} of this pool
     * @noinspection unused
     */
    public FairOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize,
                                                    long maxTotalMemorySize, long keepAliveTime, TimeUnit unit,
                                                    ThreadFactory threadFactory) {
        super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit,
              threadFactory);
    }

    /**
     * Creates a new instance.
     *
     * @param corePoolSize the maximum number of active threads
     * @param maxChannelMemorySize the maximum total size of the queued events per channel. Specify {@code 0} to
     * disable.
     * @param maxTotalMemorySize the maximum total size of the queued events for this pool. Specify {@code 0} to
     * disable.
     * @param keepAliveTime the amount of time for an inactive thread to shut itself down
     * @param unit the {@link TimeUnit} of {@code keepAliveTime}
     * @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool
     * @param threadFactory the {@link ThreadFactory} of this pool
     * @noinspection unused
     */
    public FairOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize,
                                                    long maxTotalMemorySize, long keepAliveTime, TimeUnit unit,
                                                    ObjectSizeEstimator objectSizeEstimator,
                                                    ThreadFactory threadFactory) {
        super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit,
              objectSizeEstimator, threadFactory);
    }

    /** @noinspection WeakerAccess*/
    protected ConcurrentMap<Object, EventTask> newMap() {
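        // Identity-based map with weak keys, so an entry can be reclaimed by the garbage collector
        // once its key (the channel, by default) is no longer strongly referenced, even if
        // removeKey() is never called for it.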
        return new ConcurrentIdentityWeakKeyHashMap<Object, EventTask>();
    }

    /**
     * Executes the specified task concurrently while maintaining the event order.
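     * <p>
     * An illustrative trace, assuming two events A1 and A2 arrive on the same channel while A1 is
     * still being executed (taskA1 and taskA2 denote the EventTask wrappers created for them):
     * <pre>
     * doExecute(A1): map.put returns null                 -&gt; doUnorderedExecute(taskA1)
     * doExecute(A2): map.put returns taskA1,
     *                CAS(taskA1.next, null, taskA2) wins  -&gt; nothing is submitted yet
     * taskA1.run():  CAS(taskA1.next, null, end) fails    -&gt; doUnorderedExecute(taskA2)
     * </pre>
     * If taskA1 instead finishes before the CAS in doExecute(A2), taskA1.next is already the end
     * sign, that CAS fails, and taskA2 is submitted immediately.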
     */
    @Override
    protected void doExecute(Runnable task) {
        if (task instanceof ChannelEventRunnable) {
            ChannelEventRunnable eventRunnable = (ChannelEventRunnable) task;
            EventTask newEventTask = new EventTask(eventRunnable);
            /*
             * For example, if three events "Channel A (Event A1)", "Channel A (Event A2)" and
             * "Channel A (Event A3)" are submitted in sequence, the key "Channel A" maps to the
             * task of "Event A3" (the most recent one), and a linked list
             * "Event A1" -> "Event A2" -> "Event A3" is formed via the "next" field in EventTask.
             */
            Object key = getKey(eventRunnable.getEvent());
            EventTask previousEventTask = map.put(key, newEventTask);
            // Remove the entry when the channel closes.
            removeIfClosed(eventRunnable, key);
            // Try to link "previousEventTask -> newEventTask".
            // If the CAS succeeds, "newEventTask" will be invoked by "previousEventTask" when it finishes.
            if (previousEventTask != null) {
                if (compareAndSetNext(previousEventTask, null, newEventTask)) {
                    return;
                }
            }
            // Two situations reach this point:
            // 1. "newEventTask" is the head of a new linked list, or
            // 2. "previousEventTask.next" is already the end sign.
            // In either case, execute "newEventTask" immediately.
            doUnorderedExecute(newEventTask);
        } else {
            doUnorderedExecute(task);
        }
    }

    private void removeIfClosed(ChannelEventRunnable eventRunnable, Object key) {
        ChannelEvent event = eventRunnable.getEvent();
        if (event instanceof ChannelStateEvent) {
            ChannelStateEvent se = (ChannelStateEvent) event;
            if (se.getState() == ChannelState.OPEN && !event.getChannel().isOpen()) {
                removeKey(key);
            }
        }
    }

    /**
     * Call this method when the life cycle of the key ends, such as when the channel is closed.
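     * <p>
     * An illustrative sketch of overriding {@link #getKey(ChannelEvent)} so that events are ordered by a
     * custom key (the subclass, the session object stored as the channel attachment, and the point at
     * which {@code sessionEnded} is invoked are all assumptions made for the example):
     * <pre>
     * public class SessionOrderedExecutor extends FairOrderedMemoryAwareThreadPoolExecutor {
     *     public SessionOrderedExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
     *         super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
     *     }
     *
     *     protected Object getKey(ChannelEvent e) {
     *         // Order events per session object stored as the channel attachment,
     *         // falling back to the channel itself if no session is attached yet.
     *         Object session = e.getChannel().getAttachment();
     *         return session != null ? session : e.getChannel();
     *     }
     *
     *     public void sessionEnded(Object session) {
     *         // Release the ordering state once the session is finished.
     *         removeKey(session);
     *     }
     * }
     * </pre>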
     */
    protected boolean removeKey(Object key) {
        return map.remove(key) != null;
    }

    protected Object getKey(ChannelEvent e) {
        return e.getChannel();
    }

    @Override
    protected boolean shouldCount(Runnable task) {
        return !(task instanceof EventTask) && super.shouldCount(task);
    }

    protected final boolean compareAndSetNext(EventTask eventTask, EventTask expect, EventTask update) {
        // The "next" field is modified by both doExecute() and EventTask.run(), so CAS is used for
        // thread safety.
        return fieldUpdater.compareAndSet(eventTask, expect, update);
    }

    protected final class EventTask implements Runnable {
        /**
         * The next task submitted for the same key; {@code null} until a successor is linked, or set
         * to the end sign when this task finishes without a successor.
         *
         * @noinspection unused
         */
        volatile EventTask next;
        private final ChannelEventRunnable runnable;

        EventTask(ChannelEventRunnable runnable) {
            this.runnable = runnable;
        }

        public void run() {
            try {
                runnable.run();
            } finally {
                // If "next" is still null, set it to the end sign to terminate this linked list;
                // otherwise a successor has been linked by doExecute(), so trigger it to execute.
                if (!compareAndSetNext(this, null, end)) {
                    doUnorderedExecute(next);
                }
            }
        }
    }
}