1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.pool;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.util.concurrent.EventExecutor;
21 import io.netty.util.concurrent.Future;
22 import io.netty.util.concurrent.FutureListener;
23 import io.netty.util.concurrent.Promise;
24 import io.netty.util.internal.ObjectUtil;
25 import io.netty.util.internal.ThrowableUtil;
26
27 import java.nio.channels.ClosedChannelException;
28 import java.util.ArrayDeque;
29 import java.util.Queue;
30 import java.util.concurrent.ScheduledFuture;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34
35
36
37
38 public class FixedChannelPool extends SimpleChannelPool {
39 private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
40 new IllegalStateException("Too many outstanding acquire operations"),
41 FixedChannelPool.class, "acquire0(...)");
42 private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
43 new TimeoutException("Acquire operation took longer then configured maximum time"),
44 FixedChannelPool.class, "<init>(...)");
45 static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
46 new IllegalStateException("FixedChannelPooled was closed"),
47 FixedChannelPool.class, "release(...)");
48 static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
49 new IllegalStateException("FixedChannelPooled was closed"),
50 FixedChannelPool.class, "acquire0(...)");
51 public enum AcquireTimeoutAction {
52
53
54
55 NEW,
56
57
58
59
60 FAIL
61 }
62
63 private final EventExecutor executor;
64 private final long acquireTimeoutNanos;
65 private final Runnable timeoutTask;
66
67
68
69 private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
70 private final int maxConnections;
71 private final int maxPendingAcquires;
72 private int acquiredChannelCount;
73 private int pendingAcquireCount;
74 private boolean closed;
75
76
77
78
79
80
81
82
83
84 public FixedChannelPool(Bootstrap bootstrap,
85 ChannelPoolHandler handler, int maxConnections) {
86 this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100 public FixedChannelPool(Bootstrap bootstrap,
101 ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
102 this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires);
103 }
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 public FixedChannelPool(Bootstrap bootstrap,
123 ChannelPoolHandler handler,
124 ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
125 final long acquireTimeoutMillis,
126 int maxConnections, int maxPendingAcquires) {
127 this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
128 }
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public FixedChannelPool(Bootstrap bootstrap,
150 ChannelPoolHandler handler,
151 ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
152 final long acquireTimeoutMillis,
153 int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
154 this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires,
155 releaseHealthCheck, true);
156 }
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178 public FixedChannelPool(Bootstrap bootstrap,
179 ChannelPoolHandler handler,
180 ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
181 final long acquireTimeoutMillis,
182 int maxConnections, int maxPendingAcquires,
183 boolean releaseHealthCheck, boolean lastRecentUsed) {
184 super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
185 if (maxConnections < 1) {
186 throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
187 }
188 if (maxPendingAcquires < 1) {
189 throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
190 }
191 if (action == null && acquireTimeoutMillis == -1) {
192 timeoutTask = null;
193 acquireTimeoutNanos = -1;
194 } else if (action == null && acquireTimeoutMillis != -1) {
195 throw new NullPointerException("action");
196 } else if (action != null && acquireTimeoutMillis < 0) {
197 throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
198 } else {
199 acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
200 switch (action) {
201 case FAIL:
202 timeoutTask = new TimeoutTask() {
203 @Override
204 public void onTimeout(AcquireTask task) {
205
206 task.promise.setFailure(TIMEOUT_EXCEPTION);
207 }
208 };
209 break;
210 case NEW:
211 timeoutTask = new TimeoutTask() {
212 @Override
213 public void onTimeout(AcquireTask task) {
214
215
216 task.acquired();
217
218 FixedChannelPool.super.acquire(task.promise);
219 }
220 };
221 break;
222 default:
223 throw new Error();
224 }
225 }
226 executor = bootstrap.group().next();
227 this.maxConnections = maxConnections;
228 this.maxPendingAcquires = maxPendingAcquires;
229 }
230
231 @Override
232 public Future<Channel> acquire(final Promise<Channel> promise) {
233 try {
234 if (executor.inEventLoop()) {
235 acquire0(promise);
236 } else {
237 executor.execute(new Runnable() {
238 @Override
239 public void run() {
240 acquire0(promise);
241 }
242 });
243 }
244 } catch (Throwable cause) {
245 promise.setFailure(cause);
246 }
247 return promise;
248 }
249
250 private void acquire0(final Promise<Channel> promise) {
251 assert executor.inEventLoop();
252
253 if (closed) {
254 promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
255 return;
256 }
257 if (acquiredChannelCount < maxConnections) {
258 assert acquiredChannelCount >= 0;
259
260
261
262 Promise<Channel> p = executor.newPromise();
263 AcquireListener l = new AcquireListener(promise);
264 l.acquired();
265 p.addListener(l);
266 super.acquire(p);
267 } else {
268 if (pendingAcquireCount >= maxPendingAcquires) {
269 promise.setFailure(FULL_EXCEPTION);
270 } else {
271 AcquireTask task = new AcquireTask(promise);
272 if (pendingAcquireQueue.offer(task)) {
273 ++pendingAcquireCount;
274
275 if (timeoutTask != null) {
276 task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
277 }
278 } else {
279 promise.setFailure(FULL_EXCEPTION);
280 }
281 }
282
283 assert pendingAcquireCount > 0;
284 }
285 }
286
287 @Override
288 public Future<Void> release(final Channel channel, final Promise<Void> promise) {
289 ObjectUtil.checkNotNull(promise, "promise");
290 final Promise<Void> p = executor.newPromise();
291 super.release(channel, p.addListener(new FutureListener<Void>() {
292
293 @Override
294 public void operationComplete(Future<Void> future) throws Exception {
295 assert executor.inEventLoop();
296
297 if (closed) {
298
299 channel.close();
300 promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
301 return;
302 }
303
304 if (future.isSuccess()) {
305 decrementAndRunTaskQueue();
306 promise.setSuccess(null);
307 } else {
308 Throwable cause = future.cause();
309
310 if (!(cause instanceof IllegalArgumentException)) {
311 decrementAndRunTaskQueue();
312 }
313 promise.setFailure(future.cause());
314 }
315 }
316 }));
317 return promise;
318 }
319
320 private void decrementAndRunTaskQueue() {
321 --acquiredChannelCount;
322
323
324 assert acquiredChannelCount >= 0;
325
326
327
328
329
330 runTaskQueue();
331 }
332
333 private void runTaskQueue() {
334 while (acquiredChannelCount < maxConnections) {
335 AcquireTask task = pendingAcquireQueue.poll();
336 if (task == null) {
337 break;
338 }
339
340
341 ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
342 if (timeoutFuture != null) {
343 timeoutFuture.cancel(false);
344 }
345
346 --pendingAcquireCount;
347 task.acquired();
348
349 super.acquire(task.promise);
350 }
351
352
353 assert pendingAcquireCount >= 0;
354 assert acquiredChannelCount >= 0;
355 }
356
357
358 private final class AcquireTask extends AcquireListener {
359 final Promise<Channel> promise;
360 final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
361 ScheduledFuture<?> timeoutFuture;
362
363 public AcquireTask(Promise<Channel> promise) {
364 super(promise);
365
366
367 this.promise = executor.<Channel>newPromise().addListener(this);
368 }
369 }
370
371 private abstract class TimeoutTask implements Runnable {
372 @Override
373 public final void run() {
374 assert executor.inEventLoop();
375 long nanoTime = System.nanoTime();
376 for (;;) {
377 AcquireTask task = pendingAcquireQueue.peek();
378
379
380
381
382 if (task == null || nanoTime - task.expireNanoTime < 0) {
383 break;
384 }
385 pendingAcquireQueue.remove();
386
387 --pendingAcquireCount;
388 onTimeout(task);
389 }
390 }
391
392 public abstract void onTimeout(AcquireTask task);
393 }
394
395 private class AcquireListener implements FutureListener<Channel> {
396 private final Promise<Channel> originalPromise;
397 protected boolean acquired;
398
399 AcquireListener(Promise<Channel> originalPromise) {
400 this.originalPromise = originalPromise;
401 }
402
403 @Override
404 public void operationComplete(Future<Channel> future) throws Exception {
405 assert executor.inEventLoop();
406
407 if (closed) {
408 if (future.isSuccess()) {
409
410 future.getNow().close();
411 }
412 originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
413 return;
414 }
415
416 if (future.isSuccess()) {
417 originalPromise.setSuccess(future.getNow());
418 } else {
419 if (acquired) {
420 decrementAndRunTaskQueue();
421 } else {
422 runTaskQueue();
423 }
424
425 originalPromise.setFailure(future.cause());
426 }
427 }
428
429 public void acquired() {
430 if (acquired) {
431 return;
432 }
433 acquiredChannelCount++;
434 acquired = true;
435 }
436 }
437
438 @Override
439 public void close() {
440 executor.execute(new Runnable() {
441 @Override
442 public void run() {
443 if (!closed) {
444 closed = true;
445 for (;;) {
446 AcquireTask task = pendingAcquireQueue.poll();
447 if (task == null) {
448 break;
449 }
450 ScheduledFuture<?> f = task.timeoutFuture;
451 if (f != null) {
452 f.cancel(false);
453 }
454 task.promise.setFailure(new ClosedChannelException());
455 }
456 acquiredChannelCount = 0;
457 pendingAcquireCount = 0;
458 FixedChannelPool.super.close();
459 }
460 }
461 });
462 }
463 }