1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.RejectedExecutionHandler;
19 import io.netty.util.concurrent.RejectedExecutionHandlers;
20 import io.netty.util.concurrent.SingleThreadEventExecutor;
21 import io.netty.util.internal.ObjectUtil;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27 import java.util.Queue;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.ThreadFactory;
30
31
32
33
34
35 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
36
37 protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
38 SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
39
40 private final Queue<Runnable> tailTasks;
41
42 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
43 this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
44 }
45
46 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
47 this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
48 }
49
50 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
51 boolean addTaskWakesUp, int maxPendingTasks,
52 RejectedExecutionHandler rejectedExecutionHandler) {
53 super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
54 tailTasks = newTaskQueue(maxPendingTasks);
55 }
56
57 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
58 boolean addTaskWakesUp, int maxPendingTasks,
59 RejectedExecutionHandler rejectedExecutionHandler) {
60 super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
61 tailTasks = newTaskQueue(maxPendingTasks);
62 }
63
64 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
65 boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
66 RejectedExecutionHandler rejectedExecutionHandler) {
67 super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
68 tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
69 }
70
71 @Override
72 public EventLoopGroup parent() {
73 return (EventLoopGroup) super.parent();
74 }
75
76 @Override
77 public EventLoop next() {
78 return (EventLoop) super.next();
79 }
80
81 @Override
82 public ChannelFuture register(Channel channel) {
83 return register(new DefaultChannelPromise(channel, this));
84 }
85
86 @Override
87 public ChannelFuture register(final ChannelPromise promise) {
88 ObjectUtil.checkNotNull(promise, "promise");
89 promise.channel().unsafe().register(this, promise);
90 return promise;
91 }
92
93 @Deprecated
94 @Override
95 public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
96 ObjectUtil.checkNotNull(promise, "promise");
97 ObjectUtil.checkNotNull(channel, "channel");
98 channel.unsafe().register(this, promise);
99 return promise;
100 }
101
102
103
104
105
106
107 public final void executeAfterEventLoopIteration(Runnable task) {
108 ObjectUtil.checkNotNull(task, "task");
109 if (isShutdown()) {
110 reject();
111 }
112
113 if (!tailTasks.offer(task)) {
114 reject(task);
115 }
116
117 if (wakesUpForTask(task)) {
118 wakeup(inEventLoop());
119 }
120 }
121
122
123
124
125
126
127
128
129 final boolean removeAfterEventLoopIterationTask(Runnable task) {
130 return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
131 }
132
133 @Override
134 protected void afterRunningAllTasks() {
135 runAllTasksFrom(tailTasks);
136 }
137
138 @Override
139 protected boolean hasTasks() {
140 return super.hasTasks() || !tailTasks.isEmpty();
141 }
142
143 @Override
144 public int pendingTasks() {
145 return super.pendingTasks() + tailTasks.size();
146 }
147
148
149
150
151
152
153 @UnstableApi
154 public int registeredChannels() {
155 return -1;
156 }
157
158
159
160
161
162
163
164
165 @UnstableApi
166 public Iterator<Channel> registeredChannelsIterator() {
167 throw new UnsupportedOperationException("registeredChannelsIterator");
168 }
169
170 protected static final class ChannelsReadOnlyIterator<T extends Channel> implements Iterator<Channel> {
171 private final Iterator<T> channelIterator;
172
173 public ChannelsReadOnlyIterator(Iterable<T> channelIterable) {
174 this.channelIterator =
175 ObjectUtil.checkNotNull(channelIterable, "channelIterable").iterator();
176 }
177
178 @Override
179 public boolean hasNext() {
180 return channelIterator.hasNext();
181 }
182
183 @Override
184 public Channel next() {
185 return channelIterator.next();
186 }
187
188 @Override
189 public void remove() {
190 throw new UnsupportedOperationException("remove");
191 }
192
193 @SuppressWarnings("unchecked")
194 public static <T> Iterator<T> empty() {
195 return (Iterator<T>) EMPTY;
196 }
197
198 private static final Iterator<Object> EMPTY = new Iterator<Object>() {
199 @Override
200 public boolean hasNext() {
201 return false;
202 }
203
204 @Override
205 public Object next() {
206 throw new NoSuchElementException();
207 }
208
209 @Override
210 public void remove() {
211 throw new UnsupportedOperationException("remove");
212 }
213 };
214 }
215 }