View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.pool;
17  
18  import static io.netty.util.internal.ObjectUtil.checkPositive;
19  
20  import io.netty.bootstrap.Bootstrap;
21  import io.netty.channel.Channel;
22  import io.netty.util.concurrent.EventExecutor;
23  import io.netty.util.concurrent.Future;
24  import io.netty.util.concurrent.FutureListener;
25  import io.netty.util.concurrent.GlobalEventExecutor;
26  import io.netty.util.concurrent.Promise;
27  import io.netty.util.internal.ObjectUtil;
28  
29  import java.nio.channels.ClosedChannelException;
30  import java.util.ArrayDeque;
31  import java.util.Queue;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ScheduledFuture;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.TimeoutException;
37  
38  /**
39   * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
40   * number of concurrent connections.
41   */
42  public class FixedChannelPool extends SimpleChannelPool {
43  
44      public enum AcquireTimeoutAction {
45          /**
46           * Create a new connection when the timeout is detected.
47           */
48          NEW,
49  
50          /**
51           * Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
52           */
53          FAIL
54      }
55  
56      private final EventExecutor executor;
57      private final long acquireTimeoutNanos;
58      private final Runnable timeoutTask;
59  
60      // There is no need to worry about synchronization as everything that modified the queue or counts is done
61      // by the above EventExecutor.
62      private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
63      private final int maxConnections;
64      private final int maxPendingAcquires;
65      private final AtomicInteger acquiredChannelCount = new AtomicInteger();
66      private int pendingAcquireCount;
67      private boolean closed;
68  
69      /**
70       * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
71       *
72       * @param bootstrap         the {@link Bootstrap} that is used for connections
73       * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
74       * @param maxConnections    the number of maximal active connections, once this is reached new tries to acquire
75       *                          a {@link Channel} will be delayed until a connection is returned to the pool again.
76       */
77      public FixedChannelPool(Bootstrap bootstrap,
78                              ChannelPoolHandler handler, int maxConnections) {
79          this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
80      }
81  
82      /**
83       * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
84       *
85       * @param bootstrap             the {@link Bootstrap} that is used for connections
86       * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
87       * @param maxConnections        the number of maximal active connections, once this is reached new tries to
88       *                              acquire a {@link Channel} will be delayed until a connection is returned to the
89       *                              pool again.
90       * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
91       *                              be failed.
92       */
93      public FixedChannelPool(Bootstrap bootstrap,
94                              ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
95          this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires);
96      }
97  
98      /**
99       * Creates a new instance.
100      *
101      * @param bootstrap             the {@link Bootstrap} that is used for connections
102      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
103      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
104      *                              still healthy when obtain from the {@link ChannelPool}
105      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
106      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
107      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
108      *                              the {@link AcquireTimeoutAction} takes place.
109      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
110      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
111      *                              pool again.
112      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
113      *                              be failed.
114      */
115     public FixedChannelPool(Bootstrap bootstrap,
116                             ChannelPoolHandler handler,
117                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
118                             final long acquireTimeoutMillis,
119                             int maxConnections, int maxPendingAcquires) {
120         this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
121     }
122 
123     /**
124      * Creates a new instance.
125      *
126      * @param bootstrap             the {@link Bootstrap} that is used for connections
127      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
128      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
129      *                              still healthy when obtain from the {@link ChannelPool}
130      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
131      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
132      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
133      *                              the {@link AcquireTimeoutAction} takes place.
134      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
135      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
136      *                              pool again.
137      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
138      *                              be failed.
139      * @param releaseHealthCheck    will check channel health before offering back if this parameter set to
140      *                              {@code true}.
141      */
142     public FixedChannelPool(Bootstrap bootstrap,
143                             ChannelPoolHandler handler,
144                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
145                             final long acquireTimeoutMillis,
146                             int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
147         this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires,
148                 releaseHealthCheck, true);
149     }
150 
151     /**
152      * Creates a new instance.
153      *
154      * @param bootstrap             the {@link Bootstrap} that is used for connections
155      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
156      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
157      *                              still healthy when obtain from the {@link ChannelPool}
158      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
159      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
160      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
161      *                              the {@link AcquireTimeoutAction} takes place.
162      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
163      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
164      *                              pool again.
165      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
166      *                              be failed.
167      * @param releaseHealthCheck    will check channel health before offering back if this parameter set to
168      *                              {@code true}.
169      * @param lastRecentUsed        {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
170      */
171     public FixedChannelPool(Bootstrap bootstrap,
172                             ChannelPoolHandler handler,
173                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
174                             final long acquireTimeoutMillis,
175                             int maxConnections, int maxPendingAcquires,
176                             boolean releaseHealthCheck, boolean lastRecentUsed) {
177         super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
178         checkPositive(maxConnections, "maxConnections");
179         checkPositive(maxPendingAcquires, "maxPendingAcquires");
180         if (action == null && acquireTimeoutMillis == -1) {
181             timeoutTask = null;
182             acquireTimeoutNanos = -1;
183         } else if (action == null && acquireTimeoutMillis != -1) {
184             throw new NullPointerException("action");
185         } else if (action != null && acquireTimeoutMillis < 0) {
186             throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
187         } else {
188             acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
189             switch (action) {
190             case FAIL:
191                 timeoutTask = new TimeoutTask() {
192                     @Override
193                     public void onTimeout(AcquireTask task) {
194                         // Fail the promise as we timed out.
195                         task.promise.setFailure(new AcquireTimeoutException());
196                     }
197                 };
198                 break;
199             case NEW:
200                 timeoutTask = new TimeoutTask() {
201                     @Override
202                     public void onTimeout(AcquireTask task) {
203                         // Increment the acquire count and delegate to super to actually acquire a Channel which will
204                         // create a new connection.
205                         task.acquired();
206 
207                         FixedChannelPool.super.acquire(task.promise);
208                     }
209                 };
210                 break;
211             default:
212                 throw new Error();
213             }
214         }
215         executor = bootstrap.config().group().next();
216         this.maxConnections = maxConnections;
217         this.maxPendingAcquires = maxPendingAcquires;
218     }
219 
220     /** Returns the number of acquired channels that this pool thinks it has. */
221     public int acquiredChannelCount() {
222         return acquiredChannelCount.get();
223     }
224 
225     @Override
226     public Future<Channel> acquire(final Promise<Channel> promise) {
227         try {
228             if (executor.inEventLoop()) {
229                 acquire0(promise);
230             } else {
231                 executor.execute(new Runnable() {
232                     @Override
233                     public void run() {
234                         acquire0(promise);
235                     }
236                 });
237             }
238         } catch (Throwable cause) {
239             promise.tryFailure(cause);
240         }
241         return promise;
242     }
243 
244     private void acquire0(final Promise<Channel> promise) {
245         try {
246             assert executor.inEventLoop();
247 
248             if (closed) {
249                 promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
250                 return;
251             }
252             if (acquiredChannelCount.get() < maxConnections) {
253                 assert acquiredChannelCount.get() >= 0;
254 
255                 // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
256                 // EventLoop
257                 Promise<Channel> p = executor.newPromise();
258                 AcquireListener l = new AcquireListener(promise);
259                 l.acquired();
260                 p.addListener(l);
261                 super.acquire(p);
262             } else {
263                 if (pendingAcquireCount >= maxPendingAcquires) {
264                     tooManyOutstanding(promise);
265                 } else {
266                     AcquireTask task = new AcquireTask(promise);
267                     if (pendingAcquireQueue.offer(task)) {
268                         ++pendingAcquireCount;
269 
270                         if (timeoutTask != null) {
271                             task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos,
272                                   TimeUnit.NANOSECONDS);
273                         }
274                     } else {
275                         tooManyOutstanding(promise);
276                     }
277                 }
278 
279                 assert pendingAcquireCount > 0;
280             }
281         } catch (Throwable cause) {
282             promise.tryFailure(cause);
283         }
284     }
285 
286     private void tooManyOutstanding(Promise<?> promise) {
287         promise.setFailure(new IllegalStateException("Too many outstanding acquire operations"));
288     }
289 
290     @Override
291     public Future<Void> release(final Channel channel, final Promise<Void> promise) {
292         ObjectUtil.checkNotNull(promise, "promise");
293         final Promise<Void> p = executor.newPromise();
294         super.release(channel, p.addListener(new FutureListener<Void>() {
295 
296             @Override
297             public void operationComplete(Future<Void> future) {
298                 try {
299                     assert executor.inEventLoop();
300 
301                     if (closed) {
302                         // Since the pool is closed, we have no choice but to close the channel
303                         channel.close();
304                         promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
305                         return;
306                     }
307 
308                     if (future.isSuccess()) {
309                         decrementAndRunTaskQueue();
310                         promise.setSuccess(null);
311                     } else {
312                         Throwable cause = future.cause();
313                         // Check if the exception was not because of we passed the Channel to the wrong pool.
314                         if (!(cause instanceof IllegalArgumentException)) {
315                             decrementAndRunTaskQueue();
316                         }
317                         promise.setFailure(future.cause());
318                     }
319                 } catch (Throwable cause) {
320                     promise.tryFailure(cause);
321                 }
322             }
323         }));
324         return promise;
325     }
326 
327     private void decrementAndRunTaskQueue() {
328         // We should never have a negative value.
329         int currentCount = acquiredChannelCount.decrementAndGet();
330         assert currentCount >= 0;
331 
332         // Run the pending acquire tasks before notify the original promise so if the user would
333         // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
334         // maxPendingAcquires we may be able to run some pending tasks first and so allow to add
335         // more.
336         runTaskQueue();
337     }
338 
339     private void runTaskQueue() {
340         while (acquiredChannelCount.get() < maxConnections) {
341             AcquireTask task = pendingAcquireQueue.poll();
342             if (task == null) {
343                 break;
344             }
345 
346             // Cancel the timeout if one was scheduled
347             ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
348             if (timeoutFuture != null) {
349                 timeoutFuture.cancel(false);
350             }
351 
352             --pendingAcquireCount;
353             task.acquired();
354 
355             super.acquire(task.promise);
356         }
357 
358         // We should never have a negative value.
359         assert pendingAcquireCount >= 0;
360         assert acquiredChannelCount.get() >= 0;
361     }
362 
363     // AcquireTask extends AcquireListener to reduce object creations and so GC pressure
364     private final class AcquireTask extends AcquireListener {
365         final Promise<Channel> promise;
366         final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
367         ScheduledFuture<?> timeoutFuture;
368 
369         AcquireTask(Promise<Channel> promise) {
370             super(promise);
371             // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
372             // EventLoop.
373             this.promise = executor.<Channel>newPromise().addListener(this);
374         }
375     }
376 
377     private abstract class TimeoutTask implements Runnable {
378         @Override
379         public final void run() {
380             assert executor.inEventLoop();
381             long nanoTime = System.nanoTime();
382             for (;;) {
383                 AcquireTask task = pendingAcquireQueue.peek();
384                 // Compare nanoTime as descripted in the javadocs of System.nanoTime()
385                 //
386                 // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
387                 // See https://github.com/netty/netty/issues/3705
388                 if (task == null || nanoTime - task.expireNanoTime < 0) {
389                     break;
390                 }
391                 pendingAcquireQueue.remove();
392 
393                 --pendingAcquireCount;
394                 onTimeout(task);
395             }
396         }
397 
398         public abstract void onTimeout(AcquireTask task);
399     }
400 
401     private class AcquireListener implements FutureListener<Channel> {
402         private final Promise<Channel> originalPromise;
403         protected boolean acquired;
404 
405         AcquireListener(Promise<Channel> originalPromise) {
406             this.originalPromise = originalPromise;
407         }
408 
409         @Override
410         public void operationComplete(Future<Channel> future) throws Exception {
411             try {
412                 assert executor.inEventLoop();
413 
414                 if (closed) {
415                     if (future.isSuccess()) {
416                         // Since the pool is closed, we have no choice but to close the channel
417                         future.getNow().close();
418                     }
419                     originalPromise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
420                     return;
421                 }
422 
423                 if (future.isSuccess()) {
424                     originalPromise.setSuccess(future.getNow());
425                 } else {
426                     if (acquired) {
427                         decrementAndRunTaskQueue();
428                     } else {
429                         runTaskQueue();
430                     }
431 
432                     originalPromise.setFailure(future.cause());
433                 }
434             } catch (Throwable cause) {
435                 originalPromise.tryFailure(cause);
436             }
437         }
438 
439         public void acquired() {
440             if (acquired) {
441                 return;
442             }
443             acquiredChannelCount.incrementAndGet();
444             acquired = true;
445         }
446     }
447 
448     @Override
449     public void close() {
450         try {
451             closeAsync().await();
452         } catch (InterruptedException e) {
453             Thread.currentThread().interrupt();
454             throw new RuntimeException(e);
455         }
456     }
457 
458     /**
459      * Closes the pool in an async manner.
460      *
461      * @return Future which represents completion of the close task
462      */
463     @Override
464     public Future<Void> closeAsync() {
465         if (executor.inEventLoop()) {
466             return close0();
467         } else {
468             final Promise<Void> closeComplete = executor.newPromise();
469             executor.execute(new Runnable() {
470                 @Override
471                 public void run() {
472                     close0().addListener(new FutureListener<Void>() {
473                         @Override
474                         public void operationComplete(Future<Void> f) throws Exception {
475                             if (f.isSuccess()) {
476                                 closeComplete.setSuccess(null);
477                             } else {
478                                 closeComplete.setFailure(f.cause());
479                             }
480                         }
481                     });
482                 }
483             });
484             return closeComplete;
485         }
486     }
487 
488     private Future<Void> close0() {
489         assert executor.inEventLoop();
490 
491         if (!closed) {
492             closed = true;
493             for (;;) {
494                 AcquireTask task = pendingAcquireQueue.poll();
495                 if (task == null) {
496                     break;
497                 }
498                 ScheduledFuture<?> f = task.timeoutFuture;
499                 if (f != null) {
500                     f.cancel(false);
501                 }
502                 task.promise.setFailure(new ClosedChannelException());
503             }
504             acquiredChannelCount.set(0);
505             pendingAcquireCount = 0;
506 
507             // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
508             // to ensure we will not block in a EventExecutor.
509             return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
510                 @Override
511                 public Void call() throws Exception {
512                     FixedChannelPool.super.close();
513                     return null;
514                 }
515             });
516         }
517 
518         return GlobalEventExecutor.INSTANCE.newSucceededFuture(null);
519     }
520 
521     private static final class AcquireTimeoutException extends TimeoutException {
522 
523         private AcquireTimeoutException() {
524             super("Acquire operation took longer then configured maximum time");
525         }
526 
527         // Suppress a warning since the method doesn't need synchronization
528         @Override
529         public Throwable fillInStackTrace() {
530             return this;
531         }
532     }
533 }