/*
 * Copyright 2015 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.pool;
17  
18  import static io.netty.util.internal.ObjectUtil.checkPositive;
19  
20  import io.netty.bootstrap.Bootstrap;
21  import io.netty.channel.Channel;
22  import io.netty.util.concurrent.EventExecutor;
23  import io.netty.util.concurrent.Future;
24  import io.netty.util.concurrent.FutureListener;
25  import io.netty.util.concurrent.GlobalEventExecutor;
26  import io.netty.util.concurrent.Promise;
27  import io.netty.util.internal.ObjectUtil;
28  
29  import java.nio.channels.ClosedChannelException;
30  import java.util.ArrayDeque;
31  import java.util.Queue;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ScheduledFuture;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.TimeoutException;
37  
38  /**
39   * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
40   * number of concurrent connections.
41   */
42  public class FixedChannelPool extends SimpleChannelPool {
43  
44      public enum AcquireTimeoutAction {
45          /**
46           * Create a new connection when the timeout is detected.
47           */
48          NEW,
49  
50          /**
51           * Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
52           */
53          FAIL
54      }
55  
56      private final EventExecutor executor;
57      private final long acquireTimeoutNanos;
58      private final Runnable timeoutTask;
59  
60      // There is no need to worry about synchronization as everything that modified the queue or counts is done
61      // by the above EventExecutor.
62      private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
63      private final int maxConnections;
64      private final int maxPendingAcquires;
65      private final AtomicInteger acquiredChannelCount = new AtomicInteger();
66      private int pendingAcquireCount;
67      private boolean closed;
68  
69      /**
70       * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
71       *
72       * @param bootstrap         the {@link Bootstrap} that is used for connections
73       * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
74       * @param maxConnections    the number of maximal active connections, once this is reached new tries to acquire
75       *                          a {@link Channel} will be delayed until a connection is returned to the pool again.
76       */
77      public FixedChannelPool(Bootstrap bootstrap,
78                              ChannelPoolHandler handler, int maxConnections) {
79          this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
80      }
81  
82      /**
83       * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
84       *
85       * @param bootstrap             the {@link Bootstrap} that is used for connections
86       * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
87       * @param maxConnections        the number of maximal active connections, once this is reached new tries to
88       *                              acquire a {@link Channel} will be delayed until a connection is returned to the
89       *                              pool again.
90       * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
91       *                              be failed.
92       */
93      public FixedChannelPool(Bootstrap bootstrap,
94                              ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
95          this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires);
96      }
97  
98      /**
99       * Creates a new instance.
100      *
101      * @param bootstrap             the {@link Bootstrap} that is used for connections
102      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
103      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
104      *                              still healthy when obtain from the {@link ChannelPool}
105      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
106      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
107      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
108      *                              the {@link AcquireTimeoutAction} takes place.
109      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
110      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
111      *                              pool again.
112      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
113      *                              be failed.
114      */
115     public FixedChannelPool(Bootstrap bootstrap,
116                             ChannelPoolHandler handler,
117                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
118                             final long acquireTimeoutMillis,
119                             int maxConnections, int maxPendingAcquires) {
120         this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
121     }
122 
123     /**
124      * Creates a new instance.
125      *
126      * @param bootstrap             the {@link Bootstrap} that is used for connections
127      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
128      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
129      *                              still healthy when obtain from the {@link ChannelPool}
130      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
131      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
132      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
133      *                              the {@link AcquireTimeoutAction} takes place.
134      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
135      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
136      *                              pool again.
137      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
138      *                              be failed.
139      * @param releaseHealthCheck    will check channel health before offering back if this parameter set to
140      *                              {@code true}.
141      */
142     public FixedChannelPool(Bootstrap bootstrap,
143                             ChannelPoolHandler handler,
144                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
145                             final long acquireTimeoutMillis,
146                             int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
147         this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires,
148                 releaseHealthCheck, true);
149     }
150 
151     /**
152      * Creates a new instance.
153      *
154      * @param bootstrap             the {@link Bootstrap} that is used for connections
155      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
156      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
157      *                              still healthy when obtain from the {@link ChannelPool}
158      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
159      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
160      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
161      *                              the {@link AcquireTimeoutAction} takes place.
162      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
163      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
164      *                              pool again.
165      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
166      *                              be failed.
167      * @param releaseHealthCheck    will check channel health before offering back if this parameter set to
168      *                              {@code true}.
169      * @param lastRecentUsed        {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
170      */
171     public FixedChannelPool(Bootstrap bootstrap,
172                             ChannelPoolHandler handler,
173                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
174                             final long acquireTimeoutMillis,
175                             int maxConnections, int maxPendingAcquires,
176                             boolean releaseHealthCheck, boolean lastRecentUsed) {
177         super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
178         checkPositive(maxConnections, "maxConnections");
179         checkPositive(maxPendingAcquires, "maxPendingAcquires");
180         if (action == null && acquireTimeoutMillis == -1) {
181             timeoutTask = null;
182             acquireTimeoutNanos = -1;
183         } else if (action == null && acquireTimeoutMillis != -1) {
184             throw new NullPointerException("action");
185         } else if (action != null && acquireTimeoutMillis < 0) {
186             throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
187         } else {
188             acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
189             switch (action) {
190             case FAIL:
191                 timeoutTask = new TimeoutTask() {
192                     @Override
193                     public void onTimeout(AcquireTask task) {
194                         // Fail the promise as we timed out.
195                         task.promise.setFailure(new AcquireTimeoutException());
196                     }
197                 };
198                 break;
199             case NEW:
200                 timeoutTask = new TimeoutTask() {
201                     @Override
202                     public void onTimeout(AcquireTask task) {
203                         // Increment the acquire count and delegate to super to actually acquire a Channel which will
204                         // create a new connection.
205                         task.acquired();
206 
207                         FixedChannelPool.super.acquire(task.promise);
208                     }
209                 };
210                 break;
211             default:
212                 throw new Error("Unexpected AcquireTimeoutAction: " + action);
213             }
214         }
215         executor = bootstrap.config().group().next();
216         this.maxConnections = maxConnections;
217         this.maxPendingAcquires = maxPendingAcquires;
218     }
219 
220     /** Returns the number of acquired channels that this pool thinks it has. */
221     public int acquiredChannelCount() {
222         return acquiredChannelCount.get();
223     }
224 
225     @Override
226     public Future<Channel> acquire(final Promise<Channel> promise) {
227         try {
228             if (executor.inEventLoop()) {
229                 acquire0(promise);
230             } else {
231                 executor.execute(new Runnable() {
232                     @Override
233                     public void run() {
234                         acquire0(promise);
235                     }
236                 });
237             }
238         } catch (Throwable cause) {
239             promise.tryFailure(cause);
240         }
241         return promise;
242     }
243 
244     private void acquire0(final Promise<Channel> promise) {
245         try {
246             assert executor.inEventLoop();
247 
248             if (closed) {
249                 promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
250                 return;
251             }
252             if (acquiredChannelCount.get() < maxConnections) {
253                 assert acquiredChannelCount.get() >= 0;
254 
255                 // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
256                 // EventLoop
257                 Promise<Channel> p = executor.newPromise();
258                 AcquireListener l = new AcquireListener(promise);
259                 l.acquired();
260                 p.addListener(l);
261                 super.acquire(p);
262             } else {
263                 if (pendingAcquireCount >= maxPendingAcquires) {
264                     tooManyOutstanding(promise);
265                 } else {
266                     AcquireTask task = new AcquireTask(promise);
267                     if (pendingAcquireQueue.offer(task)) {
268                         ++pendingAcquireCount;
269 
270                         if (timeoutTask != null) {
271                             task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos,
272                                   TimeUnit.NANOSECONDS);
273                         }
274                     } else {
275                         tooManyOutstanding(promise);
276                     }
277                 }
278 
279                 assert pendingAcquireCount > 0;
280             }
281         } catch (Throwable cause) {
282             promise.tryFailure(cause);
283         }
284     }
285 
286     private void tooManyOutstanding(Promise<?> promise) {
287         promise.setFailure(new IllegalStateException("Too many outstanding acquire operations"));
288     }
289 
290     @Override
291     public Future<Void> release(final Channel channel, final Promise<Void> promise) {
292         ObjectUtil.checkNotNull(promise, "promise");
293         final Promise<Void> p = executor.newPromise();
294         super.release(channel, p.addListener((FutureListener<Void>) future -> {
295             try {
296                 assert executor.inEventLoop();
297 
298                 if (closed) {
299                     // Since the pool is closed, we have no choice but to close the channel
300                     channel.close();
301                     promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
302                     return;
303                 }
304 
305                 if (future.isSuccess()) {
306                     decrementAndRunTaskQueue();
307                     promise.setSuccess(null);
308                 } else {
309                     Throwable cause = future.cause();
310                     // Check if the exception was not because of we passed the Channel to the wrong pool.
311                     if (!(cause instanceof IllegalArgumentException)) {
312                         decrementAndRunTaskQueue();
313                     }
314                     promise.setFailure(future.cause());
315                 }
316             } catch (Throwable cause) {
317                 promise.tryFailure(cause);
318             }
319         }));
320         return promise;
321     }
322 
323     private void decrementAndRunTaskQueue() {
324         // We should never have a negative value.
325         int currentCount = acquiredChannelCount.decrementAndGet();
326         assert currentCount >= 0;
327 
328         // Run the pending acquire tasks before notify the original promise so if the user would
329         // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
330         // maxPendingAcquires we may be able to run some pending tasks first and so allow to add
331         // more.
332         runTaskQueue();
333     }
334 
335     private void runTaskQueue() {
336         while (acquiredChannelCount.get() < maxConnections) {
337             AcquireTask task = pendingAcquireQueue.poll();
338             if (task == null) {
339                 break;
340             }
341 
342             // Cancel the timeout if one was scheduled
343             ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
344             if (timeoutFuture != null) {
345                 timeoutFuture.cancel(false);
346             }
347 
348             --pendingAcquireCount;
349             task.acquired();
350 
351             super.acquire(task.promise);
352         }
353 
354         // We should never have a negative value.
355         assert pendingAcquireCount >= 0;
356         assert acquiredChannelCount.get() >= 0;
357     }
358 
359     // AcquireTask extends AcquireListener to reduce object creations and so GC pressure
360     private final class AcquireTask extends AcquireListener {
361         final Promise<Channel> promise;
362         final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
363         ScheduledFuture<?> timeoutFuture;
364 
365         AcquireTask(Promise<Channel> promise) {
366             super(promise);
367             // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
368             // EventLoop.
369             this.promise = executor.<Channel>newPromise().addListener(this);
370         }
371     }
372 
373     private abstract class TimeoutTask implements Runnable {
374         @Override
375         public final void run() {
376             assert executor.inEventLoop();
377             long nanoTime = System.nanoTime();
378             for (;;) {
379                 AcquireTask task = pendingAcquireQueue.peek();
380                 // Compare nanoTime as descripted in the javadocs of System.nanoTime()
381                 //
382                 // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
383                 // See https://github.com/netty/netty/issues/3705
384                 if (task == null || nanoTime - task.expireNanoTime < 0) {
385                     break;
386                 }
387                 pendingAcquireQueue.remove();
388 
389                 --pendingAcquireCount;
390                 onTimeout(task);
391             }
392         }
393 
394         public abstract void onTimeout(AcquireTask task);
395     }
396 
397     private class AcquireListener implements FutureListener<Channel> {
398         private final Promise<Channel> originalPromise;
399         protected boolean acquired;
400 
401         AcquireListener(Promise<Channel> originalPromise) {
402             this.originalPromise = originalPromise;
403         }
404 
405         @Override
406         public void operationComplete(Future<Channel> future) throws Exception {
407             try {
408                 assert executor.inEventLoop();
409 
410                 if (closed) {
411                     if (future.isSuccess()) {
412                         // Since the pool is closed, we have no choice but to close the channel
413                         future.getNow().close();
414                     }
415                     originalPromise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
416                     return;
417                 }
418 
419                 if (future.isSuccess()) {
420                     originalPromise.setSuccess(future.getNow());
421                 } else {
422                     if (acquired) {
423                         decrementAndRunTaskQueue();
424                     } else {
425                         runTaskQueue();
426                     }
427 
428                     originalPromise.setFailure(future.cause());
429                 }
430             } catch (Throwable cause) {
431                 originalPromise.tryFailure(cause);
432             }
433         }
434 
435         public void acquired() {
436             if (acquired) {
437                 return;
438             }
439             acquiredChannelCount.incrementAndGet();
440             acquired = true;
441         }
442     }
443 
444     @Override
445     public void close() {
446         try {
447             closeAsync().await();
448         } catch (InterruptedException e) {
449             Thread.currentThread().interrupt();
450             throw new RuntimeException(e);
451         }
452     }
453 
454     /**
455      * Closes the pool in an async manner.
456      *
457      * @return Future which represents completion of the close task
458      */
459     @Override
460     public Future<Void> closeAsync() {
461         if (executor.inEventLoop()) {
462             return close0();
463         } else {
464             final Promise<Void> closeComplete = executor.newPromise();
465             executor.execute(new Runnable() {
466                 @Override
467                 public void run() {
468                     close0().addListener((FutureListener<Void>) f -> {
469                         if (f.isSuccess()) {
470                             closeComplete.setSuccess(null);
471                         } else {
472                             closeComplete.setFailure(f.cause());
473                         }
474                     });
475                 }
476             });
477             return closeComplete;
478         }
479     }
480 
481     private Future<Void> close0() {
482         assert executor.inEventLoop();
483 
484         if (!closed) {
485             closed = true;
486             for (;;) {
487                 AcquireTask task = pendingAcquireQueue.poll();
488                 if (task == null) {
489                     break;
490                 }
491                 ScheduledFuture<?> f = task.timeoutFuture;
492                 if (f != null) {
493                     f.cancel(false);
494                 }
495                 task.promise.setFailure(new ClosedChannelException());
496             }
497             acquiredChannelCount.set(0);
498             pendingAcquireCount = 0;
499 
500             // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
501             // to ensure we will not block in a EventExecutor.
502             return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
503                 @Override
504                 public Void call() throws Exception {
505                     FixedChannelPool.super.close();
506                     return null;
507                 }
508             });
509         }
510 
511         return GlobalEventExecutor.INSTANCE.newSucceededFuture(null);
512     }
513 
514     private static final class AcquireTimeoutException extends TimeoutException {
515 
516         private AcquireTimeoutException() {
517             super("Acquire operation took longer then configured maximum time");
518         }
519 
520         // Suppress a warning since the method doesn't need synchronization
521         @Override
522         public Throwable fillInStackTrace() {
523             return this;
524         }
525     }
526 }