View Javadoc
1   /*
2    * Copyright 2025 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.AbstractScheduledEventExecutor;
19  import io.netty.util.concurrent.DefaultPromise;
20  import io.netty.util.concurrent.EventExecutor;
21  import io.netty.util.concurrent.Future;
22  import io.netty.util.concurrent.GlobalEventExecutor;
23  import io.netty.util.concurrent.Promise;
24  import io.netty.util.concurrent.Ticker;
25  import io.netty.util.internal.ObjectUtil;
26  import io.netty.util.internal.PlatformDependent;
27  import io.netty.util.internal.ThreadExecutorMap;
28  
29  import java.util.Collection;
30  import java.util.List;
31  import java.util.Objects;
32  import java.util.Queue;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.RejectedExecutionException;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.TimeoutException;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  /**
42   * {@link IoEventLoop} implementation that is owned by the user and so needs to be driven by the user manually with the
43   * given {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or {@link #run(long)}
44   * to execute IO and tasks that were submitted to this {@link IoEventLoop}.
45   * <p>
46   * This is for <strong>advanced use-cases only</strong>, where the user wants to own the {@link Thread} that drives the
 * {@link IoEventLoop} to also do other work. Care must be taken that the {@link #runNow()} or
 * {@link #run(long)} methods are called in a timely fashion.
49   */
50  public class ManualIoEventLoop extends AbstractScheduledEventExecutor implements IoEventLoop {
    // No-op task that shutdown0 offers to the task queue when it wakes the loop up.
    private static final Runnable WAKEUP_TASK = () -> {
        // NOOP
    };
    // Lifecycle states stored in `state`. They are ordered on purpose: isShuttingDown() and isShutdown()
    // are implemented as ">=" comparisons against these values.
    private static final int ST_STARTED = 0;
    private static final int ST_SHUTTING_DOWN = 1;
    private static final int ST_SHUTDOWN = 2;
    private static final int ST_TERMINATED = 3;

    // Current lifecycle state (one of the ST_* constants); created in the constructor.
    private final AtomicInteger state;
    // Completed by runAllTasksBeforeDestroy() once the loop reached ST_TERMINATED.
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
    // Tasks submitted through execute(...); drained by runAllTasks(...).
    private final Queue<Runnable> taskQueue = PlatformDependent.newMpscQueue();
    // Context handed to the IoHandler when the caller asked not to block (runNow(...) or a negative waitNanos):
    // blocking is never allowed, there is no delay and no deadline (-1).
    private final IoHandlerContext nonBlockingContext = new IoHandlerContext() {
        @Override
        public boolean canBlock() {
            assert inEventLoop();
            return false;
        }

        @Override
        public long delayNanos(long currentTimeNanos) {
            assert inEventLoop();
            return 0;
        }

        @Override
        public long deadlineNanos() {
            assert inEventLoop();
            return -1;
        }
    };
    // Context handed to the IoHandler by run(long, long) when waitNanos >= 0; its maxBlockingNanos is
    // updated on each such call.
    private final BlockingIoHandlerContext blockingContext = new BlockingIoHandlerContext();
    // Parent group as passed to the constructor; may be null.
    private final IoEventLoopGroup parent;
    // Thread that is allowed to drive this loop; holds null until set by the constructor or setOwningThread(...).
    private final AtomicReference<Thread> owningThread;
    // IoHandler created from the IoHandlerFactory passed to the constructor.
    private final IoHandler handler;
    private final Ticker ticker;

    // Graceful shutdown settings in nanoseconds; written by shutdown0(...) and read by confirmShutdown().
    private volatile long gracefulShutdownQuietPeriod;
    private volatile long gracefulShutdownTimeout;
    // Ticker time of the first confirmShutdown() call; 0 means "not recorded yet".
    private long gracefulShutdownStartTime;
    // Ticker time recorded by runAllTasks(...) after it executed at least one task.
    private long lastExecutionTime;
    // Set by run(IoHandlerContext, long) just before handler.initialize() is invoked for the first time.
    private boolean initialized;
92  
    /**
     * Extension point to add further blocking conditions. The blocking {@link IoHandlerContext} used by
     * {@link #run(long)} only reports that blocking is allowed if there are no queued or scheduled tasks
     * <strong>and</strong> this method returns {@code true}.
     *
     * @return {@code true} if the {@link IoHandler} may block; this default implementation always returns
     *         {@code true}.
     */
    protected boolean canBlock() {
        return true;
    }
100 
    /**
     * Create a new {@link IoEventLoop} without a parent that is owned by the user and so needs to be driven by the
     * user with the given {@link Thread}. This means that the user is responsible to call either {@link #runNow()}
     * or {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
     * <p>
     * Equivalent to calling {@link #ManualIoEventLoop(IoEventLoopGroup, Thread, IoHandlerFactory)} with a
     * {@code null} parent.
     *
     * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
     *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
     *                          make progress.
     * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
     *                          used by this {@link IoEventLoop}.
     */
    public ManualIoEventLoop(Thread owningThread, IoHandlerFactory factory) {
        this(null, owningThread, factory);
    }
115 
    /**
     * Create a new {@link IoEventLoop} that is owned by the user and so needs to be driven by the user with the given
     * {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or
     * {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
     * <p>
     * The {@link Ticker#systemTicker() system ticker} is used as the time source.
     *
     * @param parent            the parent {@link IoEventLoopGroup} or {@code null} if no parent.
     * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
     *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
     *                          make progress. If {@code null}, must be set later using
     *                          {@link #setOwningThread(Thread)}.
     * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
     *                          used by this {@link IoEventLoop}.
     */
    public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory) {
        this(parent, owningThread, factory, Ticker.systemTicker());
    }
132 
133     /**
134      * Create a new {@link IoEventLoop} that is owned by the user and so needs to be driven by the user with the given
135      * {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or
136      * {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
137      *
138      * @param parent            the parent {@link IoEventLoopGroup} or {@code null} if no parent.
139      * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
140      *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
141      *                          make progress. If {@code null}, must be set later using
142      *                          {@link #setOwningThread(Thread)}.
143      * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
144      *                          used by this {@link IoEventLoop}.
145      * @param ticker            The {@link #ticker()} to use for this event loop. Note that the {@link IoHandler} does
146      *                          not use the ticker, so if the ticker advances faster than system time, you may have to
147      *                          {@link #wakeup()} this event loop manually.
148      */
149     public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory, Ticker ticker) {
150         this.parent = parent;
151         this.owningThread = new AtomicReference<>(owningThread);
152         this.handler = factory.newHandler(this);
153         this.ticker = Objects.requireNonNull(ticker, "ticker");
154         state = new AtomicInteger(ST_STARTED);
155     }
156 
    /**
     * Returns the {@link Ticker} that was passed to the constructor.
     */
    @Override
    public final Ticker ticker() {
        return ticker;
    }
161 
    /**
     * Poll and run tasks from the task queue, until the task queue is empty or the given deadline is exceeded.<br>
     * If {@code timeoutNanos} is less than or equal to 0, no deadline is applied.
     * <p>
     * No IO is processed by this method. The calling thread is only verified through an {@code assert}, so the
     * caller has to make sure this is invoked from the owning {@link Thread}.
     *
     * @param timeoutNanos the maximum time in nanoseconds to run tasks.
     * @return the number of tasks executed.
     */
    public final int runNonBlockingTasks(long timeoutNanos) {
        return runAllTasks(timeoutNanos, true);
    }
171 
172     private int runAllTasks(long timeoutNanos, boolean setCurrentExecutor) {
173         assert inEventLoop();
174         final Queue<Runnable> taskQueue = this.taskQueue;
175         // since the taskQueue is unbounded we don't need to keep on calling this while draining it.
176         boolean alwaysTrue = fetchFromScheduledTaskQueue(taskQueue);
177         assert alwaysTrue;
178         Runnable task = taskQueue.poll();
179         if (task == null) {
180             return 0;
181         }
182         EventExecutor old = setCurrentExecutor? ThreadExecutorMap.setCurrentExecutor(this) : null;
183         try {
184             final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
185             int runTasks = 0;
186             long lastExecutionTime;
187             final Ticker ticker = this.ticker;
188             for (;;) {
189                 safeExecute(task);
190 
191                 runTasks++;
192 
193                if (timeoutNanos > 0) {
194                     lastExecutionTime = ticker.nanoTime();
195                     if ((lastExecutionTime - deadline) >= 0) {
196                         break;
197                     }
198                 }
199 
200                 task = taskQueue.poll();
201                 if (task == null) {
202                     lastExecutionTime = ticker.nanoTime();
203                     break;
204                 }
205             }
206             this.lastExecutionTime = lastExecutionTime;
207             return runTasks;
208         } finally {
209             if (setCurrentExecutor) {
210                 ThreadExecutorMap.setCurrentExecutor(old);
211             }
212         }
213     }
214 
215     private int run(IoHandlerContext context, long runAllTasksTimeoutNanos) {
216         if (!initialized) {
217             if (owningThread.get() == null) {
218                 throw new IllegalStateException("Owning thread not set");
219             }
220             initialized = true;
221             handler.initialize();
222         }
223         EventExecutor old = ThreadExecutorMap.setCurrentExecutor(this);
224         try {
225             if (isShuttingDown()) {
226                 if (terminationFuture.isDone()) {
227                     // Already completely terminated
228                     return 0;
229                 }
230                 return runAllTasksBeforeDestroy();
231             }
232             final int ioTasks = handler.run(context);
233             // Now run all tasks.
234             if (runAllTasksTimeoutNanos < 0) {
235                 return ioTasks;
236             }
237             assert runAllTasksTimeoutNanos >= 0;
238             return ioTasks + runAllTasks(runAllTasksTimeoutNanos, false);
239         } finally {
240             ThreadExecutorMap.setCurrentExecutor(old);
241         }
242     }
243 
244     private int runAllTasksBeforeDestroy() {
245         // Run all tasks before prepare to destroy.
246         int run = runAllTasks(-1, false);
247         handler.prepareToDestroy();
248         if (confirmShutdown()) {
249             // Destroy the handler now and run all remaining tasks.
250             try {
251                 handler.destroy();
252                 for (;;) {
253                     int r = runAllTasks(-1, false);
254                     run += r;
255                     if (r == 0) {
256                         break;
257                     }
258                 }
259             } finally {
260                 state.set(ST_TERMINATED);
261                 terminationFuture.setSuccess(null);
262             }
263         }
264         return run;
265     }
266 
    /**
     * Executes all ready IO and tasks for this {@link IoEventLoop}.
     * This method will <strong>NOT</strong> block and wait for IO / tasks to be ready, it will just
     * return directly if there is nothing to do.
     * <p>
     * <strong>Must be called from the owning {@link Thread} that was passed as a parameter on construction.</strong>
     *
     * @param runAllTasksTimeoutNanos the maximum time in nanoseconds to run tasks.
     *                                If {@code = 0}, no timeout is applied; if {@code < 0} only IO is performed.
     * @return the number of IO and tasks executed.
     * @throws IllegalStateException if the method is not called from the owning {@link Thread}.
     */
    public final int runNow(long runAllTasksTimeoutNanos) {
        checkCurrentThread();
        return run(nonBlockingContext, runAllTasksTimeoutNanos);
    }
284 
    /**
     * Run all ready IO and tasks for this {@link IoEventLoop}, without a time limit for the tasks.
     * This method will <strong>NOT</strong> block and wait for IO / tasks to be ready, it will just
     * return directly if there is nothing to do.
     * <p>
     * <strong>Must be called from the owning {@link Thread} that was passed as a parameter on construction.</strong>
     *
     * @return the number of IO and tasks executed.
     * @throws IllegalStateException if the method is not called from the owning {@link Thread}.
     */
    public final int runNow() {
        checkCurrentThread();
        return run(nonBlockingContext, 0);
    }
298 
299     /**
300      * Run all ready IO and tasks for this {@link IoEventLoop}.
301      * This methods will block and wait for IO / tasks to be ready if there is nothing to process atm for the given
302      * {@code waitNanos}.
303      * <p>
304      * <strong>Must be called from the owning {@link Thread} that was passed as an parameter on construction.</strong>
305      *
306      * @param runAllTasksTimeoutNanos the maximum time in nanoseconds to run tasks.
307      *                                If {@code = 0}, no timeout is applied; if {@code < 0} it just perform I/O tasks.
308      * @param waitNanos the maximum amount of nanoseconds to wait before returning. IF {@code 0} it will block until
309      *                  there is some IO / tasks ready, if {@code -1} will not block at all and just return directly
310      *                  if there is nothing to run (like {@link #runNow()}).
311      * @return          the number of IO and tasks executed.
312      */
313     public final int run(long waitNanos, long runAllTasksTimeoutNanos) {
314         checkCurrentThread();
315 
316         final IoHandlerContext context;
317         if (waitNanos < 0) {
318             context = nonBlockingContext;
319         } else {
320             context = blockingContext;
321             blockingContext.maxBlockingNanos = waitNanos == 0 ? Long.MAX_VALUE : waitNanos;
322         }
323         return run(context, runAllTasksTimeoutNanos);
324     }
325 
    /**
     * Run all ready IO and tasks for this {@link IoEventLoop}, without a time limit for the tasks.
     * If there is nothing to process at the moment, this method blocks and waits for IO / tasks to become ready
     * for up to {@code waitNanos}.
     * <p>
     * <strong>Must be called from the owning {@link Thread} that was passed as a parameter on construction.</strong>
     *
     * @param waitNanos the maximum amount of nanoseconds to wait before returning. If {@code 0} it will block until
     *                  there is some IO / tasks ready, if negative it will not block at all and just return directly
     *                  if there is nothing to run (like {@link #runNow()}).
     * @return          the number of IO and tasks executed.
     * @throws IllegalStateException if the method is not called from the owning {@link Thread}.
     */
    public final int run(long waitNanos) {
        return run(waitNanos, 0);
    }
341 
342     private void checkCurrentThread() {
343         if (!inEventLoop(Thread.currentThread())) {
344             throw new IllegalStateException();
345         }
346     }
347 
348     /**
349      * Force a wakeup and so the {@link #run(long)} method will unblock and return even if there was nothing to do.
350      */
351     public final void wakeup() {
352         if (isShuttingDown()) {
353             return;
354         }
355         handler.wakeup();
356     }
357 
    /**
     * Returns this instance.
     */
    @Override
    public final ManualIoEventLoop next() {
        return this;
    }
362 
    /**
     * Returns the parent {@link IoEventLoopGroup} that was passed to the constructor, or {@code null} if there is
     * none.
     */
    @Override
    public final IoEventLoopGroup parent() {
        return parent;
    }
367 
    /**
     * Registers the given {@link Channel} by delegating to {@link #register(ChannelPromise)} with a new
     * {@link DefaultChannelPromise} bound to this loop.
     *
     * @param channel the {@link Channel} to register.
     * @return the {@link ChannelFuture} for the registration.
     */
    @Deprecated
    @Override
    public final ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
373 
    /**
     * Registers the {@link Channel} of the given promise with this loop through the channel's
     * {@code unsafe()}.
     *
     * @param promise the promise, which must not be {@code null}; it is handed to the registration call.
     * @return the given {@code promise}.
     */
    @Deprecated
    @Override
    public final ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
381 
382     @Override
383     public final Future<IoRegistration> register(final IoHandle handle) {
384         Promise<IoRegistration> promise = newPromise();
385         if (inEventLoop()) {
386             registerForIo0(handle, promise);
387         } else {
388             execute(() -> registerForIo0(handle, promise));
389         }
390 
391         return promise;
392     }
393 
394     private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
395         assert inEventLoop();
396         final IoRegistration registration;
397         try {
398             registration = handler.register(handle);
399         } catch (Exception e) {
400             promise.setFailure(e);
401             return;
402         }
403         promise.setSuccess(registration);
404     }
405 
    /**
     * Registers the given {@link Channel} with this loop through the channel's {@code unsafe()}.
     *
     * @param channel the {@link Channel} to register; must not be {@code null}.
     * @param promise the promise handed to the registration call; must not be {@code null}.
     * @return the given {@code promise}.
     */
    @Deprecated
    @Override
    public final ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        ObjectUtil.checkNotNull(channel, "channel");
        channel.unsafe().register(this, promise);
        return promise;
    }
414 
    /**
     * Delegates to the {@link IoHandler} of this loop to decide whether the given {@link IoHandle} type is
     * compatible.
     */
    @Override
    public final boolean isCompatible(Class<? extends IoHandle> handleType) {
        return handler.isCompatible(handleType);
    }
419 
    /**
     * Returns {@code true} if the runtime class of this loop's {@link IoHandler} is exactly {@code handlerType}
     * (subclasses do not match).
     */
    @Override
    public final boolean isIoType(Class<? extends IoHandler> handlerType) {
        return handler.getClass().equals(handlerType);
    }
424 
    /**
     * Returns {@code true} if the given {@link Thread} is the current owning thread of this loop. As long as no
     * owning thread is set this only holds for a {@code null} argument.
     */
    @Override
    public final boolean inEventLoop(Thread thread) {
        return this.owningThread.get() == thread;
    }
429 
430     /**
431      * Set the owning thread that will call {@link #run}. May only be called once, and only if the owning thread was
432      * not set in the constructor already.
433      *
434      * @param owningThread The owning thread
435      */
436     public final void setOwningThread(Thread owningThread) {
437         Objects.requireNonNull(owningThread, "owningThread");
438         if (!this.owningThread.compareAndSet(null, owningThread)) {
439             throw new IllegalStateException("Owning thread already set");
440         }
441     }
442 
    /**
     * Moves this loop into {@code shutdownState} unless it is already shutting down, records the graceful shutdown
     * settings and wakes the loop up so that the owning thread can continue with the shutdown sequence.
     *
     * @param quietPeriod   the quiet period in nanoseconds, or {@code -1} to leave the current value untouched.
     * @param timeout       the shutdown timeout in nanoseconds, or {@code -1} to leave the current value untouched.
     * @param shutdownState the state to switch to ({@code ST_SHUTTING_DOWN} or {@code ST_SHUTDOWN}).
     */
    private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        // Retry until either somebody else started the shutdown or our state transition succeeded.
        for (;;) {
            if (isShuttingDown()) {
                // Shutdown was already requested; keep the state and settings of the first request.
                return;
            }
            int newState;
            wakeup = true;
            oldState = state.get();
            if (inEventLoop) {
                newState = shutdownState;
            } else if (oldState == ST_STARTED) {
                newState = shutdownState;
            } else {
                // The state changed concurrently between the check above and the read; keep it and skip the wakeup.
                newState = oldState;
                wakeup = false;
            }

            if (state.compareAndSet(oldState, newState)) {
                break;
            }
        }
        if (quietPeriod != -1) {
            gracefulShutdownQuietPeriod = quietPeriod;
        }
        if (timeout != -1) {
            gracefulShutdownTimeout = timeout;
        }

        if (wakeup) {
            // same as AbstractScheduledEventExecutor.WAKEUP_TASK
            taskQueue.offer(WAKEUP_TASK);
            handler.wakeup();
        }
    }
480 
    /**
     * Requests a graceful shutdown of this loop. The shutdown itself is carried out by the owning thread in
     * subsequent {@link #run(long)} / {@link #runNow()} calls, so the loop has to be driven further until the
     * returned future completes.
     *
     * @param quietPeriod the quiet period; must be {@code >= 0}.
     * @param timeout     the maximum time to wait for the shutdown; must be {@code >= quietPeriod}.
     * @param unit        the unit of {@code quietPeriod} and {@code timeout}; must not be {@code null}.
     * @return the {@link #terminationFuture()}.
     * @throws IllegalArgumentException if {@code quietPeriod} is negative or {@code timeout < quietPeriod}.
     */
    @Override
    public final Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
        if (timeout < quietPeriod) {
            throw new IllegalArgumentException(
                    "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
        }
        ObjectUtil.checkNotNull(unit, "unit");

        shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
        return terminationFuture();
    }
493 
    /**
     * Requests a shutdown by switching directly to the {@code ST_SHUTDOWN} state without changing the graceful
     * shutdown settings. Has no effect if the loop is already shutting down.
     */
    @Override
    @Deprecated
    public final void shutdown() {
        shutdown0(-1, -1, ST_SHUTDOWN);
    }
499 
    /**
     * Returns the {@link Future} that is completed once this loop has been terminated.
     */
    @Override
    public final Future<?> terminationFuture() {
        return terminationFuture;
    }
504 
    /**
     * Returns {@code true} once a shutdown was requested, i.e. the state is {@code ST_SHUTTING_DOWN} or later.
     */
    @Override
    public final boolean isShuttingDown() {
        return state.get() >= ST_SHUTTING_DOWN;
    }
509 
    /**
     * Returns {@code true} if the state is {@code ST_SHUTDOWN} or {@code ST_TERMINATED}.
     */
    @Override
    public final boolean isShutdown() {
        return state.get() >= ST_SHUTDOWN;
    }
514 
    /**
     * Returns {@code true} if the state is {@code ST_TERMINATED}.
     */
    @Override
    public final boolean isTerminated() {
        return state.get() == ST_TERMINATED;
    }
519 
    /**
     * Waits on the termination future for at most the given time. Note that termination only makes progress while
     * the owning thread keeps driving this loop.
     *
     * @param timeout the maximum time to wait.
     * @param unit    the unit of {@code timeout}.
     * @return the result of awaiting the termination future with the given timeout.
     * @throws InterruptedException if the waiting thread is interrupted.
     */
    @Override
    public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return terminationFuture.await(timeout, unit);
    }
524 
525     @Override
526     public final void execute(Runnable command) {
527         Objects.requireNonNull(command, "command");
528         boolean inEventLoop = inEventLoop();
529         if (inEventLoop) {
530             if (isShutdown()) {
531                 throw new RejectedExecutionException("event executor terminated");
532             }
533         }
534         taskQueue.add(command);
535         if (!inEventLoop) {
536             if (isShutdown()) {
537                 boolean reject = false;
538                 try {
539                     if (taskQueue.remove(command)) {
540                         reject = true;
541                     }
542                 } catch (UnsupportedOperationException e) {
543                     // The task queue does not support removal so the best thing we can do is to just move on and
544                     // hope we will be able to pick-up the task before its completely terminated.
545                     // In worst case we will log on termination.
546                 }
547                 if (reject) {
548                     throw new RejectedExecutionException("event executor terminated");
549                 }
550             }
551             handler.wakeup();
552         }
553     }
554 
    /**
     * Returns {@code true} if the task queue is not empty. Scheduled tasks that were not yet moved into the task
     * queue are not considered.
     */
    private boolean hasTasks() {
        return !taskQueue.isEmpty();
    }
558 
    /**
     * Decides whether the loop may terminate now. Cancels all scheduled tasks, runs the queued tasks and then
     * evaluates the graceful shutdown quiet period and timeout.
     *
     * @return {@code true} if the shutdown can be completed, {@code false} if the loop is not shutting down or has
     *         to keep running for now.
     * @throws IllegalStateException if the loop is shutting down and this is not called from the owning thread.
     */
    private boolean confirmShutdown() {
        if (!isShuttingDown()) {
            return false;
        }

        if (!inEventLoop()) {
            throw new IllegalStateException("must be invoked from an event loop");
        }

        cancelScheduledTasks();

        // Remember when the shutdown sequence was first observed; 0 doubles as the "not recorded yet" marker.
        if (gracefulShutdownStartTime == 0) {
            gracefulShutdownStartTime = ticker.nanoTime();
        }

        if (runAllTasks(-1, false) > 0) {
            if (isShutdown()) {
                // Executor shut down - no new tasks anymore.
                return true;
            }

            // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
            // terminate if the quiet period is 0.
            // See https://github.com/netty/netty/issues/4241
            if (gracefulShutdownQuietPeriod == 0) {
                return true;
            }
            return false;
        }

        final long nanoTime = ticker.nanoTime();

        // Terminate when shutdown() was used or the overall timeout has elapsed.
        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
            return true;
        }

        // Still within the quiet period since the last executed task: pause briefly and report "not yet".
        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }

            return false;
        }

        // No tasks were added for last quiet period - hopefully safe to shut down.
        // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
        return true;
    }
609 
    /**
     * Same as the inherited {@code invokeAny(Collection)}, but refuses to be called from the owning thread.
     *
     * @throws RejectedExecutionException if called from within this event loop.
     */
    @Override
    public final <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAny");
        return super.invokeAny(tasks);
    }
617 
    /**
     * Same as the inherited {@code invokeAny(Collection, long, TimeUnit)}, but refuses to be called from the
     * owning thread.
     *
     * @throws RejectedExecutionException if called from within this event loop.
     */
    @Override
    public final <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAny");
        return super.invokeAny(tasks, timeout, unit);
    }
625 
    /**
     * Same as the inherited {@code invokeAll(Collection)}, but refuses to be called from the owning thread.
     *
     * @throws RejectedExecutionException if called from within this event loop.
     */
    @Override
    public final <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAll");
        return super.invokeAll(tasks);
    }
633 
    /**
     * Same as the inherited {@code invokeAll(Collection, long, TimeUnit)}, but refuses to be called from the
     * owning thread.
     *
     * @throws RejectedExecutionException if called from within this event loop.
     */
    @Override
    public final <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAll");
        return super.invokeAll(tasks, timeout, unit);
    }
641 
642     private void throwIfInEventLoop(String method) {
643         if (inEventLoop()) {
644             throw new RejectedExecutionException(
645                     "Calling " + method + " from within the EventLoop is not allowed as it would deadlock");
646         }
647     }
648 
649     private class BlockingIoHandlerContext implements IoHandlerContext {
650         // this is a positive amount of nanos or Long.MAX_VALUE for no limit
651         long maxBlockingNanos = Long.MAX_VALUE;
652 
653         @Override
654         public boolean canBlock() {
655             assert inEventLoop();
656             return !hasTasks() && !hasScheduledTasks() && ManualIoEventLoop.this.canBlock();
657         }
658 
659         @Override
660         public long delayNanos(long currentTimeNanos) {
661             assert inEventLoop();
662             return Math.min(maxBlockingNanos, ManualIoEventLoop.this.delayNanos(currentTimeNanos, maxBlockingNanos));
663         }
664 
665         @Override
666         public long deadlineNanos() {
667             assert inEventLoop();
668             long next = nextScheduledTaskDeadlineNanos();
669             if (maxBlockingNanos == Long.MAX_VALUE) {
670                 // next == -1? -1 : next i.e. return next
671                 return next;
672             }
673             long now = ticker.nanoTime();
674             // we cannot just check Math.min as nanoTime can be negative or wrap around!
675             if (next == -1 || next - now > maxBlockingNanos) {
676                 return now + maxBlockingNanos;
677             }
678             return next;
679         }
680     }
681 }