1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.RejectedExecutionHandler;
19 import io.netty.util.concurrent.RejectedExecutionHandlers;
20 import io.netty.util.concurrent.SingleThreadEventExecutor;
21 import io.netty.util.internal.ObjectUtil;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27 import java.util.Queue;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.ThreadFactory;
30
31
32
33
34 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
35
36 protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
37 SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
38
39 private final Queue<Runnable> tailTasks;
40
41 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
42 this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
43 }
44
45 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
46 this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
47 }
48
49 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
50 boolean addTaskWakesUp, int maxPendingTasks,
51 RejectedExecutionHandler rejectedExecutionHandler) {
52 super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
53 tailTasks = newTaskQueue(maxPendingTasks);
54 }
55
56 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
57 boolean addTaskWakesUp, int maxPendingTasks,
58 RejectedExecutionHandler rejectedExecutionHandler) {
59 super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
60 tailTasks = newTaskQueue(maxPendingTasks);
61 }
62
63 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
64 boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
65 RejectedExecutionHandler rejectedExecutionHandler) {
66 super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
67 tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
68 }
69
70 @Override
71 public EventLoopGroup parent() {
72 return (EventLoopGroup) super.parent();
73 }
74
75 @Override
76 public EventLoop next() {
77 return (EventLoop) super.next();
78 }
79
80 @Override
81 public ChannelFuture register(Channel channel) {
82 return register(new DefaultChannelPromise(channel, this));
83 }
84
85 @Override
86 public ChannelFuture register(final ChannelPromise promise) {
87 ObjectUtil.checkNotNull(promise, "promise");
88 promise.channel().unsafe().register(this, promise);
89 return promise;
90 }
91
92 @Deprecated
93 @Override
94 public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
95 ObjectUtil.checkNotNull(promise, "promise");
96 ObjectUtil.checkNotNull(channel, "channel");
97 channel.unsafe().register(this, promise);
98 return promise;
99 }
100
101
102
103
104
105
106 public final void executeAfterEventLoopIteration(Runnable task) {
107 ObjectUtil.checkNotNull(task, "task");
108 if (isShutdown()) {
109 reject();
110 }
111
112 if (!tailTasks.offer(task)) {
113 reject(task);
114 }
115
116 if (wakesUpForTask(task)) {
117 wakeup(inEventLoop());
118 }
119 }
120
121
122
123
124
125
126
127
128 final boolean removeAfterEventLoopIterationTask(Runnable task) {
129 return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
130 }
131
132 @Override
133 protected void afterRunningAllTasks() {
134 runAllTasksFrom(tailTasks);
135 }
136
137 @Override
138 protected boolean hasTasks() {
139 return super.hasTasks() || !tailTasks.isEmpty();
140 }
141
142 @Override
143 public int pendingTasks() {
144 return super.pendingTasks() + tailTasks.size();
145 }
146
147
148
149
150
151
152 @UnstableApi
153 public int registeredChannels() {
154 return -1;
155 }
156
157
158
159
160
161
162
163
164 @UnstableApi
165 public Iterator<Channel> registeredChannelsIterator() {
166 throw new UnsupportedOperationException("registeredChannelsIterator");
167 }
168
169 protected static final class ChannelsReadOnlyIterator<T extends Channel> implements Iterator<Channel> {
170 private final Iterator<T> channelIterator;
171
172 public ChannelsReadOnlyIterator(Iterable<T> channelIterable) {
173 this.channelIterator =
174 ObjectUtil.checkNotNull(channelIterable, "channelIterable").iterator();
175 }
176
177 @Override
178 public boolean hasNext() {
179 return channelIterator.hasNext();
180 }
181
182 @Override
183 public Channel next() {
184 return channelIterator.next();
185 }
186
187 @Override
188 public void remove() {
189 throw new UnsupportedOperationException("remove");
190 }
191
192 @SuppressWarnings("unchecked")
193 public static <T> Iterator<T> empty() {
194 return (Iterator<T>) EMPTY;
195 }
196
197 private static final Iterator<Object> EMPTY = new Iterator<Object>() {
198 @Override
199 public boolean hasNext() {
200 return false;
201 }
202
203 @Override
204 public Object next() {
205 throw new NoSuchElementException();
206 }
207
208 @Override
209 public void remove() {
210 throw new UnsupportedOperationException("remove");
211 }
212 };
213 }
214 }