1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.util;
17
18 import java.util.Collections;
19 import java.util.IdentityHashMap;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.concurrent.AbstractExecutorService;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.TimeUnit;
27
28 import org.jboss.netty.channel.ChannelFactory;
29 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
30
31
32 /**
33 * A delegating {@link ExecutorService} with its own termination management.
34 * <p>
35 * {@link VirtualExecutorService} is used when you want to inject an
36 * {@link ExecutorService} but you do not want to allow the explicit termination
37 * of threads on shutdown request. It is particularly useful when the
38 * {@link ExecutorService} to inject is shared by different components and
39 * the life cycle of the components depend on the termination of the injected
40 * {@link ExecutorService}.
41 *
42 * <pre>
43 * ExecutorService globalExecutor = ...;
44 * ExecutorService virtualExecutor = new {@link VirtualExecutorService}(globalExecutor);
45 *
46 * {@link ChannelFactory} factory =
47 * new {@link NioServerSocketChannelFactory}(virtualExecutor, virtualExecutor);
48 * ...
49 *
50 * // ChannelFactory.releaseExternalResources() shuts down the executor and
51 * // interrupts the I/O threads to terminate all I/O tasks and to release all
52 * // resources acquired by ChannelFactory.
53 * factory.releaseExternalResources();
54 *
55 * // Note that globalExecutor is not shut down because VirtualExecutorService
56 * // implements its own termination management. All threads which were acquired
57 * // by ChannelFactory via VirtualExecutorService are returned to the pool.
58 * assert !globalExecutor.isShutdown();
59 * </pre>
60 *
61 * <h3>The differences from an ordinary {@link ExecutorService}</h3>
62 *
63 * A shutdown request ({@link #shutdown()} or {@link #shutdownNow()}) does not
64 * shut down its parent {@link Executor} but simply sets its internal flag to
65 * reject further execution request.
66 * <p>
67 * {@link #shutdownNow()} interrupts only the thread which is executing the
68 * task executed via {@link VirtualExecutorService}.
69 * <p>
70 * {@link #awaitTermination(long, TimeUnit)} does not wait for real thread
71 * termination but wait until {@link VirtualExecutorService} is shut down and
72 * its active tasks are finished and the threads are returned to the parent
73 * {@link Executor}.
74 * @apiviz.landmark
75 */
76 public class VirtualExecutorService extends AbstractExecutorService {
77
78 private final Executor e;
79 private final ExecutorService s;
80 final Object startStopLock = new Object();
81 volatile boolean shutdown;
82 final Set<Thread> activeThreads = new MapBackedSet<Thread>(new IdentityHashMap<Thread, Boolean>());
83
84 /**
85 * Creates a new instance with the specified parent {@link Executor}.
86 */
87 public VirtualExecutorService(Executor parent) {
88 if (parent == null) {
89 throw new NullPointerException("parent");
90 }
91
92 if (parent instanceof ExecutorService) {
93 e = null;
94 s = (ExecutorService) parent;
95 } else {
96 e = parent;
97 s = null;
98 }
99 }
100
101 public boolean isShutdown() {
102 synchronized (startStopLock) {
103 return shutdown;
104 }
105 }
106
107 public boolean isTerminated() {
108 synchronized (startStopLock) {
109 return shutdown && activeThreads.isEmpty();
110 }
111 }
112
113 public void shutdown() {
114 synchronized (startStopLock) {
115 if (shutdown) {
116 return;
117 }
118 shutdown = true;
119 }
120 }
121
122 public List<Runnable> shutdownNow() {
123 synchronized (startStopLock) {
124 if (!isTerminated()) {
125 shutdown();
126 for (Thread t: activeThreads) {
127 t.interrupt();
128 }
129 }
130 }
131
132 return Collections.emptyList();
133 }
134
135 public boolean awaitTermination(long timeout, TimeUnit unit)
136 throws InterruptedException {
137 synchronized (startStopLock) {
138 if (!isTerminated()) {
139 startStopLock.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
140 }
141 return isTerminated();
142 }
143 }
144
145 public void execute(Runnable command) {
146 if (command == null) {
147 throw new NullPointerException("command");
148 }
149
150 if (shutdown) {
151 throw new RejectedExecutionException();
152 }
153
154 if (s != null) {
155 s.execute(new ChildExecutorRunnable(command));
156 } else {
157 e.execute(new ChildExecutorRunnable(command));
158 }
159 }
160
161 private class ChildExecutorRunnable implements Runnable {
162
163 private final Runnable runnable;
164
165 ChildExecutorRunnable(Runnable runnable) {
166 this.runnable = runnable;
167 }
168
169 public void run() {
170 Thread thread = Thread.currentThread();
171 synchronized (startStopLock) {
172 activeThreads.add(thread);
173 }
174 try {
175 runnable.run();
176 } finally {
177 synchronized (startStopLock) {
178 boolean removed = activeThreads.remove(thread);
179 assert removed;
180 if (isTerminated()) {
181 startStopLock.notifyAll();
182 }
183 }
184 }
185 }
186 }
187 }