View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.UnstableApi;
21  
22  import java.util.Collection;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.concurrent.Callable;
27  import java.util.concurrent.ExecutionException;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.TimeoutException;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  /**
34   * {@link EventExecutorGroup} which will preserve {@link Runnable} execution order but makes no guarantees about what
35   * {@link EventExecutor} (and therefore {@link Thread}) will be used to execute the {@link Runnable}s.
36   *
37   * <p>The {@link EventExecutorGroup#next()} for the wrapped {@link EventExecutorGroup} must <strong>NOT</strong> return
38   * executors of type {@link OrderedEventExecutor}.
39   */
40  @UnstableApi
41  public final class NonStickyEventExecutorGroup implements EventExecutorGroup {
42      private final EventExecutorGroup group;
43      private final int maxTaskExecutePerRun;
44  
45      /**
46       * Creates a new instance. Be aware that the given {@link EventExecutorGroup} <strong>MUST NOT</strong> contain
47       * any {@link OrderedEventExecutor}s.
48       */
49      public NonStickyEventExecutorGroup(EventExecutorGroup group) {
50          this(group, 1024);
51      }
52  
53      /**
54       * Creates a new instance. Be aware that the given {@link EventExecutorGroup} <strong>MUST NOT</strong> contain
55       * any {@link OrderedEventExecutor}s.
56       */
57      public NonStickyEventExecutorGroup(EventExecutorGroup group, int maxTaskExecutePerRun) {
58          this.group = verify(group);
59          this.maxTaskExecutePerRun = ObjectUtil.checkPositive(maxTaskExecutePerRun, "maxTaskExecutePerRun");
60      }
61  
62      private static EventExecutorGroup verify(EventExecutorGroup group) {
63          Iterator<EventExecutor> executors = ObjectUtil.checkNotNull(group, "group").iterator();
64          while (executors.hasNext()) {
65              EventExecutor executor = executors.next();
66              if (executor instanceof OrderedEventExecutor) {
67                  throw new IllegalArgumentException("EventExecutorGroup " + group
68                          + " contains OrderedEventExecutors: " + executor);
69              }
70          }
71          return group;
72      }
73  
74      private NonStickyOrderedEventExecutor newExecutor(EventExecutor executor) {
75          return new NonStickyOrderedEventExecutor(executor, maxTaskExecutePerRun);
76      }
77  
78      @Override
79      public boolean isShuttingDown() {
80          return group.isShuttingDown();
81      }
82  
83      @Override
84      public Future<?> shutdownGracefully() {
85          return group.shutdownGracefully();
86      }
87  
88      @Override
89      public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
90          return group.shutdownGracefully(quietPeriod, timeout, unit);
91      }
92  
93      @Override
94      public Future<?> terminationFuture() {
95          return group.terminationFuture();
96      }
97  
98      @SuppressWarnings("deprecation")
99      @Override
100     public void shutdown() {
101         group.shutdown();
102     }
103 
104     @SuppressWarnings("deprecation")
105     @Override
106     public List<Runnable> shutdownNow() {
107         return group.shutdownNow();
108     }
109 
110     @Override
111     public EventExecutor next() {
112         return newExecutor(group.next());
113     }
114 
115     @Override
116     public Iterator<EventExecutor> iterator() {
117         final Iterator<EventExecutor> itr = group.iterator();
118         return new Iterator<EventExecutor>() {
119             @Override
120             public boolean hasNext() {
121                 return itr.hasNext();
122             }
123 
124             @Override
125             public EventExecutor next() {
126                 return newExecutor(itr.next());
127             }
128 
129             @Override
130             public void remove() {
131                 itr.remove();
132             }
133         };
134     }
135 
136     @Override
137     public Future<?> submit(Runnable task) {
138         return group.submit(task);
139     }
140 
141     @Override
142     public <T> Future<T> submit(Runnable task, T result) {
143         return group.submit(task, result);
144     }
145 
146     @Override
147     public <T> Future<T> submit(Callable<T> task) {
148         return group.submit(task);
149     }
150 
151     @Override
152     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
153         return group.schedule(command, delay, unit);
154     }
155 
156     @Override
157     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
158         return group.schedule(callable, delay, unit);
159     }
160 
161     @Override
162     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
163         return group.scheduleAtFixedRate(command, initialDelay, period, unit);
164     }
165 
166     @Override
167     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
168         return group.scheduleWithFixedDelay(command, initialDelay, delay, unit);
169     }
170 
171     @Override
172     public boolean isShutdown() {
173         return group.isShutdown();
174     }
175 
176     @Override
177     public boolean isTerminated() {
178         return group.isTerminated();
179     }
180 
181     @Override
182     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
183         return group.awaitTermination(timeout, unit);
184     }
185 
186     @Override
187     public <T> List<java.util.concurrent.Future<T>> invokeAll(
188             Collection<? extends Callable<T>> tasks) throws InterruptedException {
189         return group.invokeAll(tasks);
190     }
191 
192     @Override
193     public <T> List<java.util.concurrent.Future<T>> invokeAll(
194             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
195         return group.invokeAll(tasks, timeout, unit);
196     }
197 
198     @Override
199     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
200         return group.invokeAny(tasks);
201     }
202 
203     @Override
204     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
205             throws InterruptedException, ExecutionException, TimeoutException {
206         return group.invokeAny(tasks, timeout, unit);
207     }
208 
209     @Override
210     public void execute(Runnable command) {
211         group.execute(command);
212     }
213 
214     private static final class NonStickyOrderedEventExecutor extends AbstractEventExecutor
215             implements Runnable, OrderedEventExecutor {
216         private final EventExecutor executor;
217         private final Queue<Runnable> tasks = PlatformDependent.newMpscQueue();
218 
219         private static final int NONE = 0;
220         private static final int SUBMITTED = 1;
221         private static final int RUNNING = 2;
222 
223         private final AtomicInteger state = new AtomicInteger();
224         private final int maxTaskExecutePerRun;
225 
226         NonStickyOrderedEventExecutor(EventExecutor executor, int maxTaskExecutePerRun) {
227             super(executor);
228             this.executor = executor;
229             this.maxTaskExecutePerRun = maxTaskExecutePerRun;
230         }
231 
232         @Override
233         public void run() {
234             if (!state.compareAndSet(SUBMITTED, RUNNING)) {
235                 return;
236             }
237             for (;;) {
238                 int i = 0;
239                 try {
240                     for (; i < maxTaskExecutePerRun; i++) {
241                         Runnable task = tasks.poll();
242                         if (task == null) {
243                             break;
244                         }
245                         safeExecute(task);
246                     }
247                 } finally {
248                     if (i == maxTaskExecutePerRun) {
249                         try {
250                             state.set(SUBMITTED);
251                             executor.execute(this);
252                             return; // done
253                         } catch (Throwable ignore) {
254                             // Reset the state back to running as we will keep on executing tasks.
255                             state.set(RUNNING);
256                             // if an error happened we should just ignore it and let the loop run again as there is not
257                             // much else we can do. Most likely this was triggered by a full task queue. In this case
258                             // we just will run more tasks and try again later.
259                         }
260                     } else {
261                         state.set(NONE);
262                         return; // done
263                     }
264                 }
265             }
266         }
267 
268         @Override
269         public boolean inEventLoop(Thread thread) {
270             return false;
271         }
272 
273         @Override
274         public boolean inEventLoop() {
275             return false;
276         }
277 
278         @Override
279         public boolean isShuttingDown() {
280             return executor.isShutdown();
281         }
282 
283         @Override
284         public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
285             return executor.shutdownGracefully(quietPeriod, timeout, unit);
286         }
287 
288         @Override
289         public Future<?> terminationFuture() {
290             return executor.terminationFuture();
291         }
292 
293         @Override
294         public void shutdown() {
295             executor.shutdown();
296         }
297 
298         @Override
299         public boolean isShutdown() {
300             return executor.isShutdown();
301         }
302 
303         @Override
304         public boolean isTerminated() {
305             return executor.isTerminated();
306         }
307 
308         @Override
309         public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
310             return executor.awaitTermination(timeout, unit);
311         }
312 
313         @Override
314         public void execute(Runnable command) {
315             if (!tasks.offer(command)) {
316                 throw new RejectedExecutionException();
317             }
318             if (state.compareAndSet(NONE, SUBMITTED)) {
319                 // Actually it could happen that the runnable was picked up in between but we not care to much and just
320                 // execute ourself. At worst this will be a NOOP when run() is called.
321                 try {
322                     executor.execute(this);
323                 } catch (Throwable e) {
324                     // Not reset the state as some other Runnable may be added to the queue already in the meantime.
325                     tasks.remove(command);
326                     PlatformDependent.throwException(e);
327                 }
328             }
329         }
330     }
331 }