View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.RejectedExecutionHandler;
19  import io.netty.util.concurrent.RejectedExecutionHandlers;
20  import io.netty.util.concurrent.SingleThreadEventExecutor;
21  import io.netty.util.internal.ObjectUtil;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.UnstableApi;
24  
25  import java.util.Iterator;
26  import java.util.NoSuchElementException;
27  import java.util.Queue;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.ThreadFactory;
30  
31  /**
32   * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread.
33   */
34  public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
35  
36      protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
37              SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
38  
39      private final Queue<Runnable> tailTasks;
40  
41      protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
42          this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
43      }
44  
45      protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
46                                      boolean addTaskWakesUp, boolean supportSuspension) {
47          this(parent, threadFactory, addTaskWakesUp, supportSuspension, DEFAULT_MAX_PENDING_TASKS,
48                  RejectedExecutionHandlers.reject());
49      }
50  
51      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
52          this(parent, executor, addTaskWakesUp, false);
53      }
54  
55      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
56                                      boolean addTaskWakesUp, boolean supportSuspension) {
57          this(parent, executor, addTaskWakesUp, supportSuspension, DEFAULT_MAX_PENDING_TASKS,
58                  RejectedExecutionHandlers.reject());
59      }
60  
61      protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
62                                      boolean addTaskWakesUp, int maxPendingTasks,
63                                      RejectedExecutionHandler rejectedExecutionHandler) {
64          this(parent, threadFactory, addTaskWakesUp, false, maxPendingTasks, rejectedExecutionHandler);
65      }
66  
67      protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
68                                      boolean addTaskWakesUp, boolean supportSuspension, int maxPendingTasks,
69                                      RejectedExecutionHandler rejectedExecutionHandler) {
70          super(parent, threadFactory, addTaskWakesUp, supportSuspension, maxPendingTasks, rejectedExecutionHandler);
71          tailTasks = newTaskQueue(maxPendingTasks);
72      }
73  
74      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
75                                      boolean addTaskWakesUp, int maxPendingTasks,
76                                      RejectedExecutionHandler rejectedExecutionHandler) {
77          this(parent, executor, addTaskWakesUp, false, maxPendingTasks, rejectedExecutionHandler);
78      }
79  
80      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
81                                      boolean addTaskWakesUp, boolean supportSuspension, int maxPendingTasks,
82                                      RejectedExecutionHandler rejectedExecutionHandler) {
83          super(parent, executor, addTaskWakesUp, supportSuspension, maxPendingTasks, rejectedExecutionHandler);
84          tailTasks = newTaskQueue(maxPendingTasks);
85      }
86  
87      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
88                                      boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
89                                      RejectedExecutionHandler rejectedExecutionHandler) {
90          this(parent, executor, addTaskWakesUp, false, taskQueue, tailTaskQueue, rejectedExecutionHandler);
91      }
92  
93      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
94                                      boolean addTaskWakesUp, boolean supportSuspension,
95                                      Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
96                                      RejectedExecutionHandler rejectedExecutionHandler) {
97          super(parent, executor, addTaskWakesUp, supportSuspension, taskQueue, rejectedExecutionHandler);
98          tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
99      }
100 
101     @Override
102     public EventLoopGroup parent() {
103         return (EventLoopGroup) super.parent();
104     }
105 
106     @Override
107     public EventLoop next() {
108         return (EventLoop) super.next();
109     }
110 
111     @Override
112     public ChannelFuture register(Channel channel) {
113         return register(new DefaultChannelPromise(channel, this));
114     }
115 
116     @Override
117     public ChannelFuture register(final ChannelPromise promise) {
118         ObjectUtil.checkNotNull(promise, "promise");
119         promise.channel().unsafe().register(this, promise);
120         return promise;
121     }
122 
123     @Deprecated
124     @Override
125     public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
126         ObjectUtil.checkNotNull(promise, "promise");
127         ObjectUtil.checkNotNull(channel, "channel");
128         channel.unsafe().register(this, promise);
129         return promise;
130     }
131 
132     /**
133      * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
134      *
135      * @param task to be added.
136      */
137     public final void executeAfterEventLoopIteration(Runnable task) {
138         ObjectUtil.checkNotNull(task, "task");
139         if (isShutdown()) {
140             reject();
141         }
142 
143         if (!tailTasks.offer(task)) {
144             reject(task);
145         }
146 
147         if (wakesUpForTask(task)) {
148             wakeup(inEventLoop());
149         }
150     }
151 
152     /**
153      * Removes a task that was added previously via {@link #executeAfterEventLoopIteration(Runnable)}.
154      *
155      * @param task to be removed.
156      *
157      * @return {@code true} if the task was removed as a result of this call.
158      */
159     final boolean removeAfterEventLoopIterationTask(Runnable task) {
160         return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
161     }
162 
163     @Override
164     protected void afterRunningAllTasks() {
165         runAllTasksFrom(tailTasks);
166     }
167 
168     @Override
169     protected boolean hasTasks() {
170         return super.hasTasks() || !tailTasks.isEmpty();
171     }
172 
173     @Override
174     public int pendingTasks() {
175         return super.pendingTasks() + tailTasks.size();
176     }
177 
178     /**
179      * Returns the number of {@link Channel}s registered with this {@link EventLoop} or {@code -1}
180      * if operation is not supported. The returned value is not guaranteed to be exact accurate and
181      * should be viewed as a best effort.
182      */
183     @UnstableApi
184     public int registeredChannels() {
185         return -1;
186     }
187 
188     /**
189      * @return read-only iterator of active {@link Channel}s registered with this {@link EventLoop}.
190      *         The returned value is not guaranteed to be exact accurate and
191      *         should be viewed as a best effort. This method is expected to be called from within
192      *         event loop.
193      * @throws UnsupportedOperationException if operation is not supported by implementation.
194      */
195     @UnstableApi
196     public Iterator<Channel> registeredChannelsIterator() {
197         throw new UnsupportedOperationException("registeredChannelsIterator");
198     }
199 
200     protected static final class ChannelsReadOnlyIterator<T extends Channel> implements Iterator<Channel> {
201         private final Iterator<T> channelIterator;
202 
203         public ChannelsReadOnlyIterator(Iterable<T> channelIterable) {
204             this.channelIterator =
205                     ObjectUtil.checkNotNull(channelIterable, "channelIterable").iterator();
206         }
207 
208         @Override
209         public boolean hasNext() {
210             return channelIterator.hasNext();
211         }
212 
213         @Override
214         public Channel next() {
215             return channelIterator.next();
216         }
217 
218         @Override
219         public void remove() {
220             throw new UnsupportedOperationException("remove");
221         }
222 
223         @SuppressWarnings("unchecked")
224         public static <T> Iterator<T> empty() {
225             return (Iterator<T>) EMPTY;
226         }
227 
228         private static final Iterator<Object> EMPTY = new Iterator<Object>() {
229             @Override
230             public boolean hasNext() {
231                 return false;
232             }
233 
234             @Override
235             public Object next() {
236                 throw new NoSuchElementException();
237             }
238 
239             @Override
240             public void remove() {
241                 throw new UnsupportedOperationException("remove");
242             }
243         };
244     }
245 }