/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.logging.InternalLogger;
19  import io.netty.util.internal.logging.InternalLoggerFactory;
20  
21  import java.util.Collections;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.Delayed;
27  import java.util.concurrent.ExecutionException;
28  import java.util.concurrent.RejectedExecutionHandler;
29  import java.util.concurrent.RunnableScheduledFuture;
30  import java.util.concurrent.ScheduledThreadPoolExecutor;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.TimeUnit;
33  
34  import static java.util.concurrent.TimeUnit.NANOSECONDS;
35  
36  /**
37   * {@link EventExecutor} implementation which makes no guarantees about the ordering of task execution that
38   * are submitted because there may be multiple threads executing these tasks.
39   * This implementation is most useful for protocols that do not need strict ordering.
40   *
41   * <strong>Because it provides no ordering care should be taken when using it!</strong>
42   */
43  public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolExecutor implements EventExecutor {
44      private static final InternalLogger logger = InternalLoggerFactory.getInstance(
45              UnorderedThreadPoolEventExecutor.class);
46  
47      private final Promise<?> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
48      private final Set<EventExecutor> executorSet = Collections.singleton((EventExecutor) this);
49  
50      /**
51       * Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int, ThreadFactory)}
52       * using {@link DefaultThreadFactory}.
53       */
54      public UnorderedThreadPoolEventExecutor(int corePoolSize) {
55          this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class));
56      }
57  
58      /**
59       * See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory)}
60       */
61      public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) {
62          super(corePoolSize, threadFactory);
63      }
64  
65      /**
66       * Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int,
67       * ThreadFactory, java.util.concurrent.RejectedExecutionHandler)} using {@link DefaultThreadFactory}.
68       */
69      public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) {
70          this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class), handler);
71      }
72  
73      /**
74       * See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory, RejectedExecutionHandler)}
75       */
76      public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory,
77                                              RejectedExecutionHandler handler) {
78          super(corePoolSize, threadFactory, handler);
79      }
80  
81      @Override
82      public EventExecutor next() {
83          return this;
84      }
85  
86      @Override
87      public EventExecutorGroup parent() {
88          return this;
89      }
90  
91      @Override
92      public boolean inEventLoop() {
93          return false;
94      }
95  
96      @Override
97      public boolean inEventLoop(Thread thread) {
98          return false;
99      }
100 
101     @Override
102     public <V> Promise<V> newPromise() {
103         return new DefaultPromise<V>(this);
104     }
105 
106     @Override
107     public <V> ProgressivePromise<V> newProgressivePromise() {
108         return new DefaultProgressivePromise<V>(this);
109     }
110 
111     @Override
112     public <V> Future<V> newSucceededFuture(V result) {
113         return new SucceededFuture<V>(this, result);
114     }
115 
116     @Override
117     public <V> Future<V> newFailedFuture(Throwable cause) {
118         return new FailedFuture<V>(this, cause);
119     }
120 
121     @Override
122     public boolean isShuttingDown() {
123         return isShutdown();
124     }
125 
126     @Override
127     public List<Runnable> shutdownNow() {
128         List<Runnable> tasks = super.shutdownNow();
129         terminationFuture.trySuccess(null);
130         return tasks;
131     }
132 
133     @Override
134     public void shutdown() {
135         super.shutdown();
136         terminationFuture.trySuccess(null);
137     }
138 
139     @Override
140     public Future<?> shutdownGracefully() {
141         return shutdownGracefully(2, 15, TimeUnit.SECONDS);
142     }
143 
144     @Override
145     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
146         // TODO: At the moment this just calls shutdown but we may be able to do something more smart here which
147         //       respects the quietPeriod and timeout.
148         shutdown();
149         return terminationFuture();
150     }
151 
152     @Override
153     public Future<?> terminationFuture() {
154         return terminationFuture;
155     }
156 
157     @Override
158     public Iterator<EventExecutor> iterator() {
159         return executorSet.iterator();
160     }
161 
162     @Override
163     protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
164         return runnable instanceof NonNotifyRunnable ?
165                 task : new RunnableScheduledFutureTask<V>(this, task, false);
166     }
167 
168     @Override
169     protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
170         return new RunnableScheduledFutureTask<V>(this, task, true);
171     }
172 
173     @Override
174     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
175         return (ScheduledFuture<?>) super.schedule(command, delay, unit);
176     }
177 
178     @Override
179     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
180         return (ScheduledFuture<V>) super.schedule(callable, delay, unit);
181     }
182 
183     @Override
184     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
185         return (ScheduledFuture<?>) super.scheduleAtFixedRate(command, initialDelay, period, unit);
186     }
187 
188     @Override
189     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
190         return (ScheduledFuture<?>) super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
191     }
192 
193     @Override
194     public Future<?> submit(Runnable task) {
195         return (Future<?>) super.submit(task);
196     }
197 
198     @Override
199     public <T> Future<T> submit(Runnable task, T result) {
200         return (Future<T>) super.submit(task, result);
201     }
202 
203     @Override
204     public <T> Future<T> submit(Callable<T> task) {
205         return (Future<T>) super.submit(task);
206     }
207 
208     @Override
209     public void execute(Runnable command) {
210         super.schedule(new NonNotifyRunnable(command), 0, NANOSECONDS);
211     }
212 
213     private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V>
214             implements RunnableScheduledFuture<V>, ScheduledFuture<V> {
215         private final RunnableScheduledFuture<V> future;
216         private final boolean wasCallable;
217 
218         RunnableScheduledFutureTask(EventExecutor executor, RunnableScheduledFuture<V> future, boolean wasCallable) {
219             super(executor, future);
220             this.future = future;
221             this.wasCallable = wasCallable;
222         }
223 
224         @Override
225         V runTask() throws Throwable {
226             V result =  super.runTask();
227             if (result == null && wasCallable) {
228                 // If this RunnableScheduledFutureTask wraps a RunnableScheduledFuture that wraps a Callable we need
229                 // to ensure that we return the correct result by calling future.get().
230                 //
231                 // See https://github.com/netty/netty/issues/11072
232                 assert future.isDone();
233                 try {
234                     return future.get();
235                 } catch (ExecutionException e) {
236                     // unwrap exception.
237                     throw e.getCause();
238                 }
239             }
240             return result;
241         }
242 
243         @Override
244         public void run() {
245             if (!isPeriodic()) {
246                 super.run();
247             } else if (!isDone()) {
248                 try {
249                     // Its a periodic task so we need to ignore the return value
250                     runTask();
251                 } catch (Throwable cause) {
252                     if (!tryFailureInternal(cause)) {
253                         logger.warn("Failure during execution of task", cause);
254                     }
255                 }
256             }
257         }
258 
259         @Override
260         public boolean isPeriodic() {
261             return future.isPeriodic();
262         }
263 
264         @Override
265         public long getDelay(TimeUnit unit) {
266             return future.getDelay(unit);
267         }
268 
269         @Override
270         public int compareTo(Delayed o) {
271             return future.compareTo(o);
272         }
273     }
274 
275     // This is a special wrapper which we will be used in execute(...) to wrap the submitted Runnable. This is needed as
276     // ScheduledThreadPoolExecutor.execute(...) will delegate to submit(...) which will then use decorateTask(...).
277     // The problem with this is that decorateTask(...) needs to ensure we only do our own decoration if we not call
278     // from execute(...) as otherwise we may end up creating an endless loop because DefaultPromise will call
279     // EventExecutor.execute(...) when notify the listeners of the promise.
280     //
281     // See https://github.com/netty/netty/issues/6507
282     private static final class NonNotifyRunnable implements Runnable {
283 
284         private final Runnable task;
285 
286         NonNotifyRunnable(Runnable task) {
287             this.task = task;
288         }
289 
290         @Override
291         public void run() {
292             task.run();
293         }
294     }
295 }