View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.PlatformDependent;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.ArrayList;
23  import java.util.LinkedHashSet;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.Set;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.RejectedExecutionException;
32  import java.util.concurrent.Semaphore;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
35  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
36  
/**
 * Abstract base class for {@link EventExecutor}s that execute all of their submitted tasks in a single thread.
 */
41  public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor {
42  
43      private static final InternalLogger logger =
44              InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
45  
46      private static final int ST_NOT_STARTED = 1;
47      private static final int ST_STARTED = 2;
48      private static final int ST_SHUTTING_DOWN = 3;
49      private static final int ST_SHUTDOWN = 4;
50      private static final int ST_TERMINATED = 5;
51  
52      private static final Runnable WAKEUP_TASK = new Runnable() {
53          @Override
54          public void run() {
55              // Do nothing.
56          }
57      };
58  
59      private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
60      private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, Thread> THREAD_UPDATER;
61  
62      static {
63          AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater =
64                  PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");
65          if (updater == null) {
66              updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
67          }
68          STATE_UPDATER = updater;
69  
70          AtomicReferenceFieldUpdater<SingleThreadEventExecutor, Thread> refUpdater =
71                  PlatformDependent.newAtomicReferenceFieldUpdater(SingleThreadEventExecutor.class, "thread");
72          if (refUpdater == null) {
73              refUpdater = AtomicReferenceFieldUpdater.newUpdater(
74                      SingleThreadEventExecutor.class, Thread.class, "thread");
75          }
76          THREAD_UPDATER = refUpdater;
77      }
78  
79      private final Queue<Runnable> taskQueue;
80  
81      @SuppressWarnings({ "FieldMayBeFinal", "unused" })
82      private volatile Thread thread;
83      private final Executor executor;
84      private final Semaphore threadLock = new Semaphore(0);
85      private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
86      private final boolean addTaskWakesUp;
87  
88      private long lastExecutionTime;
89  
90      @SuppressWarnings({ "FieldMayBeFinal", "unused" })
91      private volatile int state = ST_NOT_STARTED;
92  
93      private volatile long gracefulShutdownQuietPeriod;
94      private volatile long gracefulShutdownTimeout;
95      private long gracefulShutdownStartTime;
96  
97      private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
98  
99      private boolean firstRun = true;
100 
101     private final Runnable asRunnable = new Runnable() {
102         @Override
103         public void run() {
104             updateThread(Thread.currentThread());
105 
106             // lastExecutionTime must be set on the first run
107             // in order for shutdown to work correctly for the
108             // rare case that the eventloop did not execute
109             // a single task during its lifetime.
110             if (firstRun) {
111                 firstRun = false;
112                 updateLastExecutionTime();
113             }
114 
115             try {
116                 SingleThreadEventExecutor.this.run();
117             } catch (Throwable t) {
118                 logger.warn("Unexpected exception from an event executor: ", t);
119                 cleanupAndTerminate(false);
120             }
121         }
122     };
123 
124     /**
125      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it.
126      * @param executor          the {@link Executor} which will be used for executing.
127      * @param addTaskWakesUp   {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up
128      *                         the executor thread.
129      */
130     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
131         super(parent);
132 
133         if (executor == null) {
134             throw new NullPointerException("executor");
135         }
136 
137         this.addTaskWakesUp = addTaskWakesUp;
138         this.executor = executor;
139         taskQueue = newTaskQueue();
140     }
141 
142     /**
143      * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
144      * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
145      * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
146      * implementation that does not support blocking operations at all.
147      */
148     protected Queue<Runnable> newTaskQueue() {
149         return new LinkedBlockingQueue<Runnable>();
150     }
151 
152     /**
153      * @see {@link Queue#poll()}
154      */
155     protected Runnable pollTask() {
156         assert inEventLoop();
157         for (;;) {
158             Runnable task = taskQueue.poll();
159             if (task == WAKEUP_TASK) {
160                 continue;
161             }
162             return task;
163         }
164     }
165 
166     /**
167      * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
168      * <p>
169      * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
170      * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
171      * </p>
172      *
173      * @return {@code null} if the executor thread has been interrupted or waken up.
174      */
175     protected Runnable takeTask() {
176         assert inEventLoop();
177         if (!(taskQueue instanceof BlockingQueue)) {
178             throw new UnsupportedOperationException();
179         }
180 
181         BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
182         for (;;) {
183             ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
184             if (scheduledTask == null) {
185                 Runnable task = null;
186                 try {
187                     task = taskQueue.take();
188                     if (task == WAKEUP_TASK) {
189                         task = null;
190                     }
191                 } catch (InterruptedException e) {
192                     // Ignore
193                 }
194                 return task;
195             } else {
196                 long delayNanos = scheduledTask.delayNanos();
197                 Runnable task = null;
198                 if (delayNanos > 0) {
199                     try {
200                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
201                     } catch (InterruptedException e) {
202                         // Waken up.
203                         return null;
204                     }
205                 }
206                 if (task == null) {
207                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
208                     // scheduled tasks are never executed if there is always one task in the taskQueue.
209                     // This is for example true for the read task of OIO Transport
210                     // See https://github.com/netty/netty/issues/1614
211                     fetchFromScheduledTaskQueue();
212                     task = taskQueue.poll();
213                 }
214 
215                 if (task != null) {
216                     return task;
217                 }
218             }
219         }
220     }
221 
222     private void fetchFromScheduledTaskQueue() {
223         if (hasScheduledTasks()) {
224             long nanoTime = AbstractScheduledEventExecutor.nanoTime();
225             for (;;) {
226                 Runnable scheduledTask = pollScheduledTask(nanoTime);
227                 if (scheduledTask == null) {
228                     break;
229                 }
230                 taskQueue.add(scheduledTask);
231             }
232         }
233     }
234 
235     /**
236      * @see {@link Queue#peek()}
237      */
238     protected Runnable peekTask() {
239         assert inEventLoop();
240         return taskQueue.peek();
241     }
242 
243     /**
244      * @see {@link Queue#isEmpty()}
245      */
246     protected boolean hasTasks() {
247         assert inEventLoop();
248         return !taskQueue.isEmpty();
249     }
250 
251     /**
252      * Return the number of tasks that are pending for processing.
253      *
254      * <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
255      * SingleThreadEventExecutor. So use it was care!</strong>
256      */
257     public final int pendingTasks() {
258         return taskQueue.size();
259     }
260 
261     /**
262      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
263      * before.
264      */
265     protected void addTask(Runnable task) {
266         if (task == null) {
267             throw new NullPointerException("task");
268         }
269         if (isShutdown()) {
270             reject();
271         }
272         taskQueue.add(task);
273     }
274 
275     /**
276      * @see {@link Queue#remove(Object)}
277      */
278     protected boolean removeTask(Runnable task) {
279         if (task == null) {
280             throw new NullPointerException("task");
281         }
282         return taskQueue.remove(task);
283     }
284 
285     /**
286      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
287      *
288      * @return {@code true} if and only if at least one task was run
289      */
290     protected boolean runAllTasks() {
291         fetchFromScheduledTaskQueue();
292         Runnable task = pollTask();
293         if (task == null) {
294             return false;
295         }
296 
297         for (;;) {
298             try {
299                 task.run();
300             } catch (Throwable t) {
301                 logger.warn("A task raised an exception.", t);
302             }
303 
304             task = pollTask();
305             if (task == null) {
306                 lastExecutionTime = ScheduledFutureTask.nanoTime();
307                 return true;
308             }
309         }
310     }
311 
312     /**
313      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
314      * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
315      */
316     protected boolean runAllTasks(long timeoutNanos) {
317         fetchFromScheduledTaskQueue();
318         Runnable task = pollTask();
319         if (task == null) {
320             return false;
321         }
322 
323         final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
324         long runTasks = 0;
325         long lastExecutionTime;
326         for (;;) {
327             try {
328                 task.run();
329             } catch (Throwable t) {
330                 logger.warn("A task raised an exception.", t);
331             }
332 
333             runTasks ++;
334 
335             // Check timeout every 64 tasks because nanoTime() is relatively expensive.
336             // XXX: Hard-coded value - will make it configurable if it is really a problem.
337             if ((runTasks & 0x3F) == 0) {
338                 lastExecutionTime = ScheduledFutureTask.nanoTime();
339                 if (lastExecutionTime >= deadline) {
340                     break;
341                 }
342             }
343 
344             task = pollTask();
345             if (task == null) {
346                 lastExecutionTime = ScheduledFutureTask.nanoTime();
347                 break;
348             }
349         }
350 
351         this.lastExecutionTime = lastExecutionTime;
352         return true;
353     }
354 
355     /**
356      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
357      */
358     protected long delayNanos(long currentTimeNanos) {
359         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
360         if (scheduledTask == null) {
361             return SCHEDULE_PURGE_INTERVAL;
362         }
363 
364         return scheduledTask.delayNanos(currentTimeNanos);
365     }
366 
367     /**
368      * Updates the internal timestamp that tells when a submitted task was executed most recently.
369      * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
370      * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
371      * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
372      * checks.
373      */
374     protected void updateLastExecutionTime() {
375         lastExecutionTime = ScheduledFutureTask.nanoTime();
376     }
377 
378     /**
379      *
380      */
381     protected abstract void run();
382 
383     /**
384      * Do nothing, sub-classes may override
385      */
386     protected void cleanup() {
387         // NOOP
388     }
389 
390     protected void wakeup(boolean inEventLoop) {
391         if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {
392             taskQueue.add(WAKEUP_TASK);
393         }
394     }
395 
396     @Override
397     public boolean inEventLoop(Thread thread) {
398         return thread == this.thread;
399     }
400 
401     /**
402      * Add a {@link Runnable} which will be executed on shutdown of this instance
403      */
404     public void addShutdownHook(final Runnable task) {
405         if (inEventLoop()) {
406             shutdownHooks.add(task);
407         } else {
408             execute(new Runnable() {
409                 @Override
410                 public void run() {
411                     shutdownHooks.add(task);
412                 }
413             });
414         }
415     }
416 
417     /**
418      * Remove a previous added {@link Runnable} as a shutdown hook
419      */
420     public void removeShutdownHook(final Runnable task) {
421         if (inEventLoop()) {
422             shutdownHooks.remove(task);
423         } else {
424             execute(new Runnable() {
425                 @Override
426                 public void run() {
427                     shutdownHooks.remove(task);
428                 }
429             });
430         }
431     }
432 
433     private boolean runShutdownHooks() {
434         boolean ran = false;
435         // Note shutdown hooks can add / remove shutdown hooks.
436         while (!shutdownHooks.isEmpty()) {
437             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
438             shutdownHooks.clear();
439             for (Runnable task: copy) {
440                 try {
441                     task.run();
442                 } catch (Throwable t) {
443                     logger.warn("Shutdown hook raised an exception.", t);
444                 } finally {
445                     ran = true;
446                 }
447             }
448         }
449 
450         if (ran) {
451             lastExecutionTime = ScheduledFutureTask.nanoTime();
452         }
453 
454         return ran;
455     }
456 
457     @Override
458     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
459         if (quietPeriod < 0) {
460             throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
461         }
462         if (timeout < quietPeriod) {
463             throw new IllegalArgumentException(
464                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
465         }
466         if (unit == null) {
467             throw new NullPointerException("unit");
468         }
469 
470         if (isShuttingDown()) {
471             return terminationFuture();
472         }
473 
474         boolean inEventLoop = inEventLoop();
475         boolean wakeup;
476         int oldState;
477         for (;;) {
478             if (isShuttingDown()) {
479                 return terminationFuture();
480             }
481             int newState;
482             wakeup = true;
483             oldState = STATE_UPDATER.get(this);
484             if (inEventLoop) {
485                 newState = ST_SHUTTING_DOWN;
486             } else {
487                 switch (oldState) {
488                     case ST_NOT_STARTED:
489                     case ST_STARTED:
490                         newState = ST_SHUTTING_DOWN;
491                         break;
492                     default:
493                         newState = oldState;
494                         wakeup = false;
495                 }
496             }
497             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
498                 break;
499             }
500         }
501         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
502         gracefulShutdownTimeout = unit.toNanos(timeout);
503 
504         if (oldState == ST_NOT_STARTED) {
505             scheduleExecution();
506         }
507 
508         if (wakeup) {
509             wakeup(inEventLoop);
510         }
511 
512         return terminationFuture();
513     }
514 
515     @Override
516     public Future<?> terminationFuture() {
517         return terminationFuture;
518     }
519 
520     @Override
521     @Deprecated
522     public void shutdown() {
523         if (isShutdown()) {
524             return;
525         }
526 
527         boolean inEventLoop = inEventLoop();
528         boolean wakeup;
529         int oldState;
530         for (;;) {
531             if (isShuttingDown()) {
532                 return;
533             }
534             int newState;
535             wakeup = true;
536             oldState = STATE_UPDATER.get(this);
537             if (inEventLoop) {
538                 newState = ST_SHUTDOWN;
539             } else {
540                 switch (oldState) {
541                     case ST_NOT_STARTED:
542                     case ST_STARTED:
543                     case ST_SHUTTING_DOWN:
544                         newState = ST_SHUTDOWN;
545                         break;
546                     default:
547                         newState = oldState;
548                         wakeup = false;
549                 }
550             }
551             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
552                 break;
553             }
554         }
555 
556         if (oldState == ST_NOT_STARTED) {
557             scheduleExecution();
558         }
559 
560         if (wakeup) {
561             wakeup(inEventLoop);
562         }
563     }
564 
565     @Override
566     public boolean isShuttingDown() {
567         return STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN;
568     }
569 
570     @Override
571     public boolean isShutdown() {
572         return STATE_UPDATER.get(this) >= ST_SHUTDOWN;
573     }
574 
575     @Override
576     public boolean isTerminated() {
577         return STATE_UPDATER.get(this) == ST_TERMINATED;
578     }
579 
580     /**
581      * Confirm that the shutdown if the instance should be done now!
582      */
583     protected boolean confirmShutdown() {
584         if (!isShuttingDown()) {
585             return false;
586         }
587 
588         if (!inEventLoop()) {
589             throw new IllegalStateException("must be invoked from an event loop");
590         }
591 
592         cancelScheduledTasks();
593 
594         if (gracefulShutdownStartTime == 0) {
595             gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
596         }
597 
598         if (runAllTasks() || runShutdownHooks()) {
599             if (isShutdown()) {
600                 // Executor shut down - no new tasks anymore.
601                 return true;
602             }
603 
604             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
605             wakeup(true);
606             return false;
607         }
608 
609         final long nanoTime = ScheduledFutureTask.nanoTime();
610 
611         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
612             return true;
613         }
614 
615         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
616             // Check if any tasks were added to the queue every 100ms.
617             // TODO: Change the behavior of takeTask() so that it returns on timeout.
618             wakeup(true);
619             try {
620                 Thread.sleep(100);
621             } catch (InterruptedException e) {
622                 // Ignore
623             }
624 
625             return false;
626         }
627 
628         // No tasks were added for last quiet period - hopefully safe to shut down.
629         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
630         return true;
631     }
632 
633     @Override
634     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
635         if (unit == null) {
636             throw new NullPointerException("unit");
637         }
638 
639         if (inEventLoop()) {
640             throw new IllegalStateException("cannot await termination of the current thread");
641         }
642 
643         if (threadLock.tryAcquire(timeout, unit)) {
644             threadLock.release();
645         }
646 
647         return isTerminated();
648     }
649 
650     @Override
651     public void execute(Runnable task) {
652         if (task == null) {
653             throw new NullPointerException("task");
654         }
655 
656         boolean inEventLoop = inEventLoop();
657         if (inEventLoop) {
658             addTask(task);
659         } else {
660             startExecution();
661             addTask(task);
662             if (isShutdown() && removeTask(task)) {
663                 reject();
664             }
665         }
666 
667         if (!addTaskWakesUp && wakesUpForTask(task)) {
668             wakeup(inEventLoop);
669         }
670     }
671 
672     @SuppressWarnings("unused")
673     protected boolean wakesUpForTask(Runnable task) {
674         return true;
675     }
676 
677     protected static void reject() {
678         throw new RejectedExecutionException("event executor terminated");
679     }
680 
681     // ScheduledExecutorService implementation
682 
683     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
684 
685     protected void cleanupAndTerminate(boolean success) {
686         for (;;) {
687             int oldState = STATE_UPDATER.get(this);
688             if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
689                     this, oldState, ST_SHUTTING_DOWN)) {
690                 break;
691             }
692         }
693 
694         // Check if confirmShutdown() was called at the end of the loop.
695         if (success && gracefulShutdownStartTime == 0) {
696             logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
697                     SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
698                     "before run() implementation terminates.");
699         }
700 
701         try {
702             // Run all remaining tasks and shutdown hooks.
703             for (;;) {
704                 if (confirmShutdown()) {
705                     break;
706                 }
707             }
708         } finally {
709             try {
710                 cleanup();
711             } finally {
712                 STATE_UPDATER.set(this, ST_TERMINATED);
713                 threadLock.release();
714                 if (!taskQueue.isEmpty()) {
715                     logger.warn(
716                             "An event executor terminated with " +
717                                     "non-empty task queue (" + taskQueue.size() + ')');
718                 }
719 
720                 firstRun = true;
721                 terminationFuture.setSuccess(null);
722             }
723         }
724     }
725 
726     private void startExecution() {
727         if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
728             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
729                 schedule(new ScheduledFutureTask<Void>(
730                         this, Executors.<Void>callable(new PurgeTask(), null),
731                         ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
732                 scheduleExecution();
733             }
734         }
735     }
736 
737     protected final void scheduleExecution() {
738         updateThread(null);
739         executor.execute(asRunnable);
740     }
741 
742     private void updateThread(Thread t) {
743         THREAD_UPDATER.lazySet(this, t);
744     }
745 
746     private final class PurgeTask implements Runnable {
747         @Override
748         public void run() {
749             purgeCancelledScheduledTasks();
750         }
751     }
752 }