1 /*
2 * Copyright 2013 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.logging.InternalLogger;
19 import io.netty5.util.internal.logging.InternalLoggerFactory;
20
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.Executors;
23
24 import static java.util.Objects.requireNonNull;
25
26 /**
27 * Abstract base class for {@link EventExecutor} implementations.
28 */
29 public abstract class AbstractEventExecutor implements EventExecutor {
30 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);
31 static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
32 static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
33
34 private final Future<?> successfulVoidFuture = DefaultPromise.newSuccessfulPromise(this, null).asFuture();
35
36 @Override
37 public <V> Future<V> newSucceededFuture(V result) {
38 if (result == null) {
39 @SuppressWarnings("unchecked")
40 Future<V> f = (Future<V>) successfulVoidFuture;
41 return f;
42 }
43 return EventExecutor.super.newSucceededFuture(result);
44 }
45
46 @Override
47 public final Future<Void> submit(Runnable task) {
48 var futureTask = newTaskFor(task, (Void) null);
49 execute(futureTask);
50 return futureTask;
51 }
52
53 @Override
54 public final <T> Future<T> submit(Runnable task, T result) {
55 var futureTask = newTaskFor(task, result);
56 execute(futureTask);
57 return futureTask;
58 }
59
60 @Override
61 public final <T> Future<T> submit(Callable<T> task) {
62 var futureTask = newTaskFor(task);
63 execute(futureTask);
64 return futureTask;
65 }
66
67 /**
68 * Decorate the given {@link Runnable} and its return value, as a {@link RunnableFuture}, such that the
69 * returned {@link RunnableFuture} completes with the given result at the end of executing its
70 * {@link RunnableFuture#run()} method.
71 * <p>
72 * The returned {@link RunnableFuture} is the task that will actually be run by a thread in this
73 * executor.
74 * <p>
75 * This method can be overridden by sub-classes to hook into the life cycle of the given task.
76 *
77 * @param runnable The task to be decorated.
78 * @param value The value that the returned future will complete with, assuming the given {@link Runnable} doesn't
79 * throw an exception.
80 * @param <T> The type of the result value.
81 * @return The decorated {@link Runnable} that is now also a {@link Future}.
82 */
83 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
84 return newRunnableFuture(newPromise(), runnable, value);
85 }
86
87 /**
88 * Decorate the given {@link Callable} and its return value, as a {@link RunnableFuture}, such that the
89 * returned {@link RunnableFuture} completes with the returned result from the {@link Callable} at the end of
90 * executing its {@link RunnableFuture#run()} method.
91 * <p>
92 * The returned {@link RunnableFuture} is the task that will actually be run by a thread in this
93 * executor.
94 * <p>
95 * This method can be overridden by sub-classes to hook into the life cycle of the given task.
96 *
97 * @param callable The task to be decorated.
98 * @param <T> The type of the result value.
99 * @return The decorated {@link Runnable} that is now also a {@link Future}.
100 */
101 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
102 return newRunnableFuture(newPromise(), callable);
103 }
104
105 /**
106 * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
107 */
108 static void safeExecute(Runnable task) {
109 try {
110 task.run();
111 } catch (Throwable t) {
112 logger.warn("A task raised an exception. Task: {}", task, t);
113 }
114 }
115
116 /**
117 * Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Callable}.
118 *
119 * This can be used if you want to override {@link #newTaskFor(Callable)} and return a different
120 * {@link RunnableFuture}.
121 */
122 private static <V> RunnableFuture<V> newRunnableFuture(Promise<V> promise, Callable<V> task) {
123 return new RunnableFutureAdapter<>(promise, requireNonNull(task, "task"));
124 }
125
126 /**
127 * Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Runnable} and
128 * {@code value}.
129 */
130 private static <V> RunnableFuture<V> newRunnableFuture(Promise<V> promise, Runnable task, V value) {
131 return new RunnableFutureAdapter<>(promise, Executors.callable(requireNonNull(task, "task"), value));
132 }
133 }