1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.RejectedExecutionHandler;
19 import io.netty.util.concurrent.RejectedExecutionHandlers;
20 import io.netty.util.concurrent.SingleThreadEventExecutor;
21 import io.netty.util.internal.ObjectUtil;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27 import java.util.Queue;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.ThreadFactory;
30
31
32
33
34 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
35
36 protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
37 SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
38
39 private final Queue<Runnable> tailTasks;
40
41 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
42 this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
43 }
44
45 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
46 boolean addTaskWakesUp, boolean supportSuspension) {
47 this(parent, threadFactory, addTaskWakesUp, supportSuspension, DEFAULT_MAX_PENDING_TASKS,
48 RejectedExecutionHandlers.reject());
49 }
50
51 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
52 this(parent, executor, addTaskWakesUp, false);
53 }
54
55 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
56 boolean addTaskWakesUp, boolean supportSuspension) {
57 this(parent, executor, addTaskWakesUp, supportSuspension, DEFAULT_MAX_PENDING_TASKS,
58 RejectedExecutionHandlers.reject());
59 }
60
61 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
62 boolean addTaskWakesUp, int maxPendingTasks,
63 RejectedExecutionHandler rejectedExecutionHandler) {
64 this(parent, threadFactory, addTaskWakesUp, false, maxPendingTasks, rejectedExecutionHandler);
65 }
66
67 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
68 boolean addTaskWakesUp, boolean supportSuspension, int maxPendingTasks,
69 RejectedExecutionHandler rejectedExecutionHandler) {
70 super(parent, threadFactory, addTaskWakesUp, supportSuspension, maxPendingTasks, rejectedExecutionHandler);
71 tailTasks = newTaskQueue(maxPendingTasks);
72 }
73
74 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
75 boolean addTaskWakesUp, int maxPendingTasks,
76 RejectedExecutionHandler rejectedExecutionHandler) {
77 this(parent, executor, addTaskWakesUp, false, maxPendingTasks, rejectedExecutionHandler);
78 }
79
80 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
81 boolean addTaskWakesUp, boolean supportSuspension, int maxPendingTasks,
82 RejectedExecutionHandler rejectedExecutionHandler) {
83 super(parent, executor, addTaskWakesUp, supportSuspension, maxPendingTasks, rejectedExecutionHandler);
84 tailTasks = newTaskQueue(maxPendingTasks);
85 }
86
87 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
88 boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
89 RejectedExecutionHandler rejectedExecutionHandler) {
90 this(parent, executor, addTaskWakesUp, false, taskQueue, tailTaskQueue, rejectedExecutionHandler);
91 }
92
93 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
94 boolean addTaskWakesUp, boolean supportSuspension,
95 Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
96 RejectedExecutionHandler rejectedExecutionHandler) {
97 super(parent, executor, addTaskWakesUp, supportSuspension, taskQueue, rejectedExecutionHandler);
98 tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
99 }
100
101 @Override
102 public EventLoopGroup parent() {
103 return (EventLoopGroup) super.parent();
104 }
105
106 @Override
107 public EventLoop next() {
108 return (EventLoop) super.next();
109 }
110
111 @Override
112 public ChannelFuture register(Channel channel) {
113 return register(new DefaultChannelPromise(channel, this));
114 }
115
116 @Override
117 public ChannelFuture register(final ChannelPromise promise) {
118 ObjectUtil.checkNotNull(promise, "promise");
119 promise.channel().unsafe().register(this, promise);
120 return promise;
121 }
122
123 @Deprecated
124 @Override
125 public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
126 ObjectUtil.checkNotNull(promise, "promise");
127 ObjectUtil.checkNotNull(channel, "channel");
128 channel.unsafe().register(this, promise);
129 return promise;
130 }
131
132
133
134
135
136
137 public final void executeAfterEventLoopIteration(Runnable task) {
138 ObjectUtil.checkNotNull(task, "task");
139 if (isShutdown()) {
140 reject();
141 }
142
143 if (!tailTasks.offer(task)) {
144 reject(task);
145 }
146
147 if (wakesUpForTask(task)) {
148 wakeup(inEventLoop());
149 }
150 }
151
152
153
154
155
156
157
158
159 final boolean removeAfterEventLoopIterationTask(Runnable task) {
160 return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
161 }
162
163 @Override
164 protected void afterRunningAllTasks() {
165 runAllTasksFrom(tailTasks);
166 }
167
168 @Override
169 protected boolean hasTasks() {
170 return super.hasTasks() || !tailTasks.isEmpty();
171 }
172
173 @Override
174 public int pendingTasks() {
175 return super.pendingTasks() + tailTasks.size();
176 }
177
178
179
180
181
182
183 @UnstableApi
184 public int registeredChannels() {
185 return -1;
186 }
187
188
189
190
191
192
193
194
195 @UnstableApi
196 public Iterator<Channel> registeredChannelsIterator() {
197 throw new UnsupportedOperationException("registeredChannelsIterator");
198 }
199
200 protected static final class ChannelsReadOnlyIterator<T extends Channel> implements Iterator<Channel> {
201 private final Iterator<T> channelIterator;
202
203 public ChannelsReadOnlyIterator(Iterable<T> channelIterable) {
204 this.channelIterator =
205 ObjectUtil.checkNotNull(channelIterable, "channelIterable").iterator();
206 }
207
208 @Override
209 public boolean hasNext() {
210 return channelIterator.hasNext();
211 }
212
213 @Override
214 public Channel next() {
215 return channelIterator.next();
216 }
217
218 @Override
219 public void remove() {
220 throw new UnsupportedOperationException("remove");
221 }
222
223 @SuppressWarnings("unchecked")
224 public static <T> Iterator<T> empty() {
225 return (Iterator<T>) EMPTY;
226 }
227
228 private static final Iterator<Object> EMPTY = new Iterator<Object>() {
229 @Override
230 public boolean hasNext() {
231 return false;
232 }
233
234 @Override
235 public Object next() {
236 throw new NoSuchElementException();
237 }
238
239 @Override
240 public void remove() {
241 throw new UnsupportedOperationException("remove");
242 }
243 };
244 }
245 }