View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.logging.InternalLogger;
19  import io.netty5.util.internal.logging.InternalLoggerFactory;
20  
21  import java.util.concurrent.BlockingQueue;
22  import java.util.concurrent.Callable;
23  import java.util.concurrent.Delayed;
24  import java.util.concurrent.RejectedExecutionHandler;
25  import java.util.concurrent.RunnableScheduledFuture;
26  import java.util.concurrent.ScheduledThreadPoolExecutor;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.TimeUnit;
29  
30  import static java.util.concurrent.TimeUnit.NANOSECONDS;
31  
32  /**
33   * {@link EventExecutor} implementation which makes no guarantees about the ordering of task execution that
34   * are submitted because there may be multiple threads executing these tasks.
35   * This implementation is most useful for protocols that do not need strict ordering.
36   *
37   * <strong>Because it provides no ordering care should be taken when using it!</strong>
38   */
39  @SuppressWarnings("unchecked")
40  public final class UnorderedThreadPoolEventExecutor implements EventExecutor {
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(
42              UnorderedThreadPoolEventExecutor.class);
43  
44      private final Promise<Void> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
45      private final InnerScheduledThreadPoolExecutor executor;
46  
47      /**
48       * Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int, ThreadFactory)}
49       * using {@link DefaultThreadFactory}.
50       */
51      public UnorderedThreadPoolEventExecutor(int corePoolSize) {
52          DefaultThreadFactory threadFactory = new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class);
53          executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory);
54      }
55  
56      /**
57       * See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory)}
58       */
59      public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) {
60          executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory);
61      }
62  
63      /**
64       * Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int,
65       * ThreadFactory, java.util.concurrent.RejectedExecutionHandler)} using {@link DefaultThreadFactory}.
66       */
67      public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) {
68          DefaultThreadFactory threadFactory = new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class);
69          executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory, handler);
70      }
71  
72      /**
73       * See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory, RejectedExecutionHandler)}
74       */
75      public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory,
76                                              RejectedExecutionHandler handler) {
77          executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory, handler);
78      }
79  
80      @Override
81      public boolean inEventLoop(Thread thread) {
82          return false;
83      }
84  
85      @Override
86      public boolean isShuttingDown() {
87          return isShutdown();
88      }
89  
90      @Override
91      public boolean isShutdown() {
92          return executor.isShutdown();
93      }
94  
95      @Override
96      public boolean isTerminated() {
97          return executor.isTerminated();
98      }
99  
100     @Override
101     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
102         return executor.awaitTermination(timeout, unit);
103     }
104 
105     @Override
106     public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
107         // TODO: At the moment this just calls shutdown but we may be able to do something more smart here which
108         //       respects the quietPeriod and timeout.
109         executor.shutdown();
110         return terminationFuture();
111     }
112 
113     @Override
114     public Future<Void> terminationFuture() {
115         return terminationFuture.asFuture();
116     }
117 
118     @Override
119     public Future<Void> schedule(Runnable task, long delay, TimeUnit unit) {
120         return (Future<Void>) executor.schedule(task, delay, unit);
121     }
122 
123     @Override
124     public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
125         return (Future<V>) executor.schedule(task, delay, unit);
126     }
127 
128     @Override
129     public Future<Void> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
130         return (Future<Void>) executor.scheduleAtFixedRate(task, initialDelay, period, unit);
131     }
132 
133     @Override
134     public Future<Void> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
135         return (Future<Void>) executor.scheduleWithFixedDelay(task, initialDelay, delay, unit);
136     }
137 
138     @Override
139     public Future<Void> submit(Runnable task) {
140         return (Future<Void>) executor.submit(task);
141     }
142 
143     @Override
144     public <T> Future<T> submit(Runnable task, T result) {
145         return (Future<T>) executor.submit(task, result);
146     }
147 
148     @Override
149     public <T> Future<T> submit(Callable<T> task) {
150         return (Future<T>) executor.submit(task);
151     }
152 
153     @Override
154     public void execute(Runnable task) {
155         executor.schedule(new NonNotifyRunnable(task), 0, NANOSECONDS);
156     }
157 
158     /**
159      * Return the task queue of the underlying {@link java.util.concurrent.Executor} instance.
160      * <p>
161      * Visible for testing.
162      *
163      * @return The task queue of this executor.
164      */
165     BlockingQueue<Runnable> getQueue() {
166         return executor.getQueue();
167     }
168 
169     /**
170      * Note: this class has a natural ordering that is inconsistent with equals.
171      */
172     private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V>
173             implements RunnableScheduledFuture<V> {
174         private final RunnableScheduledFuture<V> future;
175 
176         RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, RunnableScheduledFuture<V> future) {
177             super(executor, runnable, null);
178             this.future = future;
179         }
180 
181         RunnableScheduledFutureTask(EventExecutor executor, Callable<V> callable, RunnableScheduledFuture<V> future) {
182             super(executor, callable);
183             this.future = future;
184         }
185 
186         @Override
187         public void run() {
188             if (!isPeriodic()) {
189                 super.run();
190             } else if (!isDone()) {
191                 try {
192                     // Its a periodic task so we need to ignore the return value
193                     future.run();
194                 } catch (Throwable cause) {
195                     if (!tryFailureInternal(cause)) {
196                         logger.warn("Failure during execution of task", cause);
197                     }
198                 }
199             }
200         }
201 
202         @Override
203         public boolean isPeriodic() {
204             return future.isPeriodic();
205         }
206 
207         @Override
208         public long getDelay(TimeUnit unit) {
209             return future.getDelay(unit);
210         }
211 
212         @Override
213         public int compareTo(Delayed o) {
214             return future.compareTo(o);
215         }
216     }
217 
218     // This is a special wrapper which we will be used in execute(...) to wrap the submitted Runnable. This is needed as
219     // ScheduledThreadPoolExecutor.execute(...) will delegate to submit(...) which will then use decorateTask(...).
220     // The problem with this is that decorateTask(...) needs to ensure we only do our own decoration if we not call
221     // from execute(...) as otherwise we may end up creating an endless loop because DefaultPromise will call
222     // EventExecutor.execute(...) when notify the listeners of the promise.
223     //
224     // See https://github.com/netty/netty/issues/6507
225     private static final class NonNotifyRunnable implements Runnable {
226 
227         private final Runnable task;
228 
229         NonNotifyRunnable(Runnable task) {
230             this.task = task;
231         }
232 
233         @Override
234         public void run() {
235             task.run();
236         }
237     }
238 
239     private static final class InnerScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
240         private final EventExecutor eventExecutor;
241 
242         InnerScheduledThreadPoolExecutor(EventExecutor eventExecutor, int corePoolSize, ThreadFactory threadFactory) {
243             super(corePoolSize, threadFactory);
244             this.eventExecutor = eventExecutor;
245         }
246 
247         InnerScheduledThreadPoolExecutor(EventExecutor eventExecutor, int corePoolSize, ThreadFactory threadFactory,
248                                                 RejectedExecutionHandler handler) {
249             super(corePoolSize, threadFactory, handler);
250             this.eventExecutor = eventExecutor;
251         }
252 
253         @Override
254         protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
255             return runnable instanceof NonNotifyRunnable ?
256                     task : new RunnableScheduledFutureTask<>(eventExecutor, runnable, task);
257         }
258 
259         @Override
260         protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
261             return new RunnableScheduledFutureTask<>(eventExecutor, callable, task);
262         }
263     }
264 }