View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.RejectedExecutionHandler;
19  import io.netty.util.concurrent.RejectedExecutionHandlers;
20  import io.netty.util.concurrent.SingleThreadEventExecutor;
21  import io.netty.util.internal.ObjectUtil;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.UnstableApi;
24  
25  import java.util.Queue;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.ThreadFactory;
28  
29  /**
30   * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread.
31   *
32   */
33  public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
34  
35      protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
36              SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
37  
38      private final Queue<Runnable> tailTasks;
39  
40      protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
41          this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
42      }
43  
44      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
45          this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
46      }
47  
48      protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
49                                      boolean addTaskWakesUp, int maxPendingTasks,
50                                      RejectedExecutionHandler rejectedExecutionHandler) {
51          super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
52          tailTasks = newTaskQueue(maxPendingTasks);
53      }
54  
55      protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
56                                      boolean addTaskWakesUp, int maxPendingTasks,
57                                      RejectedExecutionHandler rejectedExecutionHandler) {
58          super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
59          tailTasks = newTaskQueue(maxPendingTasks);
60      }
61  
62      @Override
63      public EventLoopGroup parent() {
64          return (EventLoopGroup) super.parent();
65      }
66  
67      @Override
68      public EventLoop next() {
69          return (EventLoop) super.next();
70      }
71  
72      @Override
73      public ChannelFuture register(Channel channel) {
74          return register(new DefaultChannelPromise(channel, this));
75      }
76  
77      @Override
78      public ChannelFuture register(final ChannelPromise promise) {
79          ObjectUtil.checkNotNull(promise, "promise");
80          promise.channel().unsafe().register(this, promise);
81          return promise;
82      }
83  
84      @Deprecated
85      @Override
86      public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
87          if (channel == null) {
88              throw new NullPointerException("channel");
89          }
90          if (promise == null) {
91              throw new NullPointerException("promise");
92          }
93  
94          channel.unsafe().register(this, promise);
95          return promise;
96      }
97  
98      /**
99       * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
100      *
101      * @param task to be added.
102      */
103     @UnstableApi
104     public final void executeAfterEventLoopIteration(Runnable task) {
105         ObjectUtil.checkNotNull(task, "task");
106         if (isShutdown()) {
107             reject();
108         }
109 
110         if (!tailTasks.offer(task)) {
111             reject(task);
112         }
113 
114         if (wakesUpForTask(task)) {
115             wakeup(inEventLoop());
116         }
117     }
118 
119     /**
120      * Removes a task that was added previously via {@link #executeAfterEventLoopIteration(Runnable)}.
121      *
122      * @param task to be removed.
123      *
124      * @return {@code true} if the task was removed as a result of this call.
125      */
126     @UnstableApi
127     final boolean removeAfterEventLoopIterationTask(Runnable task) {
128         return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
129     }
130 
131     @Override
132     protected boolean wakesUpForTask(Runnable task) {
133         return !(task instanceof NonWakeupRunnable);
134     }
135 
136     @Override
137     protected void afterRunningAllTasks() {
138         runAllTasksFrom(tailTasks);
139     }
140 
141     @Override
142     protected boolean hasTasks() {
143         return super.hasTasks() || !tailTasks.isEmpty();
144     }
145 
146     @Override
147     public int pendingTasks() {
148         return super.pendingTasks() + tailTasks.size();
149     }
150 
151     /**
152      * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases.
153      */
154     interface NonWakeupRunnable extends Runnable { }
155 }