View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.UnstableApi;
21  
22  import java.util.Collection;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.concurrent.Callable;
27  import java.util.concurrent.ExecutionException;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.TimeoutException;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  /**
35   * {@link EventExecutorGroup} which will preserve {@link Runnable} execution order but makes no guarantees about what
36   * {@link EventExecutor} (and therefore {@link Thread}) will be used to execute the {@link Runnable}s.
37   *
38   * <p>The {@link EventExecutorGroup#next()} for the wrapped {@link EventExecutorGroup} must <strong>NOT</strong> return
39   * executors of type {@link OrderedEventExecutor}.
40   */
41  @UnstableApi
42  public final class NonStickyEventExecutorGroup implements EventExecutorGroup {
43      private final EventExecutorGroup group;
44      private final int maxTaskExecutePerRun;
45  
46      /**
47       * Creates a new instance. Be aware that the given {@link EventExecutorGroup} <strong>MUST NOT</strong> contain
48       * any {@link OrderedEventExecutor}s.
49       */
50      public NonStickyEventExecutorGroup(EventExecutorGroup group) {
51          this(group, 1024);
52      }
53  
54      /**
55       * Creates a new instance. Be aware that the given {@link EventExecutorGroup} <strong>MUST NOT</strong> contain
56       * any {@link OrderedEventExecutor}s.
57       */
58      public NonStickyEventExecutorGroup(EventExecutorGroup group, int maxTaskExecutePerRun) {
59          this.group = verify(group);
60          this.maxTaskExecutePerRun = ObjectUtil.checkPositive(maxTaskExecutePerRun, "maxTaskExecutePerRun");
61      }
62  
63      private static EventExecutorGroup verify(EventExecutorGroup group) {
64          Iterator<EventExecutor> executors = ObjectUtil.checkNotNull(group, "group").iterator();
65          while (executors.hasNext()) {
66              EventExecutor executor = executors.next();
67              if (executor instanceof OrderedEventExecutor) {
68                  throw new IllegalArgumentException("EventExecutorGroup " + group
69                          + " contains OrderedEventExecutors: " + executor);
70              }
71          }
72          return group;
73      }
74  
75      private NonStickyOrderedEventExecutor newExecutor(EventExecutor executor) {
76          return new NonStickyOrderedEventExecutor(executor, maxTaskExecutePerRun);
77      }
78  
79      @Override
80      public boolean isShuttingDown() {
81          return group.isShuttingDown();
82      }
83  
84      @Override
85      public Future<?> shutdownGracefully() {
86          return group.shutdownGracefully();
87      }
88  
89      @Override
90      public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
91          return group.shutdownGracefully(quietPeriod, timeout, unit);
92      }
93  
94      @Override
95      public Future<?> terminationFuture() {
96          return group.terminationFuture();
97      }
98  
99      @SuppressWarnings("deprecation")
100     @Override
101     public void shutdown() {
102         group.shutdown();
103     }
104 
105     @SuppressWarnings("deprecation")
106     @Override
107     public List<Runnable> shutdownNow() {
108         return group.shutdownNow();
109     }
110 
111     @Override
112     public EventExecutor next() {
113         return newExecutor(group.next());
114     }
115 
116     @Override
117     public Iterator<EventExecutor> iterator() {
118         final Iterator<EventExecutor> itr = group.iterator();
119         return new Iterator<EventExecutor>() {
120             @Override
121             public boolean hasNext() {
122                 return itr.hasNext();
123             }
124 
125             @Override
126             public EventExecutor next() {
127                 return newExecutor(itr.next());
128             }
129 
130             @Override
131             public void remove() {
132                 itr.remove();
133             }
134         };
135     }
136 
137     @Override
138     public Future<?> submit(Runnable task) {
139         return group.submit(task);
140     }
141 
142     @Override
143     public <T> Future<T> submit(Runnable task, T result) {
144         return group.submit(task, result);
145     }
146 
147     @Override
148     public <T> Future<T> submit(Callable<T> task) {
149         return group.submit(task);
150     }
151 
152     @Override
153     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
154         return group.schedule(command, delay, unit);
155     }
156 
157     @Override
158     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
159         return group.schedule(callable, delay, unit);
160     }
161 
162     @Override
163     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
164         return group.scheduleAtFixedRate(command, initialDelay, period, unit);
165     }
166 
167     @Override
168     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
169         return group.scheduleWithFixedDelay(command, initialDelay, delay, unit);
170     }
171 
172     @Override
173     public boolean isShutdown() {
174         return group.isShutdown();
175     }
176 
177     @Override
178     public boolean isTerminated() {
179         return group.isTerminated();
180     }
181 
182     @Override
183     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
184         return group.awaitTermination(timeout, unit);
185     }
186 
187     @Override
188     public <T> List<java.util.concurrent.Future<T>> invokeAll(
189             Collection<? extends Callable<T>> tasks) throws InterruptedException {
190         return group.invokeAll(tasks);
191     }
192 
193     @Override
194     public <T> List<java.util.concurrent.Future<T>> invokeAll(
195             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
196         return group.invokeAll(tasks, timeout, unit);
197     }
198 
199     @Override
200     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
201         return group.invokeAny(tasks);
202     }
203 
204     @Override
205     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
206             throws InterruptedException, ExecutionException, TimeoutException {
207         return group.invokeAny(tasks, timeout, unit);
208     }
209 
210     @Override
211     public void execute(Runnable command) {
212         group.execute(command);
213     }
214 
215     private static final class NonStickyOrderedEventExecutor extends AbstractEventExecutor
216             implements Runnable, OrderedEventExecutor {
217         private final EventExecutor executor;
218         private final Queue<Runnable> tasks = PlatformDependent.newMpscQueue();
219 
220         private static final int NONE = 0;
221         private static final int SUBMITTED = 1;
222         private static final int RUNNING = 2;
223 
224         private final AtomicInteger state = new AtomicInteger();
225         private final int maxTaskExecutePerRun;
226 
227         private final AtomicReference<Thread> executingThread = new AtomicReference<Thread>();
228 
229         NonStickyOrderedEventExecutor(EventExecutor executor, int maxTaskExecutePerRun) {
230             super(executor);
231             this.executor = executor;
232             this.maxTaskExecutePerRun = maxTaskExecutePerRun;
233         }
234 
235         @Override
236         public void run() {
237             if (!state.compareAndSet(SUBMITTED, RUNNING)) {
238                 return;
239             }
240             Thread current = Thread.currentThread();
241             executingThread.set(current);
242             for (;;) {
243                 int i = 0;
244                 try {
245                     for (; i < maxTaskExecutePerRun; i++) {
246                         Runnable task = tasks.poll();
247                         if (task == null) {
248                             break;
249                         }
250                         safeExecute(task);
251                     }
252                 } finally {
253                     if (i == maxTaskExecutePerRun) {
254                         try {
255                             state.set(SUBMITTED);
256                             // Only set executingThread to null if no other thread did update it yet.
257                             executingThread.compareAndSet(current, null);
258                             executor.execute(this);
259                             return; // done
260                         } catch (Throwable ignore) {
261                             // Reset the state back to running as we will keep on executing tasks.
262                             state.set(RUNNING);
263                             // if an error happened we should just ignore it and let the loop run again as there is not
264                             // much else we can do. Most likely this was triggered by a full task queue. In this case
265                             // we just will run more tasks and try again later.
266                         }
267                     } else {
268                         state.set(NONE);
269                         // After setting the state to NONE, look at the tasks queue one more time.
270                         // If it is empty, then we can return from this method.
271                         // Otherwise, it means the producer thread has called execute(Runnable)
272                         // and enqueued a task in between the tasks.poll() above and the state.set(NONE) here.
273                         // There are two possible scenarios when this happens
274                         //
275                         // 1. The producer thread sees state == NONE, hence the compareAndSet(NONE, SUBMITTED)
276                         //    is successfully setting the state to SUBMITTED. This mean the producer
277                         //    will call / has called executor.execute(this). In this case, we can just return.
278                         // 2. The producer thread don't see the state change, hence the compareAndSet(NONE, SUBMITTED)
279                         //    returns false. In this case, the producer thread won't call executor.execute.
280                         //    In this case, we need to change the state to RUNNING and keeps running.
281                         //
282                         // The above cases can be distinguished by performing a
283                         // compareAndSet(NONE, RUNNING). If it returns "false", it is case 1; otherwise it is case 2.
284                         if (tasks.isEmpty() || !state.compareAndSet(NONE, RUNNING)) {
285                             // Only set executingThread to null if no other thread did update it yet.
286                             executingThread.compareAndSet(current, null);
287                             return; // done
288                         }
289                     }
290                 }
291             }
292         }
293 
294         @Override
295         public boolean inEventLoop(Thread thread) {
296             return executingThread.get() == thread;
297         }
298 
299         @Override
300         public boolean isShuttingDown() {
301             return executor.isShutdown();
302         }
303 
304         @Override
305         public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
306             return executor.shutdownGracefully(quietPeriod, timeout, unit);
307         }
308 
309         @Override
310         public Future<?> terminationFuture() {
311             return executor.terminationFuture();
312         }
313 
314         @Override
315         public void shutdown() {
316             executor.shutdown();
317         }
318 
319         @Override
320         public boolean isShutdown() {
321             return executor.isShutdown();
322         }
323 
324         @Override
325         public boolean isTerminated() {
326             return executor.isTerminated();
327         }
328 
329         @Override
330         public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
331             return executor.awaitTermination(timeout, unit);
332         }
333 
334         @Override
335         public void execute(Runnable command) {
336             if (!tasks.offer(command)) {
337                 throw new RejectedExecutionException();
338             }
339             if (state.compareAndSet(NONE, SUBMITTED)) {
340                 // Actually it could happen that the runnable was picked up in between but we not care to much and just
341                 // execute ourself. At worst this will be a NOOP when run() is called.
342                 executor.execute(this);
343             }
344         }
345     }
346 }