View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util.concurrent;
18  
19  import io.netty.util.internal.CallableEventExecutorAdapter;
20  import io.netty.util.internal.OneTimeTask;
21  
22  import java.util.Queue;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.Delayed;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  @SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
29  final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
30      private static final AtomicLong nextTaskId = new AtomicLong();
31      private static final long START_TIME = System.nanoTime();
32  
33      static long nanoTime() {
34          return System.nanoTime() - START_TIME;
35      }
36  
37      static long deadlineNanos(long delay) {
38          return nanoTime() + delay;
39      }
40  
41      private final long id = nextTaskId.getAndIncrement();
42      private long deadlineNanos;
43      /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
44      private final long periodNanos;
45  
46      ScheduledFutureTask(EventExecutor executor,
47                          Callable<V> callable, long nanoTime, long period) {
48          super(executor.unwrap(), callable);
49          if (period == 0) {
50              throw new IllegalArgumentException("period: 0 (expected: != 0)");
51          }
52  
53          deadlineNanos = nanoTime;
54          periodNanos = period;
55      }
56  
57      ScheduledFutureTask(EventExecutor executor,
58                          Callable<V> callable, long nanoTime) {
59          super(executor.unwrap(), callable);
60          deadlineNanos = nanoTime;
61          periodNanos = 0;
62      }
63  
64      public long deadlineNanos() {
65          return deadlineNanos;
66      }
67  
68      public long delayNanos() {
69          return Math.max(0, deadlineNanos() - nanoTime());
70      }
71  
72      public long delayNanos(long currentTimeNanos) {
73          return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
74      }
75  
76      @Override
77      public long getDelay(TimeUnit unit) {
78          return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
79      }
80  
81      @Override
82      public int compareTo(Delayed o) {
83          if (this == o) {
84              return 0;
85          }
86  
87          ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
88          long d = deadlineNanos() - that.deadlineNanos();
89          if (d < 0) {
90              return -1;
91          } else if (d > 0) {
92              return 1;
93          } else if (id < that.id) {
94              return -1;
95          } else if (id == that.id) {
96              throw new Error();
97          } else {
98              return 1;
99          }
100     }
101 
102     @Override
103     public void run() {
104         assert executor().inEventLoop();
105 
106         try {
107             if (isMigrationPending()) {
108                 scheduleWithNewExecutor();
109             } else if (needsLaterExecution()) {
110                 if (!executor().isShutdown()) {
111                     // Try again in ten microseconds.
112                     deadlineNanos = nanoTime() + TimeUnit.MICROSECONDS.toNanos(10);
113                     if (!isCancelled()) {
114                         // scheduledTaskQueue can never be null as we lazy init it before submit the task!
115                         Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
116                                 ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
117                         assert scheduledTaskQueue != null;
118                         scheduledTaskQueue.add(this);
119                     }
120                 }
121             } else {
122                 // delayed tasks executed once
123                 if (periodNanos == 0) {
124                     if (setUncancellableInternal()) {
125                         V result = task.call();
126                         setSuccessInternal(result);
127                     }
128                     // periodically executed tasks
129                 } else {
130                     // check if is done as it may was cancelled
131                     if (!isCancelled()) {
132                         task.call();
133                         if (!executor().isShutdown()) {
134                             long p = periodNanos;
135                             if (p > 0) {
136                                 deadlineNanos += p;
137                             } else {
138                                 deadlineNanos = nanoTime() - p;
139                             }
140                             if (!isCancelled()) {
141                                 // scheduledTaskQueue can never be null as we lazy init it before submit the task!
142                                 Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
143                                         ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
144                                 assert scheduledTaskQueue != null;
145                                 scheduledTaskQueue.add(this);
146                             }
147                         }
148                     }
149                 }
150             }
151         } catch (Throwable cause) {
152             setFailureInternal(cause);
153         }
154     }
155 
156     @Override
157     protected StringBuilder toStringBuilder() {
158         StringBuilder buf = super.toStringBuilder();
159         buf.setCharAt(buf.length() - 1, ',');
160 
161         return buf.append(" id: ")
162                   .append(id)
163                   .append(", deadline: ")
164                   .append(deadlineNanos)
165                   .append(", period: ")
166                   .append(periodNanos)
167                   .append(')');
168     }
169 
170     /**
171      * When this condition is met it usually means that the channel associated with this task
172      * was deregistered from its eventloop and has not yet been registered with another eventloop.
173      */
174     private boolean needsLaterExecution() {
175         return task instanceof CallableEventExecutorAdapter &&
176                 ((CallableEventExecutorAdapter<?>) task).executor() instanceof PausableEventExecutor &&
177                 !((PausableEventExecutor) ((CallableEventExecutorAdapter<?>) task).executor()).isAcceptingNewTasks();
178     }
179 
180     /**
181      * When this condition is met it usually means that the channel associated with this task
182      * was re-registered with another eventloop, after having been deregistered beforehand.
183      */
184     private boolean isMigrationPending() {
185         return !isCancelled() &&
186                 task instanceof CallableEventExecutorAdapter &&
187                 executor() != ((CallableEventExecutorAdapter<?>) task).executor().unwrap();
188     }
189 
190     private void scheduleWithNewExecutor() {
191         EventExecutor newExecutor = ((CallableEventExecutorAdapter<?>) task).executor().unwrap();
192 
193         if (newExecutor instanceof SingleThreadEventExecutor) {
194             if (!newExecutor.isShutdown()) {
195                 executor = newExecutor;
196                 final Queue<ScheduledFutureTask<?>> scheduledTaskQueue
197                         = ((SingleThreadEventExecutor) newExecutor).scheduledTaskQueue();
198 
199                 executor.execute(new OneTimeTask() {
200                     @Override
201                     public void run() {
202                         // Execute as soon as possible.
203                         deadlineNanos = nanoTime();
204                         if (!isCancelled()) {
205                             scheduledTaskQueue.add(ScheduledFutureTask.this);
206                         }
207                     }
208                 });
209             }
210         } else {
211             throw new UnsupportedOperationException("task migration unsupported");
212         }
213     }
214 }