View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.RejectedExecutionHandler;
19  import io.netty.util.concurrent.RejectedExecutionHandlers;
20  import io.netty.util.concurrent.SingleThreadEventExecutor;
21  import io.netty.util.internal.ObjectUtil;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.UnstableApi;
24  
25  import java.util.Iterator;
26  import java.util.NoSuchElementException;
27  import java.util.Queue;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.ThreadFactory;
30  
31  /**
32   * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread.
33   */
34  public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
35  
36      protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
37              SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
38  
39      private final Queue<Runnable> tailTasks;
40  
41      protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
42          this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
43      }
44  
45      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
46          this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
47      }
48  
49      protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
50                                      boolean addTaskWakesUp, int maxPendingTasks,
51                                      RejectedExecutionHandler rejectedExecutionHandler) {
52          super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
53          tailTasks = newTaskQueue(maxPendingTasks);
54      }
55  
56      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
57                                      boolean addTaskWakesUp, int maxPendingTasks,
58                                      RejectedExecutionHandler rejectedExecutionHandler) {
59          super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
60          tailTasks = newTaskQueue(maxPendingTasks);
61      }
62  
63      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
64                                      boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
65                                      RejectedExecutionHandler rejectedExecutionHandler) {
66          super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
67          tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
68      }
69  
70      @Override
71      public EventLoopGroup parent() {
72          return (EventLoopGroup) super.parent();
73      }
74  
75      @Override
76      public EventLoop next() {
77          return (EventLoop) super.next();
78      }
79  
80      @Override
81      public ChannelFuture register(Channel channel) {
82          return register(new DefaultChannelPromise(channel, this));
83      }
84  
85      @Override
86      public ChannelFuture register(final ChannelPromise promise) {
87          ObjectUtil.checkNotNull(promise, "promise");
88          promise.channel().unsafe().register(this, promise);
89          return promise;
90      }
91  
92      @Deprecated
93      @Override
94      public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
95          ObjectUtil.checkNotNull(promise, "promise");
96          ObjectUtil.checkNotNull(channel, "channel");
97          channel.unsafe().register(this, promise);
98          return promise;
99      }
100 
101     /**
102      * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
103      *
104      * @param task to be added.
105      */
106     public final void executeAfterEventLoopIteration(Runnable task) {
107         ObjectUtil.checkNotNull(task, "task");
108         if (isShutdown()) {
109             reject();
110         }
111 
112         if (!tailTasks.offer(task)) {
113             reject(task);
114         }
115 
116         if (wakesUpForTask(task)) {
117             wakeup(inEventLoop());
118         }
119     }
120 
121     /**
122      * Removes a task that was added previously via {@link #executeAfterEventLoopIteration(Runnable)}.
123      *
124      * @param task to be removed.
125      *
126      * @return {@code true} if the task was removed as a result of this call.
127      */
128     final boolean removeAfterEventLoopIterationTask(Runnable task) {
129         return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
130     }
131 
132     @Override
133     protected void afterRunningAllTasks() {
134         runAllTasksFrom(tailTasks);
135     }
136 
137     @Override
138     protected boolean hasTasks() {
139         return super.hasTasks() || !tailTasks.isEmpty();
140     }
141 
142     @Override
143     public int pendingTasks() {
144         return super.pendingTasks() + tailTasks.size();
145     }
146 
147     /**
148      * Returns the number of {@link Channel}s registered with this {@link EventLoop} or {@code -1}
149      * if operation is not supported. The returned value is not guaranteed to be exact accurate and
150      * should be viewed as a best effort.
151      */
152     @UnstableApi
153     public int registeredChannels() {
154         return -1;
155     }
156 
157     /**
158      * @return read-only iterator of active {@link Channel}s registered with this {@link EventLoop}.
159      *         The returned value is not guaranteed to be exact accurate and
160      *         should be viewed as a best effort. This method is expected to be called from within
161      *         event loop.
162      * @throws UnsupportedOperationException if operation is not supported by implementation.
163      */
164     @UnstableApi
165     public Iterator<Channel> registeredChannelsIterator() {
166         throw new UnsupportedOperationException("registeredChannelsIterator");
167     }
168 
169     protected static final class ChannelsReadOnlyIterator<T extends Channel> implements Iterator<Channel> {
170         private final Iterator<T> channelIterator;
171 
172         public ChannelsReadOnlyIterator(Iterable<T> channelIterable) {
173             this.channelIterator =
174                     ObjectUtil.checkNotNull(channelIterable, "channelIterable").iterator();
175         }
176 
177         @Override
178         public boolean hasNext() {
179             return channelIterator.hasNext();
180         }
181 
182         @Override
183         public Channel next() {
184             return channelIterator.next();
185         }
186 
187         @Override
188         public void remove() {
189             throw new UnsupportedOperationException("remove");
190         }
191 
192         @SuppressWarnings("unchecked")
193         public static <T> Iterator<T> empty() {
194             return (Iterator<T>) EMPTY;
195         }
196 
197         private static final Iterator<Object> EMPTY = new Iterator<Object>() {
198             @Override
199             public boolean hasNext() {
200                 return false;
201             }
202 
203             @Override
204             public Object next() {
205                 throw new NoSuchElementException();
206             }
207 
208             @Override
209             public void remove() {
210                 throw new UnsupportedOperationException("remove");
211             }
212         };
213     }
214 }