1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.pool;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.util.concurrent.EventExecutor;
21 import io.netty.util.concurrent.Future;
22 import io.netty.util.concurrent.FutureListener;
23 import io.netty.util.concurrent.Promise;
24 import io.netty.util.internal.ObjectUtil;
25 import io.netty.util.internal.ThrowableUtil;
26
27 import java.nio.channels.ClosedChannelException;
28 import java.util.ArrayDeque;
29 import java.util.Queue;
30 import java.util.concurrent.ScheduledFuture;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34 /**
35 * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
36 * number of concurrent connections.
37 */
38 public class FixedChannelPool extends SimpleChannelPool {
39 private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
40 new IllegalStateException("Too many outstanding acquire operations"),
41 FixedChannelPool.class, "acquire0(...)");
42 private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
43 new TimeoutException("Acquire operation took longer then configured maximum time"),
44 FixedChannelPool.class, "<init>(...)");
45 static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
46 new IllegalStateException("FixedChannelPooled was closed"),
47 FixedChannelPool.class, "release(...)");
48 static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
49 new IllegalStateException("FixedChannelPooled was closed"),
50 FixedChannelPool.class, "acquire0(...)");
51 public enum AcquireTimeoutAction {
52 /**
53 * Create a new connection when the timeout is detected.
54 */
55 NEW,
56
57 /**
58 * Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
59 */
60 FAIL
61 }
62
63 private final EventExecutor executor;
64 private final long acquireTimeoutNanos;
65 private final Runnable timeoutTask;
66
67 // There is no need to worry about synchronization as everything that modified the queue or counts is done
68 // by the above EventExecutor.
69 private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
70 private final int maxConnections;
71 private final int maxPendingAcquires;
72 private int acquiredChannelCount;
73 private int pendingAcquireCount;
74 private boolean closed;
75
76 /**
77 * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
78 *
79 * @param bootstrap the {@link Bootstrap} that is used for connections
80 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
81 * @param maxConnections the number of maximal active connections, once this is reached new tries to acquire
82 * a {@link Channel} will be delayed until a connection is returned to the pool again.
83 */
84 public FixedChannelPool(Bootstrap bootstrap,
85 ChannelPoolHandler handler, int maxConnections) {
86 this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
87 }
88
89 /**
90 * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
91 *
92 * @param bootstrap the {@link Bootstrap} that is used for connections
93 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
94 * @param maxConnections the number of maximal active connections, once this is reached new tries to
95 * acquire a {@link Channel} will be delayed until a connection is returned to the
96 * pool again.
97 * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will
98 * be failed.
99 */
100 public FixedChannelPool(Bootstrap bootstrap,
101 ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
102 this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires);
103 }
104
105 /**
106 * Creates a new instance.
107 *
108 * @param bootstrap the {@link Bootstrap} that is used for connections
109 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
110 * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
111 * still healthy when obtain from the {@link ChannelPool}
112 * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
113 * In this case {@param acquireTimeoutMillis} must be {@code -1}.
114 * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or
115 * the {@link AcquireTimeoutAction} takes place.
116 * @param maxConnections the number of maximal active connections, once this is reached new tries to
117 * acquire a {@link Channel} will be delayed until a connection is returned to the
118 * pool again.
119 * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will
120 * be failed.
121 */
122 public FixedChannelPool(Bootstrap bootstrap,
123 ChannelPoolHandler handler,
124 ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
125 final long acquireTimeoutMillis,
126 int maxConnections, int maxPendingAcquires) {
127 this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
128 }
129
130 /**
131 * Creates a new instance.
132 *
133 * @param bootstrap the {@link Bootstrap} that is used for connections
134 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
135 * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
136 * still healthy when obtain from the {@link ChannelPool}
137 * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
138 * In this case {@param acquireTimeoutMillis} must be {@code -1}.
139 * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or
140 * the {@link AcquireTimeoutAction} takes place.
141 * @param maxConnections the number of maximal active connections, once this is reached new tries to
142 * acquire a {@link Channel} will be delayed until a connection is returned to the
143 * pool again.
144 * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will
145 * be failed.
146 * @param releaseHealthCheck will check channel health before offering back if this parameter set to
147 * {@code true}.
148 */
149 public FixedChannelPool(Bootstrap bootstrap,
150 ChannelPoolHandler handler,
151 ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
152 final long acquireTimeoutMillis,
153 int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
154 this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires,
155 releaseHealthCheck, true);
156 }
157
158 /**
159 * Creates a new instance.
160 *
161 * @param bootstrap the {@link Bootstrap} that is used for connections
162 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
163 * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
164 * still healthy when obtain from the {@link ChannelPool}
165 * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
166 * In this case {@param acquireTimeoutMillis} must be {@code -1}.
167 * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or
168 * the {@link AcquireTimeoutAction} takes place.
169 * @param maxConnections the number of maximal active connections, once this is reached new tries to
170 * acquire a {@link Channel} will be delayed until a connection is returned to the
171 * pool again.
172 * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will
173 * be failed.
174 * @param releaseHealthCheck will check channel health before offering back if this parameter set to
175 * {@code true}.
176 * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
177 */
178 public FixedChannelPool(Bootstrap bootstrap,
179 ChannelPoolHandler handler,
180 ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
181 final long acquireTimeoutMillis,
182 int maxConnections, int maxPendingAcquires,
183 boolean releaseHealthCheck, boolean lastRecentUsed) {
184 super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
185 if (maxConnections < 1) {
186 throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
187 }
188 if (maxPendingAcquires < 1) {
189 throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
190 }
191 if (action == null && acquireTimeoutMillis == -1) {
192 timeoutTask = null;
193 acquireTimeoutNanos = -1;
194 } else if (action == null && acquireTimeoutMillis != -1) {
195 throw new NullPointerException("action");
196 } else if (action != null && acquireTimeoutMillis < 0) {
197 throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
198 } else {
199 acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
200 switch (action) {
201 case FAIL:
202 timeoutTask = new TimeoutTask() {
203 @Override
204 public void onTimeout(AcquireTask task) {
205 // Fail the promise as we timed out.
206 task.promise.setFailure(TIMEOUT_EXCEPTION);
207 }
208 };
209 break;
210 case NEW:
211 timeoutTask = new TimeoutTask() {
212 @Override
213 public void onTimeout(AcquireTask task) {
214 // Increment the acquire count and delegate to super to actually acquire a Channel which will
215 // create a new connection.
216 task.acquired();
217
218 FixedChannelPool.super.acquire(task.promise);
219 }
220 };
221 break;
222 default:
223 throw new Error();
224 }
225 }
226 executor = bootstrap.group().next();
227 this.maxConnections = maxConnections;
228 this.maxPendingAcquires = maxPendingAcquires;
229 }
230
231 @Override
232 public Future<Channel> acquire(final Promise<Channel> promise) {
233 try {
234 if (executor.inEventLoop()) {
235 acquire0(promise);
236 } else {
237 executor.execute(new Runnable() {
238 @Override
239 public void run() {
240 acquire0(promise);
241 }
242 });
243 }
244 } catch (Throwable cause) {
245 promise.setFailure(cause);
246 }
247 return promise;
248 }
249
250 private void acquire0(final Promise<Channel> promise) {
251 assert executor.inEventLoop();
252
253 if (closed) {
254 promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
255 return;
256 }
257 if (acquiredChannelCount < maxConnections) {
258 assert acquiredChannelCount >= 0;
259
260 // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
261 // EventLoop
262 Promise<Channel> p = executor.newPromise();
263 AcquireListener l = new AcquireListener(promise);
264 l.acquired();
265 p.addListener(l);
266 super.acquire(p);
267 } else {
268 if (pendingAcquireCount >= maxPendingAcquires) {
269 promise.setFailure(FULL_EXCEPTION);
270 } else {
271 AcquireTask task = new AcquireTask(promise);
272 if (pendingAcquireQueue.offer(task)) {
273 ++pendingAcquireCount;
274
275 if (timeoutTask != null) {
276 task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
277 }
278 } else {
279 promise.setFailure(FULL_EXCEPTION);
280 }
281 }
282
283 assert pendingAcquireCount > 0;
284 }
285 }
286
287 @Override
288 public Future<Void> release(final Channel channel, final Promise<Void> promise) {
289 ObjectUtil.checkNotNull(promise, "promise");
290 final Promise<Void> p = executor.newPromise();
291 super.release(channel, p.addListener(new FutureListener<Void>() {
292
293 @Override
294 public void operationComplete(Future<Void> future) throws Exception {
295 assert executor.inEventLoop();
296
297 if (closed) {
298 // Since the pool is closed, we have no choice but to close the channel
299 channel.close();
300 promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
301 return;
302 }
303
304 if (future.isSuccess()) {
305 decrementAndRunTaskQueue();
306 promise.setSuccess(null);
307 } else {
308 Throwable cause = future.cause();
309 // Check if the exception was not because of we passed the Channel to the wrong pool.
310 if (!(cause instanceof IllegalArgumentException)) {
311 decrementAndRunTaskQueue();
312 }
313 promise.setFailure(future.cause());
314 }
315 }
316 }));
317 return promise;
318 }
319
320 private void decrementAndRunTaskQueue() {
321 --acquiredChannelCount;
322
323 // We should never have a negative value.
324 assert acquiredChannelCount >= 0;
325
326 // Run the pending acquire tasks before notify the original promise so if the user would
327 // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
328 // maxPendingAcquires we may be able to run some pending tasks first and so allow to add
329 // more.
330 runTaskQueue();
331 }
332
333 private void runTaskQueue() {
334 while (acquiredChannelCount < maxConnections) {
335 AcquireTask task = pendingAcquireQueue.poll();
336 if (task == null) {
337 break;
338 }
339
340 // Cancel the timeout if one was scheduled
341 ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
342 if (timeoutFuture != null) {
343 timeoutFuture.cancel(false);
344 }
345
346 --pendingAcquireCount;
347 task.acquired();
348
349 super.acquire(task.promise);
350 }
351
352 // We should never have a negative value.
353 assert pendingAcquireCount >= 0;
354 assert acquiredChannelCount >= 0;
355 }
356
357 // AcquireTask extends AcquireListener to reduce object creations and so GC pressure
358 private final class AcquireTask extends AcquireListener {
359 final Promise<Channel> promise;
360 final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
361 ScheduledFuture<?> timeoutFuture;
362
363 public AcquireTask(Promise<Channel> promise) {
364 super(promise);
365 // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
366 // EventLoop.
367 this.promise = executor.<Channel>newPromise().addListener(this);
368 }
369 }
370
371 private abstract class TimeoutTask implements Runnable {
372 @Override
373 public final void run() {
374 assert executor.inEventLoop();
375 long nanoTime = System.nanoTime();
376 for (;;) {
377 AcquireTask task = pendingAcquireQueue.peek();
378 // Compare nanoTime as descripted in the javadocs of System.nanoTime()
379 //
380 // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
381 // See https://github.com/netty/netty/issues/3705
382 if (task == null || nanoTime - task.expireNanoTime < 0) {
383 break;
384 }
385 pendingAcquireQueue.remove();
386
387 --pendingAcquireCount;
388 onTimeout(task);
389 }
390 }
391
392 public abstract void onTimeout(AcquireTask task);
393 }
394
395 private class AcquireListener implements FutureListener<Channel> {
396 private final Promise<Channel> originalPromise;
397 protected boolean acquired;
398
399 AcquireListener(Promise<Channel> originalPromise) {
400 this.originalPromise = originalPromise;
401 }
402
403 @Override
404 public void operationComplete(Future<Channel> future) throws Exception {
405 assert executor.inEventLoop();
406
407 if (closed) {
408 if (future.isSuccess()) {
409 // Since the pool is closed, we have no choice but to close the channel
410 future.getNow().close();
411 }
412 originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
413 return;
414 }
415
416 if (future.isSuccess()) {
417 originalPromise.setSuccess(future.getNow());
418 } else {
419 if (acquired) {
420 decrementAndRunTaskQueue();
421 } else {
422 runTaskQueue();
423 }
424
425 originalPromise.setFailure(future.cause());
426 }
427 }
428
429 public void acquired() {
430 if (acquired) {
431 return;
432 }
433 acquiredChannelCount++;
434 acquired = true;
435 }
436 }
437
438 @Override
439 public void close() {
440 executor.execute(new Runnable() {
441 @Override
442 public void run() {
443 if (!closed) {
444 closed = true;
445 for (;;) {
446 AcquireTask task = pendingAcquireQueue.poll();
447 if (task == null) {
448 break;
449 }
450 ScheduledFuture<?> f = task.timeoutFuture;
451 if (f != null) {
452 f.cancel(false);
453 }
454 task.promise.setFailure(new ClosedChannelException());
455 }
456 acquiredChannelCount = 0;
457 pendingAcquireCount = 0;
458 FixedChannelPool.super.close();
459 }
460 }
461 });
462 }
463 }