View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.logging.InternalLogger;
19  import io.netty.util.internal.logging.InternalLoggerFactory;
20  
21  import java.util.ArrayDeque;
22  import java.util.Queue;
23  import java.util.concurrent.TimeUnit;
24  
25  /**
26   * Executes {@link Runnable} objects in the caller's thread. If the {@link #execute(Runnable)} is reentrant it will be
27   * queued until the original {@link Runnable} finishes execution.
28   * <p>
29   * All {@link Throwable} objects thrown from {@link #execute(Runnable)} will be swallowed and logged. This is to ensure
30   * that all queued {@link Runnable} objects have the chance to be run.
31   */
32  public final class ImmediateEventExecutor extends AbstractEventExecutor {
33      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ImmediateEventExecutor.class);
34      public static final ImmediateEventExecutor INSTANCE = new ImmediateEventExecutor();
35      /**
36       * A Runnable will be queued if we are executing a Runnable. This is to prevent a {@link StackOverflowError}.
37       */
38      private static final FastThreadLocal<Queue<Runnable>> DELAYED_RUNNABLES = new FastThreadLocal<Queue<Runnable>>() {
39          @Override
40          protected Queue<Runnable> initialValue() throws Exception {
41              return new ArrayDeque<Runnable>();
42          }
43      };
44      /**
45       * Set to {@code true} if we are executing a runnable.
46       */
47      private static final FastThreadLocal<Boolean> RUNNING = new FastThreadLocal<Boolean>() {
48          @Override
49          protected Boolean initialValue() throws Exception {
50              return false;
51          }
52      };
53  
54      private final Future<?> terminationFuture = new FailedFuture<Object>(
55              GlobalEventExecutor.INSTANCE, new UnsupportedOperationException());
56  
57      private ImmediateEventExecutor() { }
58  
59      @Override
60      public EventExecutorGroup parent() {
61          return null;
62      }
63  
64      @Override
65      public boolean inEventLoop() {
66          return true;
67      }
68  
69      @Override
70      public boolean inEventLoop(Thread thread) {
71          return true;
72      }
73  
74      @Override
75      public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
76          return terminationFuture();
77      }
78  
79      @Override
80      public Future<?> terminationFuture() {
81          return terminationFuture;
82      }
83  
84      @Override
85      @Deprecated
86      public void shutdown() { }
87  
88      @Override
89      public boolean isShuttingDown() {
90          return false;
91      }
92  
93      @Override
94      public boolean isShutdown() {
95          return false;
96      }
97  
98      @Override
99      public boolean isTerminated() {
100         return false;
101     }
102 
103     @Override
104     public boolean awaitTermination(long timeout, TimeUnit unit) {
105         return false;
106     }
107 
108     @Override
109     public void execute(Runnable command) {
110         if (command == null) {
111             throw new NullPointerException("command");
112         }
113         if (!RUNNING.get()) {
114             RUNNING.set(true);
115             try {
116                 command.run();
117             } catch (Throwable cause) {
118                 logger.info("Throwable caught while executing Runnable {}", command, cause);
119             } finally {
120                 Queue<Runnable> delayedRunnables = DELAYED_RUNNABLES.get();
121                 Runnable runnable;
122                 while ((runnable = delayedRunnables.poll()) != null) {
123                     try {
124                         runnable.run();
125                     } catch (Throwable cause) {
126                         logger.info("Throwable caught while executing Runnable {}", runnable, cause);
127                     }
128                 }
129                 RUNNING.set(false);
130             }
131         } else {
132             DELAYED_RUNNABLES.get().add(command);
133         }
134     }
135 
136     @Override
137     public <V> Promise<V> newPromise() {
138         return new ImmediatePromise<V>(this);
139     }
140 
141     @Override
142     public <V> ProgressivePromise<V> newProgressivePromise() {
143         return new ImmediateProgressivePromise<V>(this);
144     }
145 
146     static class ImmediatePromise<V> extends DefaultPromise<V> {
147         ImmediatePromise(EventExecutor executor) {
148             super(executor);
149         }
150 
151         @Override
152         protected void checkDeadLock() {
153             // No check
154         }
155     }
156 
157     static class ImmediateProgressivePromise<V> extends DefaultProgressivePromise<V> {
158         ImmediateProgressivePromise(EventExecutor executor) {
159             super(executor);
160         }
161 
162         @Override
163         protected void checkDeadLock() {
164             // No check
165         }
166     }
167 }