1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.util;
17
18 import java.util.Collections;
19 import java.util.IdentityHashMap;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.concurrent.AbstractExecutorService;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.TimeUnit;
27
28 import org.jboss.netty.channel.ChannelFactory;
29 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public class VirtualExecutorService extends AbstractExecutorService {
77
78 private final Executor e;
79 private final ExecutorService s;
80 final Object startStopLock = new Object();
81 volatile boolean shutdown;
82 final Set<Thread> activeThreads = new MapBackedSet<Thread>(new IdentityHashMap<Thread, Boolean>());
83
84
85
86
87 public VirtualExecutorService(Executor parent) {
88 if (parent == null) {
89 throw new NullPointerException("parent");
90 }
91
92 if (parent instanceof ExecutorService) {
93 e = null;
94 s = (ExecutorService) parent;
95 } else {
96 e = parent;
97 s = null;
98 }
99 }
100
101 public boolean isShutdown() {
102 synchronized (startStopLock) {
103 return shutdown;
104 }
105 }
106
107 public boolean isTerminated() {
108 synchronized (startStopLock) {
109 return shutdown && activeThreads.isEmpty();
110 }
111 }
112
113 public void shutdown() {
114 synchronized (startStopLock) {
115 if (shutdown) {
116 return;
117 }
118 shutdown = true;
119 }
120 }
121
122 public List<Runnable> shutdownNow() {
123 synchronized (startStopLock) {
124 if (!isTerminated()) {
125 shutdown();
126 for (Thread t: activeThreads) {
127 t.interrupt();
128 }
129 }
130 }
131
132 return Collections.emptyList();
133 }
134
135 public boolean awaitTermination(long timeout, TimeUnit unit)
136 throws InterruptedException {
137 synchronized (startStopLock) {
138 if (!isTerminated()) {
139 startStopLock.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
140 }
141 return isTerminated();
142 }
143 }
144
145 public void execute(Runnable command) {
146 if (command == null) {
147 throw new NullPointerException("command");
148 }
149
150 if (shutdown) {
151 throw new RejectedExecutionException();
152 }
153
154 if (s != null) {
155 s.execute(new ChildExecutorRunnable(command));
156 } else {
157 e.execute(new ChildExecutorRunnable(command));
158 }
159 }
160
161 private class ChildExecutorRunnable implements Runnable {
162
163 private final Runnable runnable;
164
165 ChildExecutorRunnable(Runnable runnable) {
166 this.runnable = runnable;
167 }
168
169 public void run() {
170 Thread thread = Thread.currentThread();
171 synchronized (startStopLock) {
172 activeThreads.add(thread);
173 }
174 try {
175 runnable.run();
176 } finally {
177 synchronized (startStopLock) {
178 boolean removed = activeThreads.remove(thread);
179 assert removed;
180 if (isTerminated()) {
181 startStopLock.notifyAll();
182 }
183 }
184 }
185 }
186 }
187 }