View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.logging.InternalLogger;
19  import io.netty.util.internal.logging.InternalLoggerFactory;
20  
21  import java.util.Collections;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.Delayed;
27  import java.util.concurrent.RejectedExecutionHandler;
28  import java.util.concurrent.RunnableScheduledFuture;
29  import java.util.concurrent.ScheduledThreadPoolExecutor;
30  import java.util.concurrent.ThreadFactory;
31  import java.util.concurrent.TimeUnit;
32  
33  import static java.util.concurrent.TimeUnit.NANOSECONDS;
34  
35  /**
36   * {@link EventExecutor} implementation which makes no guarantees about the ordering of task execution that
37   * are submitted because there may be multiple threads executing these tasks.
38   * This implementation is most useful for protocols that do not need strict ordering.
39   *
40   * <strong>Because it provides no ordering care should be taken when using it!</strong>
41   */
42  public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolExecutor implements EventExecutor {
43      private static final InternalLogger logger = InternalLoggerFactory.getInstance(
44              UnorderedThreadPoolEventExecutor.class);
45  
46      private final Promise<?> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
47      private final Set<EventExecutor> executorSet = Collections.singleton((EventExecutor) this);
48  
49      /**
50       * Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int, ThreadFactory)}
51       * using {@link DefaultThreadFactory}.
52       */
53      public UnorderedThreadPoolEventExecutor(int corePoolSize) {
54          this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class));
55      }
56  
57      /**
58       * See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory)}
59       */
60      public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) {
61          super(corePoolSize, threadFactory);
62      }
63  
64      /**
65       * Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int,
66       * ThreadFactory, java.util.concurrent.RejectedExecutionHandler)} using {@link DefaultThreadFactory}.
67       */
68      public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) {
69          this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class), handler);
70      }
71  
72      /**
73       * See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory, RejectedExecutionHandler)}
74       */
75      public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory,
76                                              RejectedExecutionHandler handler) {
77          super(corePoolSize, threadFactory, handler);
78      }
79  
80      @Override
81      public EventExecutor next() {
82          return this;
83      }
84  
85      @Override
86      public EventExecutorGroup parent() {
87          return this;
88      }
89  
90      @Override
91      public boolean inEventLoop() {
92          return false;
93      }
94  
95      @Override
96      public boolean inEventLoop(Thread thread) {
97          return false;
98      }
99  
100     @Override
101     public <V> Promise<V> newPromise() {
102         return new DefaultPromise<V>(this);
103     }
104 
105     @Override
106     public <V> ProgressivePromise<V> newProgressivePromise() {
107         return new DefaultProgressivePromise<V>(this);
108     }
109 
110     @Override
111     public <V> Future<V> newSucceededFuture(V result) {
112         return new SucceededFuture<V>(this, result);
113     }
114 
115     @Override
116     public <V> Future<V> newFailedFuture(Throwable cause) {
117         return new FailedFuture<V>(this, cause);
118     }
119 
120     @Override
121     public boolean isShuttingDown() {
122         return isShutdown();
123     }
124 
125     @Override
126     public List<Runnable> shutdownNow() {
127         List<Runnable> tasks = super.shutdownNow();
128         terminationFuture.trySuccess(null);
129         return tasks;
130     }
131 
132     @Override
133     public void shutdown() {
134         super.shutdown();
135         terminationFuture.trySuccess(null);
136     }
137 
138     @Override
139     public Future<?> shutdownGracefully() {
140         return shutdownGracefully(2, 15, TimeUnit.SECONDS);
141     }
142 
143     @Override
144     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
145         // TODO: At the moment this just calls shutdown but we may be able to do something more smart here which
146         //       respects the quietPeriod and timeout.
147         shutdown();
148         return terminationFuture();
149     }
150 
151     @Override
152     public Future<?> terminationFuture() {
153         return terminationFuture;
154     }
155 
156     @Override
157     public Iterator<EventExecutor> iterator() {
158         return executorSet.iterator();
159     }
160 
161     @Override
162     protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
163         return runnable instanceof NonNotifyRunnable ?
164                 task : new RunnableScheduledFutureTask<V>(this, runnable, task);
165     }
166 
167     @Override
168     protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
169         return new RunnableScheduledFutureTask<V>(this, callable, task);
170     }
171 
172     @Override
173     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
174         return (ScheduledFuture<?>) super.schedule(command, delay, unit);
175     }
176 
177     @Override
178     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
179         return (ScheduledFuture<V>) super.schedule(callable, delay, unit);
180     }
181 
182     @Override
183     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
184         return (ScheduledFuture<?>) super.scheduleAtFixedRate(command, initialDelay, period, unit);
185     }
186 
187     @Override
188     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
189         return (ScheduledFuture<?>) super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
190     }
191 
192     @Override
193     public Future<?> submit(Runnable task) {
194         return (Future<?>) super.submit(task);
195     }
196 
197     @Override
198     public <T> Future<T> submit(Runnable task, T result) {
199         return (Future<T>) super.submit(task, result);
200     }
201 
202     @Override
203     public <T> Future<T> submit(Callable<T> task) {
204         return (Future<T>) super.submit(task);
205     }
206 
207     @Override
208     public void execute(Runnable command) {
209         super.schedule(new NonNotifyRunnable(command), 0, NANOSECONDS);
210     }
211 
212     private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V>
213             implements RunnableScheduledFuture<V>, ScheduledFuture<V> {
214         private final RunnableScheduledFuture<V> future;
215 
216         RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable,
217                                            RunnableScheduledFuture<V> future) {
218             super(executor, runnable, null);
219             this.future = future;
220         }
221 
222         RunnableScheduledFutureTask(EventExecutor executor, Callable<V> callable,
223                                            RunnableScheduledFuture<V> future) {
224             super(executor, callable);
225             this.future = future;
226         }
227 
228         @Override
229         public void run() {
230             if (!isPeriodic()) {
231                 super.run();
232             } else if (!isDone()) {
233                 try {
234                     // Its a periodic task so we need to ignore the return value
235                     task.call();
236                 } catch (Throwable cause) {
237                     if (!tryFailureInternal(cause)) {
238                         logger.warn("Failure during execution of task", cause);
239                     }
240                 }
241             }
242         }
243 
244         @Override
245         public boolean isPeriodic() {
246             return future.isPeriodic();
247         }
248 
249         @Override
250         public long getDelay(TimeUnit unit) {
251             return future.getDelay(unit);
252         }
253 
254         @Override
255         public int compareTo(Delayed o) {
256             return future.compareTo(o);
257         }
258     }
259 
260     // This is a special wrapper which we will be used in execute(...) to wrap the submitted Runnable. This is needed as
261     // ScheduledThreadPoolExecutor.execute(...) will delegate to submit(...) which will then use decorateTask(...).
262     // The problem with this is that decorateTask(...) needs to ensure we only do our own decoration if we not call
263     // from execute(...) as otherwise we may end up creating an endless loop because DefaultPromise will call
264     // EventExecutor.execute(...) when notify the listeners of the promise.
265     //
266     // See https://github.com/netty/netty/issues/6507
267     private static final class NonNotifyRunnable implements Runnable {
268 
269         private final Runnable task;
270 
271         NonNotifyRunnable(Runnable task) {
272             this.task = task;
273         }
274 
275         @Override
276         public void run() {
277             task.run();
278         }
279     }
280 }