View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.pool;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.util.concurrent.EventExecutor;
21  import io.netty.util.concurrent.Future;
22  import io.netty.util.concurrent.FutureListener;
23  import io.netty.util.concurrent.GlobalEventExecutor;
24  import io.netty.util.concurrent.Promise;
25  import io.netty.util.internal.ObjectUtil;
26  import io.netty.util.internal.ThrowableUtil;
27  
28  import java.nio.channels.ClosedChannelException;
29  import java.util.ArrayDeque;
30  import java.util.Queue;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.ScheduledFuture;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.TimeoutException;
35  
36  /**
37   * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
38   * number of concurrent connections.
39   */
40  public class FixedChannelPool extends SimpleChannelPool {
41      private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
42              new IllegalStateException("Too many outstanding acquire operations"),
43              FixedChannelPool.class, "acquire0(...)");
44      private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
45              new TimeoutException("Acquire operation took longer then configured maximum time"),
46              FixedChannelPool.class, "<init>(...)");
47      static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
48              new IllegalStateException("FixedChannelPool was closed"),
49              FixedChannelPool.class, "release(...)");
50      static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
51              new IllegalStateException("FixedChannelPool was closed"),
52              FixedChannelPool.class, "acquire0(...)");
53      public enum AcquireTimeoutAction {
54          /**
55           * Create a new connection when the timeout is detected.
56           */
57          NEW,
58  
59          /**
60           * Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
61           */
62          FAIL
63      }
64  
65      private final EventExecutor executor;
66      private final long acquireTimeoutNanos;
67      private final Runnable timeoutTask;
68  
69      // There is no need to worry about synchronization as everything that modified the queue or counts is done
70      // by the above EventExecutor.
71      private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
72      private final int maxConnections;
73      private final int maxPendingAcquires;
74      private final AtomicInteger acquiredChannelCount = new AtomicInteger();
75      private int pendingAcquireCount;
76      private boolean closed;
77  
78      /**
79       * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
80       *
81       * @param bootstrap         the {@link Bootstrap} that is used for connections
82       * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
83       * @param maxConnections    the number of maximal active connections, once this is reached new tries to acquire
84       *                          a {@link Channel} will be delayed until a connection is returned to the pool again.
85       */
86      public FixedChannelPool(Bootstrap bootstrap,
87                              ChannelPoolHandler handler, int maxConnections) {
88          this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
89      }
90  
91      /**
92       * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
93       *
94       * @param bootstrap             the {@link Bootstrap} that is used for connections
95       * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
96       * @param maxConnections        the number of maximal active connections, once this is reached new tries to
97       *                              acquire a {@link Channel} will be delayed until a connection is returned to the
98       *                              pool again.
99       * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
100      *                              be failed.
101      */
102     public FixedChannelPool(Bootstrap bootstrap,
103                             ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
104         this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires);
105     }
106 
107     /**
108      * Creates a new instance.
109      *
110      * @param bootstrap             the {@link Bootstrap} that is used for connections
111      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
112      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
113      *                              still healthy when obtain from the {@link ChannelPool}
114      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
115      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
116      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
117      *                              the {@link AcquireTimeoutAction} takes place.
118      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
119      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
120      *                              pool again.
121      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
122      *                              be failed.
123      */
124     public FixedChannelPool(Bootstrap bootstrap,
125                             ChannelPoolHandler handler,
126                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
127                             final long acquireTimeoutMillis,
128                             int maxConnections, int maxPendingAcquires) {
129         this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
130     }
131 
132     /**
133      * Creates a new instance.
134      *
135      * @param bootstrap             the {@link Bootstrap} that is used for connections
136      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
137      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
138      *                              still healthy when obtain from the {@link ChannelPool}
139      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
140      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
141      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
142      *                              the {@link AcquireTimeoutAction} takes place.
143      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
144      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
145      *                              pool again.
146      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
147      *                              be failed.
148      * @param releaseHealthCheck    will check channel health before offering back if this parameter set to
149      *                              {@code true}.
150      */
151     public FixedChannelPool(Bootstrap bootstrap,
152                             ChannelPoolHandler handler,
153                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
154                             final long acquireTimeoutMillis,
155                             int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
156         this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires,
157                 releaseHealthCheck, true);
158     }
159 
160     /**
161      * Creates a new instance.
162      *
163      * @param bootstrap             the {@link Bootstrap} that is used for connections
164      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
165      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
166      *                              still healthy when obtain from the {@link ChannelPool}
167      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
168      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
169      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
170      *                              the {@link AcquireTimeoutAction} takes place.
171      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
172      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
173      *                              pool again.
174      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
175      *                              be failed.
176      * @param releaseHealthCheck    will check channel health before offering back if this parameter set to
177      *                              {@code true}.
178      * @param lastRecentUsed        {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
179      */
180     public FixedChannelPool(Bootstrap bootstrap,
181                             ChannelPoolHandler handler,
182                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
183                             final long acquireTimeoutMillis,
184                             int maxConnections, int maxPendingAcquires,
185                             boolean releaseHealthCheck, boolean lastRecentUsed) {
186         super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
187         if (maxConnections < 1) {
188             throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
189         }
190         if (maxPendingAcquires < 1) {
191             throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
192         }
193         if (action == null && acquireTimeoutMillis == -1) {
194             timeoutTask = null;
195             acquireTimeoutNanos = -1;
196         } else if (action == null && acquireTimeoutMillis != -1) {
197             throw new NullPointerException("action");
198         } else if (action != null && acquireTimeoutMillis < 0) {
199             throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
200         } else {
201             acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
202             switch (action) {
203             case FAIL:
204                 timeoutTask = new TimeoutTask() {
205                     @Override
206                     public void onTimeout(AcquireTask task) {
207                         // Fail the promise as we timed out.
208                         task.promise.setFailure(TIMEOUT_EXCEPTION);
209                     }
210                 };
211                 break;
212             case NEW:
213                 timeoutTask = new TimeoutTask() {
214                     @Override
215                     public void onTimeout(AcquireTask task) {
216                         // Increment the acquire count and delegate to super to actually acquire a Channel which will
217                         // create a new connection.
218                         task.acquired();
219 
220                         FixedChannelPool.super.acquire(task.promise);
221                     }
222                 };
223                 break;
224             default:
225                 throw new Error();
226             }
227         }
228         executor = bootstrap.config().group().next();
229         this.maxConnections = maxConnections;
230         this.maxPendingAcquires = maxPendingAcquires;
231     }
232 
233     /** Returns the number of acquired channels that this pool thinks it has. */
234     public int acquiredChannelCount() {
235         return acquiredChannelCount.get();
236     }
237 
238     @Override
239     public Future<Channel> acquire(final Promise<Channel> promise) {
240         try {
241             if (executor.inEventLoop()) {
242                 acquire0(promise);
243             } else {
244                 executor.execute(new Runnable() {
245                     @Override
246                     public void run() {
247                         acquire0(promise);
248                     }
249                 });
250             }
251         } catch (Throwable cause) {
252             promise.setFailure(cause);
253         }
254         return promise;
255     }
256 
257     private void acquire0(final Promise<Channel> promise) {
258         assert executor.inEventLoop();
259 
260         if (closed) {
261             promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
262             return;
263         }
264         if (acquiredChannelCount.get() < maxConnections) {
265             assert acquiredChannelCount.get() >= 0;
266 
267             // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
268             // EventLoop
269             Promise<Channel> p = executor.newPromise();
270             AcquireListener l = new AcquireListener(promise);
271             l.acquired();
272             p.addListener(l);
273             super.acquire(p);
274         } else {
275             if (pendingAcquireCount >= maxPendingAcquires) {
276                 promise.setFailure(FULL_EXCEPTION);
277             } else {
278                 AcquireTask task = new AcquireTask(promise);
279                 if (pendingAcquireQueue.offer(task)) {
280                     ++pendingAcquireCount;
281 
282                     if (timeoutTask != null) {
283                         task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
284                     }
285                 } else {
286                     promise.setFailure(FULL_EXCEPTION);
287                 }
288             }
289 
290             assert pendingAcquireCount > 0;
291         }
292     }
293 
294     @Override
295     public Future<Void> release(final Channel channel, final Promise<Void> promise) {
296         ObjectUtil.checkNotNull(promise, "promise");
297         final Promise<Void> p = executor.newPromise();
298         super.release(channel, p.addListener(new FutureListener<Void>() {
299 
300             @Override
301             public void operationComplete(Future<Void> future) throws Exception {
302                 assert executor.inEventLoop();
303 
304                 if (closed) {
305                     // Since the pool is closed, we have no choice but to close the channel
306                     channel.close();
307                     promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
308                     return;
309                 }
310 
311                 if (future.isSuccess()) {
312                     decrementAndRunTaskQueue();
313                     promise.setSuccess(null);
314                 } else {
315                     Throwable cause = future.cause();
316                     // Check if the exception was not because of we passed the Channel to the wrong pool.
317                     if (!(cause instanceof IllegalArgumentException)) {
318                         decrementAndRunTaskQueue();
319                     }
320                     promise.setFailure(future.cause());
321                 }
322             }
323         }));
324         return promise;
325     }
326 
327     private void decrementAndRunTaskQueue() {
328         // We should never have a negative value.
329         int currentCount = acquiredChannelCount.decrementAndGet();
330         assert currentCount >= 0;
331 
332         // Run the pending acquire tasks before notify the original promise so if the user would
333         // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
334         // maxPendingAcquires we may be able to run some pending tasks first and so allow to add
335         // more.
336         runTaskQueue();
337     }
338 
339     private void runTaskQueue() {
340         while (acquiredChannelCount.get() < maxConnections) {
341             AcquireTask task = pendingAcquireQueue.poll();
342             if (task == null) {
343                 break;
344             }
345 
346             // Cancel the timeout if one was scheduled
347             ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
348             if (timeoutFuture != null) {
349                 timeoutFuture.cancel(false);
350             }
351 
352             --pendingAcquireCount;
353             task.acquired();
354 
355             super.acquire(task.promise);
356         }
357 
358         // We should never have a negative value.
359         assert pendingAcquireCount >= 0;
360         assert acquiredChannelCount.get() >= 0;
361     }
362 
363     // AcquireTask extends AcquireListener to reduce object creations and so GC pressure
364     private final class AcquireTask extends AcquireListener {
365         final Promise<Channel> promise;
366         final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
367         ScheduledFuture<?> timeoutFuture;
368 
369         public AcquireTask(Promise<Channel> promise) {
370             super(promise);
371             // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
372             // EventLoop.
373             this.promise = executor.<Channel>newPromise().addListener(this);
374         }
375     }
376 
377     private abstract class TimeoutTask implements Runnable {
378         @Override
379         public final void run() {
380             assert executor.inEventLoop();
381             long nanoTime = System.nanoTime();
382             for (;;) {
383                 AcquireTask task = pendingAcquireQueue.peek();
384                 // Compare nanoTime as descripted in the javadocs of System.nanoTime()
385                 //
386                 // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
387                 // See https://github.com/netty/netty/issues/3705
388                 if (task == null || nanoTime - task.expireNanoTime < 0) {
389                     break;
390                 }
391                 pendingAcquireQueue.remove();
392 
393                 --pendingAcquireCount;
394                 onTimeout(task);
395             }
396         }
397 
398         public abstract void onTimeout(AcquireTask task);
399     }
400 
401     private class AcquireListener implements FutureListener<Channel> {
402         private final Promise<Channel> originalPromise;
403         protected boolean acquired;
404 
405         AcquireListener(Promise<Channel> originalPromise) {
406             this.originalPromise = originalPromise;
407         }
408 
409         @Override
410         public void operationComplete(Future<Channel> future) throws Exception {
411             assert executor.inEventLoop();
412 
413             if (closed) {
414                 if (future.isSuccess()) {
415                     // Since the pool is closed, we have no choice but to close the channel
416                     future.getNow().close();
417                 }
418                 originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
419                 return;
420             }
421 
422             if (future.isSuccess()) {
423                 originalPromise.setSuccess(future.getNow());
424             } else {
425                 if (acquired) {
426                     decrementAndRunTaskQueue();
427                 } else {
428                     runTaskQueue();
429                 }
430 
431                 originalPromise.setFailure(future.cause());
432             }
433         }
434 
435         public void acquired() {
436             if (acquired) {
437                 return;
438             }
439             acquiredChannelCount.incrementAndGet();
440             acquired = true;
441         }
442     }
443 
444     @Override
445     public void close() {
446         if (executor.inEventLoop()) {
447             close0();
448         } else {
449             executor.submit(new Runnable() {
450                 @Override
451                 public void run() {
452                     close0();
453                 }
454             }).awaitUninterruptibly();
455         }
456     }
457 
458     private void close0() {
459         if (!closed) {
460             closed = true;
461             for (;;) {
462                 AcquireTask task = pendingAcquireQueue.poll();
463                 if (task == null) {
464                     break;
465                 }
466                 ScheduledFuture<?> f = task.timeoutFuture;
467                 if (f != null) {
468                     f.cancel(false);
469                 }
470                 task.promise.setFailure(new ClosedChannelException());
471             }
472             acquiredChannelCount.set(0);
473             pendingAcquireCount = 0;
474 
475             // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
476             // to ensure we will not block in a EventExecutor.
477             GlobalEventExecutor.INSTANCE.execute(new Runnable() {
478                 @Override
479                 public void run() {
480                     FixedChannelPool.super.close();
481                 }
482             });
483         }
484     }
485 }