View Javadoc

1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.pool;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.util.concurrent.EventExecutor;
21  import io.netty.util.concurrent.Future;
22  import io.netty.util.concurrent.FutureListener;
23  import io.netty.util.concurrent.Promise;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.ThrowableUtil;
26  
27  import java.nio.channels.ClosedChannelException;
28  import java.util.ArrayDeque;
29  import java.util.Queue;
30  import java.util.concurrent.ScheduledFuture;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.TimeoutException;
33  
34  /**
35   * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
36   * number of concurrent connections.
37   */
38  public class FixedChannelPool extends SimpleChannelPool {
39      private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
40              new IllegalStateException("Too many outstanding acquire operations"),
41              FixedChannelPool.class, "acquire0(...)");
42      private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
43              new TimeoutException("Acquire operation took longer then configured maximum time"),
44              FixedChannelPool.class, "<init>(...)");
45      static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
46              new IllegalStateException("FixedChannelPooled was closed"),
47              FixedChannelPool.class, "release(...)");
48      static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
49              new IllegalStateException("FixedChannelPooled was closed"),
50              FixedChannelPool.class, "acquire0(...)");
51      public enum AcquireTimeoutAction {
52          /**
53           * Create a new connection when the timeout is detected.
54           */
55          NEW,
56  
57          /**
58           * Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
59           */
60          FAIL
61      }
62  
63      private final EventExecutor executor;
64      private final long acquireTimeoutNanos;
65      private final Runnable timeoutTask;
66  
67      // There is no need to worry about synchronization as everything that modified the queue or counts is done
68      // by the above EventExecutor.
69      private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
70      private final int maxConnections;
71      private final int maxPendingAcquires;
72      private int acquiredChannelCount;
73      private int pendingAcquireCount;
74      private boolean closed;
75  
76      /**
77       * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
78       *
79       * @param bootstrap         the {@link Bootstrap} that is used for connections
80       * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
81       * @param maxConnections    the number of maximal active connections, once this is reached new tries to acquire
82       *                          a {@link Channel} will be delayed until a connection is returned to the pool again.
83       */
84      public FixedChannelPool(Bootstrap bootstrap,
85                              ChannelPoolHandler handler, int maxConnections) {
86          this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
87      }
88  
89      /**
90       * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
91       *
92       * @param bootstrap             the {@link Bootstrap} that is used for connections
93       * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
94       * @param maxConnections        the number of maximal active connections, once this is reached new tries to
95       *                              acquire a {@link Channel} will be delayed until a connection is returned to the
96       *                              pool again.
97       * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
98       *                              be failed.
99       */
100     public FixedChannelPool(Bootstrap bootstrap,
101                             ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
102         this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires);
103     }
104 
105     /**
106      * Creates a new instance.
107      *
108      * @param bootstrap             the {@link Bootstrap} that is used for connections
109      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
110      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
111      *                              still healthy when obtain from the {@link ChannelPool}
112      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
113      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
114      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
115      *                              the {@link AcquireTimeoutAction} takes place.
116      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
117      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
118      *                              pool again.
119      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
120      *                              be failed.
121      */
122     public FixedChannelPool(Bootstrap bootstrap,
123                             ChannelPoolHandler handler,
124                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
125                             final long acquireTimeoutMillis,
126                             int maxConnections, int maxPendingAcquires) {
127         this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
128     }
129 
130     /**
131      * Creates a new instance.
132      *
133      * @param bootstrap             the {@link Bootstrap} that is used for connections
134      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
135      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
136      *                              still healthy when obtain from the {@link ChannelPool}
137      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
138      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
139      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
140      *                              the {@link AcquireTimeoutAction} takes place.
141      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
142      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
143      *                              pool again.
144      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
145      *                              be failed.
146      * @param releaseHealthCheck    will check channel health before offering back if this parameter set to
147      *                              {@code true}.
148      */
149     public FixedChannelPool(Bootstrap bootstrap,
150                             ChannelPoolHandler handler,
151                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
152                             final long acquireTimeoutMillis,
153                             int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
154         this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires,
155                 releaseHealthCheck, true);
156     }
157 
158     /**
159      * Creates a new instance.
160      *
161      * @param bootstrap             the {@link Bootstrap} that is used for connections
162      * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
163      * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
164      *                              still healthy when obtain from the {@link ChannelPool}
165      * @param action                the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
166      *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
167      * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
168      *                              the {@link AcquireTimeoutAction} takes place.
169      * @param maxConnections        the number of maximal active connections, once this is reached new tries to
170      *                              acquire a {@link Channel} will be delayed until a connection is returned to the
171      *                              pool again.
172      * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
173      *                              be failed.
174      * @param releaseHealthCheck    will check channel health before offering back if this parameter set to
175      *                              {@code true}.
176      * @param lastRecentUsed        {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
177      */
178     public FixedChannelPool(Bootstrap bootstrap,
179                             ChannelPoolHandler handler,
180                             ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
181                             final long acquireTimeoutMillis,
182                             int maxConnections, int maxPendingAcquires,
183                             boolean releaseHealthCheck, boolean lastRecentUsed) {
184         super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
185         if (maxConnections < 1) {
186             throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
187         }
188         if (maxPendingAcquires < 1) {
189             throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
190         }
191         if (action == null && acquireTimeoutMillis == -1) {
192             timeoutTask = null;
193             acquireTimeoutNanos = -1;
194         } else if (action == null && acquireTimeoutMillis != -1) {
195             throw new NullPointerException("action");
196         } else if (action != null && acquireTimeoutMillis < 0) {
197             throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
198         } else {
199             acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
200             switch (action) {
201             case FAIL:
202                 timeoutTask = new TimeoutTask() {
203                     @Override
204                     public void onTimeout(AcquireTask task) {
205                         // Fail the promise as we timed out.
206                         task.promise.setFailure(TIMEOUT_EXCEPTION);
207                     }
208                 };
209                 break;
210             case NEW:
211                 timeoutTask = new TimeoutTask() {
212                     @Override
213                     public void onTimeout(AcquireTask task) {
214                         // Increment the acquire count and delegate to super to actually acquire a Channel which will
215                         // create a new connection.
216                         task.acquired();
217 
218                         FixedChannelPool.super.acquire(task.promise);
219                     }
220                 };
221                 break;
222             default:
223                 throw new Error();
224             }
225         }
226         executor = bootstrap.group().next();
227         this.maxConnections = maxConnections;
228         this.maxPendingAcquires = maxPendingAcquires;
229     }
230 
231     @Override
232     public Future<Channel> acquire(final Promise<Channel> promise) {
233         try {
234             if (executor.inEventLoop()) {
235                 acquire0(promise);
236             } else {
237                 executor.execute(new Runnable() {
238                     @Override
239                     public void run() {
240                         acquire0(promise);
241                     }
242                 });
243             }
244         } catch (Throwable cause) {
245             promise.setFailure(cause);
246         }
247         return promise;
248     }
249 
250     private void acquire0(final Promise<Channel> promise) {
251         assert executor.inEventLoop();
252 
253         if (closed) {
254             promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
255             return;
256         }
257         if (acquiredChannelCount < maxConnections) {
258             assert acquiredChannelCount >= 0;
259 
260             // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
261             // EventLoop
262             Promise<Channel> p = executor.newPromise();
263             AcquireListener l = new AcquireListener(promise);
264             l.acquired();
265             p.addListener(l);
266             super.acquire(p);
267         } else {
268             if (pendingAcquireCount >= maxPendingAcquires) {
269                 promise.setFailure(FULL_EXCEPTION);
270             } else {
271                 AcquireTask task = new AcquireTask(promise);
272                 if (pendingAcquireQueue.offer(task)) {
273                     ++pendingAcquireCount;
274 
275                     if (timeoutTask != null) {
276                         task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
277                     }
278                 } else {
279                     promise.setFailure(FULL_EXCEPTION);
280                 }
281             }
282 
283             assert pendingAcquireCount > 0;
284         }
285     }
286 
287     @Override
288     public Future<Void> release(final Channel channel, final Promise<Void> promise) {
289         ObjectUtil.checkNotNull(promise, "promise");
290         final Promise<Void> p = executor.newPromise();
291         super.release(channel, p.addListener(new FutureListener<Void>() {
292 
293             @Override
294             public void operationComplete(Future<Void> future) throws Exception {
295                 assert executor.inEventLoop();
296 
297                 if (closed) {
298                     // Since the pool is closed, we have no choice but to close the channel
299                     channel.close();
300                     promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
301                     return;
302                 }
303 
304                 if (future.isSuccess()) {
305                     decrementAndRunTaskQueue();
306                     promise.setSuccess(null);
307                 } else {
308                     Throwable cause = future.cause();
309                     // Check if the exception was not because of we passed the Channel to the wrong pool.
310                     if (!(cause instanceof IllegalArgumentException)) {
311                         decrementAndRunTaskQueue();
312                     }
313                     promise.setFailure(future.cause());
314                 }
315             }
316         }));
317         return promise;
318     }
319 
320     private void decrementAndRunTaskQueue() {
321         --acquiredChannelCount;
322 
323         // We should never have a negative value.
324         assert acquiredChannelCount >= 0;
325 
326         // Run the pending acquire tasks before notify the original promise so if the user would
327         // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
328         // maxPendingAcquires we may be able to run some pending tasks first and so allow to add
329         // more.
330         runTaskQueue();
331     }
332 
333     private void runTaskQueue() {
334         while (acquiredChannelCount < maxConnections) {
335             AcquireTask task = pendingAcquireQueue.poll();
336             if (task == null) {
337                 break;
338             }
339 
340             // Cancel the timeout if one was scheduled
341             ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
342             if (timeoutFuture != null) {
343                 timeoutFuture.cancel(false);
344             }
345 
346             --pendingAcquireCount;
347             task.acquired();
348 
349             super.acquire(task.promise);
350         }
351 
352         // We should never have a negative value.
353         assert pendingAcquireCount >= 0;
354         assert acquiredChannelCount >= 0;
355     }
356 
357     // AcquireTask extends AcquireListener to reduce object creations and so GC pressure
358     private final class AcquireTask extends AcquireListener {
359         final Promise<Channel> promise;
360         final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
361         ScheduledFuture<?> timeoutFuture;
362 
363         public AcquireTask(Promise<Channel> promise) {
364             super(promise);
365             // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
366             // EventLoop.
367             this.promise = executor.<Channel>newPromise().addListener(this);
368         }
369     }
370 
371     private abstract class TimeoutTask implements Runnable {
372         @Override
373         public final void run() {
374             assert executor.inEventLoop();
375             long nanoTime = System.nanoTime();
376             for (;;) {
377                 AcquireTask task = pendingAcquireQueue.peek();
378                 // Compare nanoTime as descripted in the javadocs of System.nanoTime()
379                 //
380                 // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
381                 // See https://github.com/netty/netty/issues/3705
382                 if (task == null || nanoTime - task.expireNanoTime < 0) {
383                     break;
384                 }
385                 pendingAcquireQueue.remove();
386 
387                 --pendingAcquireCount;
388                 onTimeout(task);
389             }
390         }
391 
392         public abstract void onTimeout(AcquireTask task);
393     }
394 
395     private class AcquireListener implements FutureListener<Channel> {
396         private final Promise<Channel> originalPromise;
397         protected boolean acquired;
398 
399         AcquireListener(Promise<Channel> originalPromise) {
400             this.originalPromise = originalPromise;
401         }
402 
403         @Override
404         public void operationComplete(Future<Channel> future) throws Exception {
405             assert executor.inEventLoop();
406 
407             if (closed) {
408                 if (future.isSuccess()) {
409                     // Since the pool is closed, we have no choice but to close the channel
410                     future.getNow().close();
411                 }
412                 originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
413                 return;
414             }
415 
416             if (future.isSuccess()) {
417                 originalPromise.setSuccess(future.getNow());
418             } else {
419                 if (acquired) {
420                     decrementAndRunTaskQueue();
421                 } else {
422                     runTaskQueue();
423                 }
424 
425                 originalPromise.setFailure(future.cause());
426             }
427         }
428 
429         public void acquired() {
430             if (acquired) {
431                 return;
432             }
433             acquiredChannelCount++;
434             acquired = true;
435         }
436     }
437 
438     @Override
439     public void close() {
440         executor.execute(new Runnable() {
441             @Override
442             public void run() {
443                 if (!closed) {
444                     closed = true;
445                     for (;;) {
446                         AcquireTask task = pendingAcquireQueue.poll();
447                         if (task == null) {
448                             break;
449                         }
450                         ScheduledFuture<?> f = task.timeoutFuture;
451                         if (f != null) {
452                             f.cancel(false);
453                         }
454                         task.promise.setFailure(new ClosedChannelException());
455                     }
456                     acquiredChannelCount = 0;
457                     pendingAcquireCount = 0;
458                     FixedChannelPool.super.close();
459                 }
460             }
461         });
462     }
463 }