View Javadoc

1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.execution;
17  
18  import org.jboss.netty.channel.ChannelEvent;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.ChannelFutureListener;
21  import org.jboss.netty.util.ObjectSizeEstimator;
22  
23  import java.util.concurrent.RejectedExecutionException;
24  import java.util.concurrent.ThreadFactory;
25  import java.util.concurrent.TimeUnit;
26  
27  /**
28   * This is a <b>fair</b> alternative of {@link OrderedDownstreamThreadPoolExecutor} .
29   * <p> For more information about how the order is preserved
30   * see {@link FairOrderedMemoryAwareThreadPoolExecutor}</p>
31   */
32  public final class FairOrderedDownstreamThreadPoolExecutor extends FairOrderedMemoryAwareThreadPoolExecutor {
33  
34      /**
35       * Creates a new instance.
36       *
37       * @param corePoolSize the maximum number of active threads
38       * @noinspection unused
39       */
40      public FairOrderedDownstreamThreadPoolExecutor(int corePoolSize) {
41          super(corePoolSize, 0L, 0L);
42      }
43  
44      /**
45       * Creates a new instance.
46       *
47       * @param corePoolSize the maximum number of active threads
48       * @param keepAliveTime the amount of time for an inactive thread to shut itself down
49       * @param unit the {@link TimeUnit} of {@code keepAliveTime}
50       * @noinspection unused
51       */
52      public FairOrderedDownstreamThreadPoolExecutor(
53              int corePoolSize, long keepAliveTime, TimeUnit unit) {
54          super(corePoolSize, 0L, 0L, keepAliveTime, unit);
55      }
56  
57      /**
58       * Creates a new instance.
59       *
60       * @param corePoolSize the maximum number of active threads
61       * @param keepAliveTime the amount of time for an inactive thread to shut itself down
62       * @param unit the {@link TimeUnit} of {@code keepAliveTime}
63       * @param threadFactory the {@link ThreadFactory} of this pool
64       * @noinspection unused
65       */
66      public FairOrderedDownstreamThreadPoolExecutor(
67              int corePoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
68          super(corePoolSize, 0L, 0L,
69                keepAliveTime, unit, threadFactory);
70      }
71  
72      /**
73       * Return {@code null}
74       */
75      @Override
76      public ObjectSizeEstimator getObjectSizeEstimator() {
77          return null;
78      }
79  
80      /**
81       * Throws {@link UnsupportedOperationException} as there is not support for limit the memory size in this
82       * implementation
83       */
84      @Override
85      public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
86          throw new UnsupportedOperationException("Not supported by this implementation");
87      }
88  
89      /**
90       * Returns {@code 0L}
91       */
92      @Override
93      public long getMaxChannelMemorySize() {
94          return 0L;
95      }
96  
97      /**
98       * Throws {@link UnsupportedOperationException} as there is not support for limit the memory size in this
99       * implementation
100      */
101     @Override
102     public void setMaxChannelMemorySize(long maxChannelMemorySize) {
103         throw new UnsupportedOperationException("Not supported by this implementation");
104     }
105 
106     /**
107      * Returns {@code 0L}
108      */
109     @Override
110     public long getMaxTotalMemorySize() {
111         return 0L;
112     }
113 
114     /**
115      * Return {@code false} as we not need to count the memory in this implementation
116      */
117     @Override
118     protected boolean shouldCount(Runnable task) {
119         return false;
120     }
121 
122     @Override
123     public void execute(Runnable command) {
124         // check if the Runnable was of an unsupported type
125         if (command instanceof ChannelUpstreamEventRunnable) {
126             throw new RejectedExecutionException("command must be enclosed with an downstream event.");
127         }
128         doExecute(command);
129     }
130 
131     /**
132      * Executes the specified task concurrently while maintaining the event order.
133      */
134     @Override
135     protected void doExecute(Runnable task) {
136         if (task instanceof ChannelEventRunnable) {
137             ChannelEventRunnable eventRunnable = (ChannelEventRunnable) task;
138             ChannelEvent event = eventRunnable.getEvent();
139             EventTask newEventTask = new EventTask(eventRunnable);
140 
141             /*
142              * e.g. Three event
143              * "Channel A (Event A1)","Channel A (Event A2)","Channel A (Event A3)"
144              * are submitted in sequence, then key "Channel A" is refer to the
145              * value of "Event A3", and there is a linked list: "Event A3" ->
146              * "Event A2" -> "Event A1" ( linked by the field "next" in
147              * EventTask )
148              */
149 
150             final Object key = getKey(event);
151             EventTask previousEventTask = map.put(key, newEventTask);
152 
153             // try to setup "previousEventTask -> newEventTask"
154             // if success, then "newEventTask" will be invoke by
155             // "previousEventTask"
156             if (previousEventTask != null) {
157                 if (compareAndSetNext(previousEventTask, null, newEventTask)) {
158                     return;
159                 }
160             } else {
161                 // register a listener so that the ChildExecutor will get removed once the channel was closed
162                 event.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
163 
164                     public void operationComplete(ChannelFuture future) throws Exception {
165                         removeKey(key);
166                     }
167                 });
168             }
169 
170             // Two situation here:
171             // 1. "newEventTask" is the header of linked list
172             // 2. the "previousEventTask.next" is already END
173             // At these two situations above, just execute "newEventTask"
174             // immediately
175             doUnorderedExecute(newEventTask);
176         } else {
177             doUnorderedExecute(task);
178         }
179     }
180 }