1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.pool;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.EventLoop;
24 import io.netty.util.AttributeKey;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.FutureListener;
27 import io.netty.util.concurrent.GlobalEventExecutor;
28 import io.netty.util.concurrent.Promise;
29 import io.netty.util.internal.PlatformDependent;
30
31 import java.util.Deque;
32 import java.util.concurrent.Callable;
33
34 import static io.netty.util.internal.ObjectUtil.*;
35
/**
 * Simple {@link ChannelPool} implementation which will create a new {@link Channel} if someone tries to acquire
 * a {@link Channel} but none is currently available in the pool. No limit on the maximal number of concurrent
 * {@link Channel}s is enforced.
 *
 * By default this implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}; FIFO order can be
 * selected through the {@code lastRecentUsed} constructor parameter.
 */
43 public class SimpleChannelPool implements ChannelPool {
44 private static final AttributeKey<SimpleChannelPool> POOL_KEY =
45 AttributeKey.newInstance("io.netty.channel.pool.SimpleChannelPool");
46 private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
47 private final ChannelPoolHandler handler;
48 private final ChannelHealthChecker healthCheck;
49 private final Bootstrap bootstrap;
50 private final boolean releaseHealthCheck;
51 private final boolean lastRecentUsed;
52
53 /**
54 * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
55 *
56 * @param bootstrap the {@link Bootstrap} that is used for connections
57 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
58 */
59 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) {
60 this(bootstrap, handler, ChannelHealthChecker.ACTIVE);
61 }
62
63 /**
64 * Creates a new instance.
65 *
66 * @param bootstrap the {@link Bootstrap} that is used for connections
67 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
68 * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
69 * still healthy when obtain from the {@link ChannelPool}
70 */
71 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
72 this(bootstrap, handler, healthCheck, true);
73 }
74
75 /**
76 * Creates a new instance.
77 *
78 * @param bootstrap the {@link Bootstrap} that is used for connections
79 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
80 * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
81 * still healthy when obtain from the {@link ChannelPool}
82 * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true};
83 * otherwise, channel health is only checked at acquisition time
84 */
85 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
86 boolean releaseHealthCheck) {
87 this(bootstrap, handler, healthCheck, releaseHealthCheck, true);
88 }
89
90 /**
91 * Creates a new instance.
92 *
93 * @param bootstrap the {@link Bootstrap} that is used for connections
94 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
95 * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
96 * still healthy when obtain from the {@link ChannelPool}
97 * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true};
98 * otherwise, channel health is only checked at acquisition time
99 * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
100 */
101 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
102 boolean releaseHealthCheck, boolean lastRecentUsed) {
103 this.handler = checkNotNull(handler, "handler");
104 this.healthCheck = checkNotNull(healthCheck, "healthCheck");
105 this.releaseHealthCheck = releaseHealthCheck;
106 // Clone the original Bootstrap as we want to set our own handler
107 this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
108 this.bootstrap.handler(new ChannelInitializer<Channel>() {
109 @Override
110 protected void initChannel(Channel ch) throws Exception {
111 assert ch.eventLoop().inEventLoop();
112 handler.channelCreated(ch);
113 }
114 });
115 this.lastRecentUsed = lastRecentUsed;
116 }
117
118 /**
119 * Returns the {@link Bootstrap} this pool will use to open new connections.
120 *
121 * @return the {@link Bootstrap} this pool will use to open new connections
122 */
123 protected Bootstrap bootstrap() {
124 return bootstrap;
125 }
126
127 /**
128 * Returns the {@link ChannelPoolHandler} that will be notified for the different pool actions.
129 *
130 * @return the {@link ChannelPoolHandler} that will be notified for the different pool actions
131 */
132 protected ChannelPoolHandler handler() {
133 return handler;
134 }
135
136 /**
137 * Returns the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy.
138 *
139 * @return the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy
140 */
141 protected ChannelHealthChecker healthChecker() {
142 return healthCheck;
143 }
144
145 /**
146 * Indicates whether this pool will check the health of channels before offering them back into the pool.
147 *
148 * @return {@code true} if this pool will check the health of channels before offering them back into the pool, or
149 * {@code false} if channel health is only checked at acquisition time
150 */
151 protected boolean releaseHealthCheck() {
152 return releaseHealthCheck;
153 }
154
155 @Override
156 public final Future<Channel> acquire() {
157 return acquire(bootstrap.config().group().next().<Channel>newPromise());
158 }
159
160 @Override
161 public Future<Channel> acquire(final Promise<Channel> promise) {
162 return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
163 }
164
165 /**
166 * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
167 * @param promise the promise to provide acquire result.
168 * @return future for acquiring a channel.
169 */
170 private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
171 try {
172 final Channel ch = pollChannel();
173 if (ch == null) {
174 // No Channel left in the pool bootstrap a new Channel
175 Bootstrap bs = bootstrap.clone();
176 bs.attr(POOL_KEY, this);
177 ChannelFuture f = connectChannel(bs);
178 if (f.isDone()) {
179 notifyConnect(f, promise);
180 } else {
181 f.addListener(new ChannelFutureListener() {
182 @Override
183 public void operationComplete(ChannelFuture future) throws Exception {
184 notifyConnect(future, promise);
185 }
186 });
187 }
188 } else {
189 EventLoop loop = ch.eventLoop();
190 if (loop.inEventLoop()) {
191 doHealthCheck(ch, promise);
192 } else {
193 loop.execute(new Runnable() {
194 @Override
195 public void run() {
196 doHealthCheck(ch, promise);
197 }
198 });
199 }
200 }
201 } catch (Throwable cause) {
202 promise.tryFailure(cause);
203 }
204 return promise;
205 }
206
207 private void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
208 Channel channel = null;
209 try {
210 if (future.isSuccess()) {
211 channel = future.channel();
212 handler.channelAcquired(channel);
213 if (!promise.trySuccess(channel)) {
214 // Promise was completed in the meantime (like cancelled), just release the channel again
215 release(channel);
216 }
217 } else {
218 promise.tryFailure(future.cause());
219 }
220 } catch (Throwable cause) {
221 closeAndFail(channel, cause, promise);
222 }
223 }
224
225 private void doHealthCheck(final Channel channel, final Promise<Channel> promise) {
226 try {
227 assert channel.eventLoop().inEventLoop();
228 Future<Boolean> f = healthCheck.isHealthy(channel);
229 if (f.isDone()) {
230 notifyHealthCheck(f, channel, promise);
231 } else {
232 f.addListener(new FutureListener<Boolean>() {
233 @Override
234 public void operationComplete(Future<Boolean> future) {
235 notifyHealthCheck(future, channel, promise);
236 }
237 });
238 }
239 } catch (Throwable cause) {
240 closeAndFail(channel, cause, promise);
241 }
242 }
243
244 private void notifyHealthCheck(Future<Boolean> future, Channel channel, Promise<Channel> promise) {
245 try {
246 assert channel.eventLoop().inEventLoop();
247 if (future.isSuccess() && future.getNow()) {
248 channel.attr(POOL_KEY).set(this);
249 handler.channelAcquired(channel);
250 promise.setSuccess(channel);
251 } else {
252 closeChannel(channel);
253 acquireHealthyFromPoolOrNew(promise);
254 }
255 } catch (Throwable cause) {
256 closeAndFail(channel, cause, promise);
257 }
258 }
259
260 /**
261 * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may
262 * override this.
263 * <p>
264 * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
265 */
266 protected ChannelFuture connectChannel(Bootstrap bs) {
267 return bs.connect();
268 }
269
270 @Override
271 public final Future<Void> release(Channel channel) {
272 return release(channel, channel.eventLoop().<Void>newPromise());
273 }
274
275 @Override
276 public Future<Void> release(final Channel channel, final Promise<Void> promise) {
277 try {
278 checkNotNull(channel, "channel");
279 checkNotNull(promise, "promise");
280 EventLoop loop = channel.eventLoop();
281 if (loop.inEventLoop()) {
282 doReleaseChannel(channel, promise);
283 } else {
284 loop.execute(new Runnable() {
285 @Override
286 public void run() {
287 doReleaseChannel(channel, promise);
288 }
289 });
290 }
291 } catch (Throwable cause) {
292 closeAndFail(channel, cause, promise);
293 }
294 return promise;
295 }
296
297 private void doReleaseChannel(Channel channel, Promise<Void> promise) {
298 try {
299 assert channel.eventLoop().inEventLoop();
300 // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
301 if (channel.attr(POOL_KEY).getAndSet(null) != this) {
302 closeAndFail(channel,
303 // Better include a stacktrace here as this is an user error.
304 new IllegalArgumentException(
305 "Channel " + channel + " was not acquired from this ChannelPool"),
306 promise);
307 } else {
308 if (releaseHealthCheck) {
309 doHealthCheckOnRelease(channel, promise);
310 } else {
311 releaseAndOffer(channel, promise);
312 }
313 }
314 } catch (Throwable cause) {
315 closeAndFail(channel, cause, promise);
316 }
317 }
318
319 private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
320 final Future<Boolean> f = healthCheck.isHealthy(channel);
321 if (f.isDone()) {
322 releaseAndOfferIfHealthy(channel, promise, f);
323 } else {
324 f.addListener(new FutureListener<Boolean>() {
325 @Override
326 public void operationComplete(Future<Boolean> future) throws Exception {
327 releaseAndOfferIfHealthy(channel, promise, f);
328 }
329 });
330 }
331 }
332
333 /**
334 * Adds the channel back to the pool only if the channel is healthy.
335 * @param channel the channel to put back to the pool
336 * @param promise offer operation promise.
337 * @param future the future that contains information fif channel is healthy or not.
338 * @throws Exception in case when failed to notify handler about release operation.
339 */
340 private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future) {
341 try {
342 if (future.getNow()) { //channel turns out to be healthy, offering and releasing it.
343 releaseAndOffer(channel, promise);
344 } else { //channel not healthy, just releasing it.
345 handler.channelReleased(channel);
346 promise.setSuccess(null);
347 }
348 } catch (Throwable cause) {
349 closeAndFail(channel, cause, promise);
350 }
351 }
352
353 private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
354 if (offerChannel(channel)) {
355 handler.channelReleased(channel);
356 promise.setSuccess(null);
357 } else {
358 closeAndFail(channel, new ChannelPoolFullException(), promise);
359 }
360 }
361
362 private void closeChannel(Channel channel) throws Exception {
363 channel.attr(POOL_KEY).getAndSet(null);
364 channel.close();
365 }
366
367 private void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
368 if (channel != null) {
369 try {
370 closeChannel(channel);
371 } catch (Throwable t) {
372 promise.tryFailure(t);
373 }
374 }
375 promise.tryFailure(cause);
376 }
377
378 /**
379 * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no
380 * {@link Channel} is ready to be reused.
381 *
382 * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
383 * implementations of these methods needs to be thread-safe!
384 */
385 protected Channel pollChannel() {
386 return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
387 }
388
389 /**
390 * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel}
391 * could be added, {@code false} otherwise.
392 *
393 * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
394 * implementations of these methods needs to be thread-safe!
395 */
396 protected boolean offerChannel(Channel channel) {
397 return deque.offer(channel);
398 }
399
400 @Override
401 public void close() {
402 for (;;) {
403 Channel channel = pollChannel();
404 if (channel == null) {
405 break;
406 }
407 // Just ignore any errors that are reported back from close().
408 channel.close().awaitUninterruptibly();
409 }
410 }
411
412 /**
413 * Closes the pool in an async manner.
414 *
415 * @return Future which represents completion of the close task
416 */
417 public Future<Void> closeAsync() {
418 // Execute close asynchronously in case this is being invoked on an eventloop to avoid blocking
419 return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
420 @Override
421 public Void call() throws Exception {
422 close();
423 return null;
424 }
425 });
426 }
427
428 private static final class ChannelPoolFullException extends IllegalStateException {
429
430 private ChannelPoolFullException() {
431 super("ChannelPool full");
432 }
433
434 // Suppress a warning since the method doesn't need synchronization
435 @Override
436 public Throwable fillInStackTrace() {
437 return this;
438 }
439 }
440 }