View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.util;
17  
18  import java.util.Collections;
19  import java.util.IdentityHashMap;
20  import java.util.List;
21  import java.util.Set;
22  import java.util.concurrent.AbstractExecutorService;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.jboss.netty.channel.ChannelFactory;
29  import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
30  
31  
/**
 * A delegating {@link ExecutorService} with its own termination management.
 * <p>
 * {@link VirtualExecutorService} is used when you want to inject an
 * {@link ExecutorService} but you do not want to allow the explicit termination
 * of threads on shutdown request.  It is particularly useful when the
 * {@link ExecutorService} to inject is shared by different components and
 * the life cycle of the components depends on the termination of the injected
 * {@link ExecutorService}.
 *
 * <pre>
 * ExecutorService globalExecutor = ...;
 * ExecutorService virtualExecutor = new {@link VirtualExecutorService}(globalExecutor);
 *
 * {@link ChannelFactory} factory =
 *         new {@link NioServerSocketChannelFactory}(virtualExecutor, virtualExecutor);
 * ...
 *
 * // ChannelFactory.releaseExternalResources() shuts down the executor and
 * // interrupts the I/O threads to terminate all I/O tasks and to release all
 * // resources acquired by ChannelFactory.
 * factory.releaseExternalResources();
 *
 * // Note that globalExecutor is not shut down because VirtualExecutorService
 * // implements its own termination management. All threads which were acquired
 * // by ChannelFactory via VirtualExecutorService are returned to the pool.
 * assert !globalExecutor.isShutdown();
 * </pre>
 *
 * <h3>The differences from an ordinary {@link ExecutorService}</h3>
 *
 * A shutdown request ({@link #shutdown()} or {@link #shutdownNow()}) does not
 * shut down its parent {@link Executor} but simply sets its internal flag to
 * reject further execution requests.
 * <p>
 * {@link #shutdownNow()} interrupts only the threads which are executing the
 * tasks submitted via {@link VirtualExecutorService}.
 * <p>
 * {@link #awaitTermination(long, TimeUnit)} does not wait for real thread
 * termination but waits until {@link VirtualExecutorService} is shut down and
 * its active tasks are finished and the threads are returned to the parent
 * {@link Executor}.
 * @apiviz.landmark
 */
public class VirtualExecutorService extends AbstractExecutorService {

    // Exactly one of 'e' and 's' is non-null, depending on the parent's type.
    private final Executor e;
    private final ExecutorService s;

    /** Guards {@link #shutdown} transitions and the active-thread bookkeeping. */
    final Object startStopLock = new Object();

    /** Set once by {@link #shutdown()}; never reset. */
    volatile boolean shutdown;

    /**
     * Identity-keyed registry of the threads currently running a task of this
     * service.  Accessed only while holding {@link #startStopLock}.
     */
    final IdentityHashMap<Thread, Boolean> activeThreadMap =
            new IdentityHashMap<Thread, Boolean>();

    /**
     * Live view of the threads currently running a task of this service.
     * Threads are registered through {@link #activeThreadMap}; this view
     * supports queries, iteration and removal only.
     */
    final Set<Thread> activeThreads = activeThreadMap.keySet();

    /**
     * Creates a new instance with the specified parent {@link Executor}.
     *
     * @param parent the executor which actually runs the submitted tasks
     * @throws NullPointerException if {@code parent} is {@code null}
     */
    public VirtualExecutorService(Executor parent) {
        if (parent == null) {
            throw new NullPointerException("parent");
        }

        if (parent instanceof ExecutorService) {
            e = null;
            s = (ExecutorService) parent;
        } else {
            e = parent;
            s = null;
        }
    }

    /**
     * Returns {@code true} once {@link #shutdown()} or {@link #shutdownNow()}
     * has been called on this instance.
     */
    public boolean isShutdown() {
        synchronized (startStopLock) {
            return shutdown;
        }
    }

    /**
     * Returns {@code true} if this instance has been shut down and no thread
     * is currently running one of its tasks.  A task is tracked only from the
     * moment its wrapper starts running on a thread.
     */
    public boolean isTerminated() {
        synchronized (startStopLock) {
            return shutdown && activeThreads.isEmpty();
        }
    }

    /**
     * Marks this instance as shut down so that further submissions are
     * rejected.  The parent {@link Executor} is left untouched.
     */
    public void shutdown() {
        synchronized (startStopLock) {
            if (shutdown) {
                return;
            }
            shutdown = true;
            if (activeThreads.isEmpty()) {
                // Nothing is running, so this call is what makes the service
                // terminated; wake up anyone blocked in awaitTermination().
                startStopLock.notifyAll();
            }
        }
    }

    /**
     * Shuts this instance down and interrupts every thread which is currently
     * running one of its tasks.
     *
     * @return always an empty list
     */
    public List<Runnable> shutdownNow() {
        synchronized (startStopLock) {
            if (!isTerminated()) {
                shutdown();
                for (Thread t: activeThreads) {
                    t.interrupt();
                }
            }
        }

        return Collections.emptyList();
    }

    /**
     * Blocks until this instance is terminated, the timeout elapses, or the
     * calling thread is interrupted, whichever happens first.
     *
     * @param timeout the maximum time to wait; a value of zero or less makes
     *                this method return the current state without blocking
     * @param unit    the unit of {@code timeout}
     * @return {@code true} if this instance is terminated,
     *         {@code false} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        synchronized (startStopLock) {
            if (isTerminated()) {
                return true;
            }

            final long timeoutNanos = unit.toNanos(timeout);
            final long startTime = System.nanoTime();
            long remainingNanos = timeoutNanos;

            // Wait against a deadline in a loop: Object.wait(0) would block
            // forever, and a single wait may return early on a spurious wakeup.
            while (remainingNanos > 0) {
                TimeUnit.NANOSECONDS.timedWait(startStopLock, remainingNanos);
                if (isTerminated()) {
                    return true;
                }
                remainingNanos = timeoutNanos - (System.nanoTime() - startTime);
            }
            return false;
        }
    }

    /**
     * Hands the specified task over to the parent {@link Executor}, wrapped so
     * that the executing thread is tracked by this instance.
     * <p>
     * The shutdown flag is read without holding the lock, so a task submitted
     * concurrently with {@link #shutdown()} may still reach the parent.
     *
     * @param command the task to run
     * @throws NullPointerException       if {@code command} is {@code null}
     * @throws RejectedExecutionException if this instance has been shut down
     */
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }

        if (shutdown) {
            throw new RejectedExecutionException();
        }

        if (s != null) {
            s.execute(new ChildExecutorRunnable(command));
        } else {
            e.execute(new ChildExecutorRunnable(command));
        }
    }

    /**
     * Wraps a submitted task so that the thread running it is registered as
     * active for the duration of the task.
     */
    private class ChildExecutorRunnable implements Runnable {

        private final Runnable runnable;

        ChildExecutorRunnable(Runnable runnable) {
            this.runnable = runnable;
        }

        public void run() {
            Thread thread = Thread.currentThread();
            boolean registered;
            synchronized (startStopLock) {
                // put() returns null only when the thread was not registered
                // yet.  A wrapper that runs nested on an already registered
                // thread (e.g. a caller-runs parent) must not unregister it.
                registered = activeThreadMap.put(thread, Boolean.TRUE) == null;
            }
            try {
                runnable.run();
            } finally {
                if (registered) {
                    synchronized (startStopLock) {
                        boolean removed = activeThreads.remove(thread);
                        assert removed;
                        if (isTerminated()) {
                            startStopLock.notifyAll();
                        }
                    }
                }
            }
        }
    }
}