1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.RejectedExecutionHandler;
19 import io.netty.util.concurrent.RejectedExecutionHandlers;
20 import io.netty.util.concurrent.SingleThreadEventExecutor;
21 import io.netty.util.internal.ObjectUtil;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24
25 import java.util.Queue;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ThreadFactory;
28
29
30
31
32
33 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
34
35 protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
36 SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
37
38 private final Queue<Runnable> tailTasks;
39
40 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
41 this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
42 }
43
44 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
45 this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
46 }
47
48 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
49 boolean addTaskWakesUp, int maxPendingTasks,
50 RejectedExecutionHandler rejectedExecutionHandler) {
51 super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
52 tailTasks = newTaskQueue(maxPendingTasks);
53 }
54
55 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
56 boolean addTaskWakesUp, int maxPendingTasks,
57 RejectedExecutionHandler rejectedExecutionHandler) {
58 super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
59 tailTasks = newTaskQueue(maxPendingTasks);
60 }
61
62 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
63 boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
64 RejectedExecutionHandler rejectedExecutionHandler) {
65 super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
66 tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
67 }
68
69 @Override
70 public EventLoopGroup parent() {
71 return (EventLoopGroup) super.parent();
72 }
73
74 @Override
75 public EventLoop next() {
76 return (EventLoop) super.next();
77 }
78
79 @Override
80 public ChannelFuture register(Channel channel) {
81 return register(new DefaultChannelPromise(channel, this));
82 }
83
84 @Override
85 public ChannelFuture register(final ChannelPromise promise) {
86 ObjectUtil.checkNotNull(promise, "promise");
87 promise.channel().unsafe().register(this, promise);
88 return promise;
89 }
90
91 @Deprecated
92 @Override
93 public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
94 ObjectUtil.checkNotNull(promise, "promise");
95 ObjectUtil.checkNotNull(channel, "channel");
96 channel.unsafe().register(this, promise);
97 return promise;
98 }
99
100
101
102
103
104
105 @UnstableApi
106 public final void executeAfterEventLoopIteration(Runnable task) {
107 ObjectUtil.checkNotNull(task, "task");
108 if (isShutdown()) {
109 reject();
110 }
111
112 if (!tailTasks.offer(task)) {
113 reject(task);
114 }
115
116 if (!(task instanceof LazyRunnable) && wakesUpForTask(task)) {
117 wakeup(inEventLoop());
118 }
119 }
120
121
122
123
124
125
126
127
128 @UnstableApi
129 final boolean removeAfterEventLoopIterationTask(Runnable task) {
130 return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
131 }
132
133 @Override
134 protected void afterRunningAllTasks() {
135 runAllTasksFrom(tailTasks);
136 }
137
138 @Override
139 protected boolean hasTasks() {
140 return super.hasTasks() || !tailTasks.isEmpty();
141 }
142
143 @Override
144 public int pendingTasks() {
145 return super.pendingTasks() + tailTasks.size();
146 }
147
148
149
150
151
152
153 @UnstableApi
154 public int registeredChannels() {
155 return -1;
156 }
157 }