View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.PlatformDependent;
19  import io.netty5.util.internal.UnstableApi;
20  
21  import java.util.Iterator;
22  import java.util.Queue;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.RejectedExecutionException;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import static io.netty5.util.internal.ObjectUtil.checkPositive;
29  import static java.util.Objects.requireNonNull;
30  
31  /**
32   * {@link EventExecutorGroup} which will preserve {@link Runnable} execution order but makes no guarantees about what
33   * {@link EventExecutor} (and therefore {@link Thread}) will be used to execute the {@link Runnable}s.
34   *
35   * <p>The {@link EventExecutorGroup#next()} for the wrapped {@link EventExecutorGroup} must <strong>NOT</strong> return
36   * executors of type {@link OrderedEventExecutor}.
37   */
38  @UnstableApi
39  public final class NonStickyEventExecutorGroup implements EventExecutorGroup {
40      private final EventExecutorGroup group;
41      private final int maxTaskExecutePerRun;
42  
43      /**
44       * Creates a new instance. Be aware that the given {@link EventExecutorGroup} <strong>MUST NOT</strong> contain
45       * any {@link OrderedEventExecutor}s.
46       */
47      public NonStickyEventExecutorGroup(EventExecutorGroup group) {
48          this(group, 1024);
49      }
50  
51      /**
52       * Creates a new instance. Be aware that the given {@link EventExecutorGroup} <strong>MUST NOT</strong> contain
53       * any {@link OrderedEventExecutor}s.
54       */
55      public NonStickyEventExecutorGroup(EventExecutorGroup group, int maxTaskExecutePerRun) {
56          this.group = verify(group);
57          this.maxTaskExecutePerRun = checkPositive(maxTaskExecutePerRun, "maxTaskExecutePerRun");
58      }
59  
60      private static EventExecutorGroup verify(EventExecutorGroup group) {
61          Iterator<EventExecutor> executors = requireNonNull(group, "group").iterator();
62          while (executors.hasNext()) {
63              EventExecutor executor = executors.next();
64              if (executor instanceof OrderedEventExecutor) {
65                  throw new IllegalArgumentException("EventExecutorGroup " + group
66                          + " contains OrderedEventExecutors: " + executor);
67              }
68          }
69          return group;
70      }
71  
72      private NonStickyOrderedEventExecutor newExecutor(EventExecutor executor) {
73          return new NonStickyOrderedEventExecutor(executor, maxTaskExecutePerRun);
74      }
75  
76      @Override
77      public boolean isShuttingDown() {
78          return group.isShuttingDown();
79      }
80  
81      @Override
82      public Future<Void> shutdownGracefully() {
83          return group.shutdownGracefully();
84      }
85  
86      @Override
87      public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
88          return group.shutdownGracefully(quietPeriod, timeout, unit);
89      }
90  
91      @Override
92      public Future<Void> terminationFuture() {
93          return group.terminationFuture();
94      }
95  
96      @Override
97      public EventExecutor next() {
98          return newExecutor(group.next());
99      }
100 
101     @Override
102     public Iterator<EventExecutor> iterator() {
103         final Iterator<EventExecutor> itr = group.iterator();
104         return new Iterator<>() {
105             @Override
106             public boolean hasNext() {
107                 return itr.hasNext();
108             }
109 
110             @Override
111             public EventExecutor next() {
112                 return newExecutor(itr.next());
113             }
114 
115             @Override
116             public void remove() {
117                 itr.remove();
118             }
119         };
120     }
121 
122     @Override
123     public Future<Void> submit(Runnable task) {
124         return group.submit(task);
125     }
126 
127     @Override
128     public <T> Future<T> submit(Runnable task, T result) {
129         return group.submit(task, result);
130     }
131 
132     @Override
133     public <T> Future<T> submit(Callable<T> task) {
134         return group.submit(task);
135     }
136 
137     @Override
138     public Future<Void> schedule(Runnable task, long delay, TimeUnit unit) {
139         return group.schedule(task, delay, unit);
140     }
141 
142     @Override
143     public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
144         return group.schedule(task, delay, unit);
145     }
146 
147     @Override
148     public Future<Void> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
149         return group.scheduleAtFixedRate(task, initialDelay, period, unit);
150     }
151 
152     @Override
153     public Future<Void> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
154         return group.scheduleWithFixedDelay(task, initialDelay, delay, unit);
155     }
156 
157     @Override
158     public boolean isShutdown() {
159         return group.isShutdown();
160     }
161 
162     @Override
163     public boolean isTerminated() {
164         return group.isTerminated();
165     }
166 
167     @Override
168     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
169         return group.awaitTermination(timeout, unit);
170     }
171 
172     @Override
173     public void execute(Runnable task) {
174         group.execute(task);
175     }
176 
177     private static final class NonStickyOrderedEventExecutor extends AbstractEventExecutor
178             implements Runnable, OrderedEventExecutor {
179         private final EventExecutor executor;
180         private final Queue<Runnable> tasks = PlatformDependent.newMpscQueue();
181 
182         private static final int NONE = 0;
183         private static final int SUBMITTED = 1;
184         private static final int RUNNING = 2;
185 
186         private final AtomicInteger state = new AtomicInteger();
187         private final int maxTaskExecutePerRun;
188 
189         NonStickyOrderedEventExecutor(EventExecutor executor, int maxTaskExecutePerRun) {
190             this.executor = executor;
191             this.maxTaskExecutePerRun = maxTaskExecutePerRun;
192         }
193 
194         @Override
195         public void run() {
196             if (!state.compareAndSet(SUBMITTED, RUNNING)) {
197                 return;
198             }
199             for (;;) {
200                 int i = 0;
201                 try {
202                     for (; i < maxTaskExecutePerRun; i++) {
203                         Runnable task = tasks.poll();
204                         if (task == null) {
205                             break;
206                         }
207                         safeExecute(task);
208                     }
209                 } finally {
210                     if (i == maxTaskExecutePerRun) {
211                         try {
212                             state.set(SUBMITTED);
213                             executor.execute(this);
214                             return; // done
215                         } catch (Throwable ignore) {
216                             // Reset the state back to running as we will keep on executing tasks.
217                             state.set(RUNNING);
218                             // if an error happened we should just ignore it and let the loop run again as there is not
219                             // much else we can do. Most likely this was triggered by a full task queue. In this case
220                             // we just will run more tasks and try again later.
221                         }
222                     } else {
223                         state.set(NONE);
224                         // After setting the state to NONE, look at the tasks queue one more time.
225                         // If it is empty, then we can return from this method.
226                         // Otherwise, it means the producer thread has called execute(Runnable)
227                         // and enqueued a task in between the tasks.poll() above and the state.set(NONE) here.
228                         // There are two possible scenarios when this happen
229                         //
230                         // 1. The producer thread sees state == NONE, hence the compareAndSet(NONE, SUBMITTED)
231                         //    is successfully setting the state to SUBMITTED. This mean the producer
232                         //    will call / has called executor.execute(this). In this case, we can just return.
233                         // 2. The producer thread don't see the state change, hence the compareAndSet(NONE, SUBMITTED)
234                         //    returns false. In this case, the producer thread won't call executor.execute.
235                         //    In this case, we need to change the state to RUNNING and keeps running.
236                         //
237                         // The above cases can be distinguished by performing a
238                         // compareAndSet(NONE, RUNNING). If it returns "false", it is case 1; otherwise it is case 2.
239                         if (tasks.isEmpty() || !state.compareAndSet(NONE, RUNNING)) {
240                             return; // done
241                         }
242                     }
243                 }
244             }
245         }
246 
247         @Override
248         public boolean inEventLoop(Thread thread) {
249             return false;
250         }
251 
252         @Override
253         public boolean isShuttingDown() {
254             return executor.isShutdown();
255         }
256 
257         @Override
258         public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
259             return executor.shutdownGracefully(quietPeriod, timeout, unit);
260         }
261 
262         @Override
263         public Future<Void> terminationFuture() {
264             return executor.terminationFuture();
265         }
266 
267         @Override
268         public boolean isShutdown() {
269             return executor.isShutdown();
270         }
271 
272         @Override
273         public boolean isTerminated() {
274             return executor.isTerminated();
275         }
276 
277         @Override
278         public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
279             return executor.awaitTermination(timeout, unit);
280         }
281 
282         @Override
283         public void execute(Runnable task) {
284             if (!tasks.offer(task)) {
285                 throw new RejectedExecutionException();
286             }
287             if (state.compareAndSet(NONE, SUBMITTED)) {
288                 // Actually it could happen that the runnable was picked up in between but we not care to much and just
289                 // execute ourself. At worst this will be a NOOP when run() is called.
290                 executor.execute(this);
291             }
292         }
293 
294         @Override
295         public Future<Void> schedule(Runnable task, long delay,
296                                      TimeUnit unit) {
297             throw new UnsupportedOperationException();
298         }
299 
300         @Override
301         public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
302             throw new UnsupportedOperationException();
303         }
304 
305         @Override
306         public Future<Void> scheduleAtFixedRate(
307                 Runnable task, long initialDelay, long period, TimeUnit unit) {
308             throw new UnsupportedOperationException();
309         }
310 
311         @Override
312         public Future<Void> scheduleWithFixedDelay(
313                 Runnable task, long initialDelay, long delay, TimeUnit unit) {
314             throw new UnsupportedOperationException();
315         }
316     }
317 }