1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.execution;
17
18 import org.jboss.netty.channel.ChannelEvent;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.ChannelFutureListener;
21 import org.jboss.netty.util.ObjectSizeEstimator;
22
23 import java.util.concurrent.RejectedExecutionException;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26
27
28
29
30
31
32 public final class FairOrderedDownstreamThreadPoolExecutor extends FairOrderedMemoryAwareThreadPoolExecutor {
33
34
35
36
37
38
39
40 public FairOrderedDownstreamThreadPoolExecutor(int corePoolSize) {
41 super(corePoolSize, 0L, 0L);
42 }
43
44
45
46
47
48
49
50
51
52 public FairOrderedDownstreamThreadPoolExecutor(
53 int corePoolSize, long keepAliveTime, TimeUnit unit) {
54 super(corePoolSize, 0L, 0L, keepAliveTime, unit);
55 }
56
57
58
59
60
61
62
63
64
65
66 public FairOrderedDownstreamThreadPoolExecutor(
67 int corePoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
68 super(corePoolSize, 0L, 0L,
69 keepAliveTime, unit, threadFactory);
70 }
71
72
73
74
75 @Override
76 public ObjectSizeEstimator getObjectSizeEstimator() {
77 return null;
78 }
79
80
81
82
83
84 @Override
85 public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
86 throw new UnsupportedOperationException("Not supported by this implementation");
87 }
88
89
90
91
92 @Override
93 public long getMaxChannelMemorySize() {
94 return 0L;
95 }
96
97
98
99
100
101 @Override
102 public void setMaxChannelMemorySize(long maxChannelMemorySize) {
103 throw new UnsupportedOperationException("Not supported by this implementation");
104 }
105
106
107
108
109 @Override
110 public long getMaxTotalMemorySize() {
111 return 0L;
112 }
113
114
115
116
117 @Override
118 protected boolean shouldCount(Runnable task) {
119 return false;
120 }
121
122 @Override
123 public void execute(Runnable command) {
124
125 if (command instanceof ChannelUpstreamEventRunnable) {
126 throw new RejectedExecutionException("command must be enclosed with an downstream event.");
127 }
128 doExecute(command);
129 }
130
131
132
133
134 @Override
135 protected void doExecute(Runnable task) {
136 if (task instanceof ChannelEventRunnable) {
137 ChannelEventRunnable eventRunnable = (ChannelEventRunnable) task;
138 ChannelEvent event = eventRunnable.getEvent();
139 EventTask newEventTask = new EventTask(eventRunnable);
140
141
142
143
144
145
146
147
148
149
150 final Object key = getKey(event);
151 EventTask previousEventTask = map.put(key, newEventTask);
152
153
154
155
156 if (previousEventTask != null) {
157 if (compareAndSetNext(previousEventTask, null, newEventTask)) {
158 return;
159 }
160 } else {
161
162 event.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
163
164 public void operationComplete(ChannelFuture future) throws Exception {
165 removeKey(key);
166 }
167 });
168 }
169
170
171
172
173
174
175 doUnorderedExecute(newEventTask);
176 } else {
177 doUnorderedExecute(task);
178 }
179 }
180 }