1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.util;
17
18 import java.util.Collections;
19 import java.util.IdentityHashMap;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.concurrent.AbstractExecutorService;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.TimeUnit;
27
28 import org.jboss.netty.channel.ChannelFactory;
29 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public class VirtualExecutorService extends AbstractExecutorService {
77
78 private final Executor e;
79 private final ExecutorService s;
80 final Object startStopLock = new Object();
81 volatile boolean shutdown;
82 Set<Thread> activeThreads = new MapBackedSet<Thread>(new IdentityHashMap<Thread, Boolean>());
83
84
85
86
87 public VirtualExecutorService(Executor parent) {
88 if (parent == null) {
89 throw new NullPointerException("parent");
90 }
91
92 if (parent instanceof ExecutorService) {
93 e = null;
94 s = (ExecutorService) parent;
95 } else {
96 e = parent;
97 s = null;
98 }
99 }
100
101 public boolean isShutdown() {
102 synchronized (startStopLock) {
103 return shutdown;
104 }
105 }
106
107 public boolean isTerminated() {
108 synchronized (startStopLock) {
109 return shutdown && activeThreads.isEmpty();
110 }
111 }
112
113 public void shutdown() {
114 synchronized (startStopLock) {
115 if (shutdown) {
116 return;
117 }
118 shutdown = true;
119 }
120 }
121
122 public List<Runnable> shutdownNow() {
123 synchronized (startStopLock) {
124 if (!isTerminated()) {
125 shutdown();
126 for (Thread t: activeThreads) {
127 t.interrupt();
128 }
129 }
130 }
131
132 return Collections.emptyList();
133 }
134
135 public boolean awaitTermination(long timeout, TimeUnit unit)
136 throws InterruptedException {
137 synchronized (startStopLock) {
138 while (!isTerminated()) {
139 startStopLock.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
140 }
141
142 return isTerminated();
143 }
144 }
145
146 public void execute(Runnable command) {
147 if (command == null) {
148 throw new NullPointerException("command");
149 }
150
151 if (shutdown) {
152 throw new RejectedExecutionException();
153 }
154
155 if (s != null) {
156 s.execute(new ChildExecutorRunnable(command));
157 } else {
158 e.execute(new ChildExecutorRunnable(command));
159 }
160 }
161
162 private class ChildExecutorRunnable implements Runnable {
163
164 private final Runnable runnable;
165
166 ChildExecutorRunnable(Runnable runnable) {
167 this.runnable = runnable;
168 }
169
170 public void run() {
171 Thread thread = Thread.currentThread();
172 synchronized (startStopLock) {
173 activeThreads.add(thread);
174 }
175 try {
176 runnable.run();
177 } finally {
178 synchronized (startStopLock) {
179 boolean removed = activeThreads.remove(thread);
180 assert removed;
181 if (isTerminated()) {
182 startStopLock.notifyAll();
183 }
184 }
185 }
186 }
187 }
188 }