View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.logging.InternalLogger;
19  import io.netty5.util.internal.logging.InternalLoggerFactory;
20  
21  import java.util.ArrayDeque;
22  import java.util.Queue;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.TimeUnit;
25  
26  import static java.util.Objects.requireNonNull;
27  
28  /**
29   * Executes {@link Runnable} objects in the caller's thread. If the {@link #execute(Runnable)} is reentrant it will be
30   * queued until the original {@link Runnable} finishes execution.
31   * <p>
32   * All {@link Throwable} objects thrown from {@link #execute(Runnable)} will be swallowed and logged. This is to ensure
33   * that all queued {@link Runnable} objects have the chance to be run.
34   */
35  public final class ImmediateEventExecutor extends AbstractEventExecutor {
36      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ImmediateEventExecutor.class);
37      public static final ImmediateEventExecutor INSTANCE = new ImmediateEventExecutor();
38      /**
39       * A Runnable will be queued if we are executing a Runnable. This is to prevent a {@link StackOverflowError}.
40       */
41      private static final FastThreadLocal<Queue<Runnable>> DELAYED_RUNNABLES = new FastThreadLocal<>() {
42          @Override
43          protected Queue<Runnable> initialValue() throws Exception {
44              return new ArrayDeque<>();
45          }
46      };
47      /**
48       * Set to {@code true} if we are executing a runnable.
49       */
50      private static final FastThreadLocal<Boolean> RUNNING = new FastThreadLocal<>() {
51          @Override
52          protected Boolean initialValue() throws Exception {
53              return false;
54          }
55      };
56  
57      private final Future<Void> terminationFuture = DefaultPromise.<Void>newFailedPromise(
58              GlobalEventExecutor.INSTANCE, new UnsupportedOperationException()).asFuture();
59  
60      private ImmediateEventExecutor() { }
61  
62      @Override
63      public boolean inEventLoop(Thread thread) {
64          return true;
65      }
66  
67      @Override
68      public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
69          return terminationFuture();
70      }
71  
72      @Override
73      public Future<Void> terminationFuture() {
74          return terminationFuture;
75      }
76  
77      @Override
78      public boolean isShuttingDown() {
79          return false;
80      }
81  
82      @Override
83      public boolean isShutdown() {
84          return false;
85      }
86  
87      @Override
88      public boolean isTerminated() {
89          return false;
90      }
91  
92      @Override
93      public boolean awaitTermination(long timeout, TimeUnit unit) {
94          return false;
95      }
96  
97      @Override
98      public void execute(Runnable task) {
99          requireNonNull(task, "command");
100         if (!RUNNING.get()) {
101             RUNNING.set(true);
102             try {
103                 task.run();
104             } catch (Throwable cause) {
105                 logger.info("Throwable caught while executing Runnable {}", task, cause);
106             } finally {
107                 Queue<Runnable> delayedRunnables = DELAYED_RUNNABLES.get();
108                 Runnable runnable;
109                 while ((runnable = delayedRunnables.poll()) != null) {
110                     try {
111                         runnable.run();
112                     } catch (Throwable cause) {
113                         logger.info("Throwable caught while executing Runnable {}", runnable, cause);
114                     }
115                 }
116                 RUNNING.set(false);
117             }
118         } else {
119             DELAYED_RUNNABLES.get().add(task);
120         }
121     }
122 
123     @Override
124     public <V> Promise<V> newPromise() {
125         return new ImmediatePromise<>(this);
126     }
127 
128     @Override
129     public Future<Void> schedule(Runnable task, long delay,
130                                  TimeUnit unit) {
131         throw new UnsupportedOperationException();
132     }
133 
134     @Override
135     public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
136         throw new UnsupportedOperationException();
137     }
138 
139     @Override
140     public Future<Void> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
141         throw new UnsupportedOperationException();
142     }
143 
144     @Override
145     public Future<Void> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
146         throw new UnsupportedOperationException();
147     }
148 
149     static class ImmediatePromise<V> extends DefaultPromise<V> {
150         ImmediatePromise(EventExecutor executor) {
151             super(executor);
152         }
153 
154         @Override
155         protected void checkDeadLock() {
156             // No check
157         }
158     }
159 }