1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.execution;
17
18 import java.util.concurrent.Executor;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelFuture;
27 import org.jboss.netty.channel.ChannelFutureListener;
28 import org.jboss.netty.util.ObjectSizeEstimator;
29
30 /**
31 * {@link Executor} which should be used for downstream {@link ChannelEvent}'s. This implementation
32 * will take care of preserve the order of the events in a {@link Channel}. If you don't need to
33 * preserve the order just use one of the {@link Executor} implementations provided by the static
34 * methods of {@link Executors}.
35 * <br>
36 * <br>
37 * For more informations about how the order is preserved see {@link OrderedMemoryAwareThreadPoolExecutor}
38 */
39 public final class OrderedDownstreamThreadPoolExecutor extends OrderedMemoryAwareThreadPoolExecutor {
40
41 /**
42 * Creates a new instance.
43 *
44 * @param corePoolSize the maximum number of active threads
45 */
46 public OrderedDownstreamThreadPoolExecutor(int corePoolSize) {
47 super(corePoolSize, 0L, 0L);
48 }
49
50 /**
51 * Creates a new instance.
52 *
53 * @param corePoolSize the maximum number of active threads
54 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
55 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
56 */
57 public OrderedDownstreamThreadPoolExecutor(
58 int corePoolSize, long keepAliveTime, TimeUnit unit) {
59 super(corePoolSize, 0L, 0L, keepAliveTime, unit);
60 }
61
62 /**
63 * Creates a new instance.
64 *
65 * @param corePoolSize the maximum number of active threads
66 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
67 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
68 * @param threadFactory the {@link ThreadFactory} of this pool
69 */
70 public OrderedDownstreamThreadPoolExecutor(
71 int corePoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
72 super(corePoolSize, 0L, 0L,
73 keepAliveTime, unit, threadFactory);
74 }
75
76
77 /**
78 * Return <code>null</code>
79 */
80 @Override
81 public ObjectSizeEstimator getObjectSizeEstimator() {
82 return null;
83 }
84
85 /**
86 * Throws {@link UnsupportedOperationException} as there is not support for limit the memory
87 * size in this implementation
88 */
89 @Override
90 public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
91 throw new UnsupportedOperationException("Not supported by this implementation");
92 }
93
94 /**
95 * Returns <code>0L</code>
96 */
97 @Override
98 public long getMaxChannelMemorySize() {
99 return 0L;
100 }
101
102 /**
103 * Throws {@link UnsupportedOperationException} as there is not support for limit the memory
104 * size in this implementation
105 */
106 @Override
107 public void setMaxChannelMemorySize(long maxChannelMemorySize) {
108 throw new UnsupportedOperationException("Not supported by this implementation");
109 }
110
111 /**
112 * Returns <code>0L</code>
113 */
114 @Override
115 public long getMaxTotalMemorySize() {
116 return 0L;
117 }
118
119 /**
120 * Throws {@link UnsupportedOperationException} as there is not support for limit the memory
121 * size in this implementation
122 */
123 @Override
124 public void setMaxTotalMemorySize(long maxTotalMemorySize) {
125 throw new UnsupportedOperationException("Not supported by this implementation");
126 }
127
128 /**
129 * Return <code>false</code> as we not need to cound the memory in this implementation
130 */
131 @Override
132 protected boolean shouldCount(Runnable task) {
133 return false;
134 }
135
136 @Override
137 public void execute(Runnable command) {
138
139 // check if the Runnable was of an unsupported type
140 if (command instanceof ChannelUpstreamEventRunnable) {
141 throw new RejectedExecutionException("command must be enclosed with an downstream event.");
142 }
143 doExecute(command);
144 }
145
146 @Override
147 protected Executor getChildExecutor(ChannelEvent e) {
148 final Object key = getChildExecutorKey(e);
149 Executor executor = childExecutors.get(key);
150 if (executor == null) {
151 executor = new ChildExecutor();
152 Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
153 if (oldExecutor != null) {
154 executor = oldExecutor;
155 } else {
156
157 // register a listener so that the ChildExecutor will get removed once the channel was closed
158 e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
159
160 public void operationComplete(ChannelFuture future) throws Exception {
161 removeChildExecutor(key);
162 }
163 });
164 }
165 }
166
167 return executor;
168 }
169
170
171 }