1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.execution;
17
18 import org.jboss.netty.channel.ChannelEvent;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.ChannelFutureListener;
21 import org.jboss.netty.util.ObjectSizeEstimator;
22
23 import java.util.concurrent.RejectedExecutionException;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26
27 /**
28 * This is a <b>fair</b> alternative of {@link OrderedDownstreamThreadPoolExecutor} .
29 * <p> For more information about how the order is preserved
30 * see {@link FairOrderedMemoryAwareThreadPoolExecutor}</p>
31 */
32 public final class FairOrderedDownstreamThreadPoolExecutor extends FairOrderedMemoryAwareThreadPoolExecutor {
33
34 /**
35 * Creates a new instance.
36 *
37 * @param corePoolSize the maximum number of active threads
38 * @noinspection unused
39 */
40 public FairOrderedDownstreamThreadPoolExecutor(int corePoolSize) {
41 super(corePoolSize, 0L, 0L);
42 }
43
44 /**
45 * Creates a new instance.
46 *
47 * @param corePoolSize the maximum number of active threads
48 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
49 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
50 * @noinspection unused
51 */
52 public FairOrderedDownstreamThreadPoolExecutor(
53 int corePoolSize, long keepAliveTime, TimeUnit unit) {
54 super(corePoolSize, 0L, 0L, keepAliveTime, unit);
55 }
56
57 /**
58 * Creates a new instance.
59 *
60 * @param corePoolSize the maximum number of active threads
61 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
62 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
63 * @param threadFactory the {@link ThreadFactory} of this pool
64 * @noinspection unused
65 */
66 public FairOrderedDownstreamThreadPoolExecutor(
67 int corePoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
68 super(corePoolSize, 0L, 0L,
69 keepAliveTime, unit, threadFactory);
70 }
71
72 /**
73 * Return {@code null}
74 */
75 @Override
76 public ObjectSizeEstimator getObjectSizeEstimator() {
77 return null;
78 }
79
80 /**
81 * Throws {@link UnsupportedOperationException} as there is not support for limit the memory size in this
82 * implementation
83 */
84 @Override
85 public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
86 throw new UnsupportedOperationException("Not supported by this implementation");
87 }
88
89 /**
90 * Returns {@code 0L}
91 */
92 @Override
93 public long getMaxChannelMemorySize() {
94 return 0L;
95 }
96
97 /**
98 * Throws {@link UnsupportedOperationException} as there is not support for limit the memory size in this
99 * implementation
100 */
101 @Override
102 public void setMaxChannelMemorySize(long maxChannelMemorySize) {
103 throw new UnsupportedOperationException("Not supported by this implementation");
104 }
105
106 /**
107 * Returns {@code 0L}
108 */
109 @Override
110 public long getMaxTotalMemorySize() {
111 return 0L;
112 }
113
114 /**
115 * Return {@code false} as we not need to count the memory in this implementation
116 */
117 @Override
118 protected boolean shouldCount(Runnable task) {
119 return false;
120 }
121
122 @Override
123 public void execute(Runnable command) {
124 // check if the Runnable was of an unsupported type
125 if (command instanceof ChannelUpstreamEventRunnable) {
126 throw new RejectedExecutionException("command must be enclosed with an downstream event.");
127 }
128 doExecute(command);
129 }
130
131 /**
132 * Executes the specified task concurrently while maintaining the event order.
133 */
134 @Override
135 protected void doExecute(Runnable task) {
136 if (task instanceof ChannelEventRunnable) {
137 ChannelEventRunnable eventRunnable = (ChannelEventRunnable) task;
138 ChannelEvent event = eventRunnable.getEvent();
139 EventTask newEventTask = new EventTask(eventRunnable);
140
141 /*
142 * e.g. Three event
143 * "Channel A (Event A1)","Channel A (Event A2)","Channel A (Event A3)"
144 * are submitted in sequence, then key "Channel A" is refer to the
145 * value of "Event A3", and there is a linked list: "Event A3" ->
146 * "Event A2" -> "Event A1" ( linked by the field "next" in
147 * EventTask )
148 */
149
150 final Object key = getKey(event);
151 EventTask previousEventTask = map.put(key, newEventTask);
152
153 // try to setup "previousEventTask -> newEventTask"
154 // if success, then "newEventTask" will be invoke by
155 // "previousEventTask"
156 if (previousEventTask != null) {
157 if (compareAndSetNext(previousEventTask, null, newEventTask)) {
158 return;
159 }
160 } else {
161 // register a listener so that the ChildExecutor will get removed once the channel was closed
162 event.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
163
164 public void operationComplete(ChannelFuture future) throws Exception {
165 removeKey(key);
166 }
167 });
168 }
169
170 // Two situation here:
171 // 1. "newEventTask" is the header of linked list
172 // 2. the "previousEventTask.next" is already END
173 // At these two situations above, just execute "newEventTask"
174 // immediately
175 doUnorderedExecute(newEventTask);
176 } else {
177 doUnorderedExecute(task);
178 }
179 }
180 }