1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.pool;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.EventLoop;
24 import io.netty.util.AttributeKey;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.FutureListener;
27 import io.netty.util.concurrent.GlobalEventExecutor;
28 import io.netty.util.concurrent.Promise;
29 import io.netty.util.internal.PlatformDependent;
30
31 import java.util.Deque;
32 import java.util.concurrent.Callable;
33
34 import static io.netty.util.internal.ObjectUtil.*;
35
/**
 * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
 * a {@link Channel} but none is in the pool at the moment. No limit on the maximal number of concurrent
 * {@link Channel}s is enforced.
 *
 * By default this implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}; FIFO order can
 * be selected via the {@code lastRecentUsed} constructor parameter.
 */
43 public class SimpleChannelPool implements ChannelPool {
44 private static final AttributeKey<SimpleChannelPool> POOL_KEY =
45 AttributeKey.newInstance("io.netty.channel.pool.SimpleChannelPool");
46 private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
47 private final ChannelPoolHandler handler;
48 private final ChannelHealthChecker healthCheck;
49 private final Bootstrap bootstrap;
50 private final boolean releaseHealthCheck;
51 private final boolean lastRecentUsed;
52
53 /**
54 * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
55 *
56 * @param bootstrap the {@link Bootstrap} that is used for connections
57 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
58 */
59 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) {
60 this(bootstrap, handler, ChannelHealthChecker.ACTIVE);
61 }
62
63 /**
64 * Creates a new instance.
65 *
66 * @param bootstrap the {@link Bootstrap} that is used for connections
67 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
68 * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
69 * still healthy when obtain from the {@link ChannelPool}
70 */
71 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
72 this(bootstrap, handler, healthCheck, true);
73 }
74
75 /**
76 * Creates a new instance.
77 *
78 * @param bootstrap the {@link Bootstrap} that is used for connections
79 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
80 * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
81 * still healthy when obtain from the {@link ChannelPool}
82 * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true};
83 * otherwise, channel health is only checked at acquisition time
84 */
85 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
86 boolean releaseHealthCheck) {
87 this(bootstrap, handler, healthCheck, releaseHealthCheck, true);
88 }
89
90 /**
91 * Creates a new instance.
92 *
93 * @param bootstrap the {@link Bootstrap} that is used for connections
94 * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
95 * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
96 * still healthy when obtain from the {@link ChannelPool}
97 * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true};
98 * otherwise, channel health is only checked at acquisition time
99 * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
100 */
101 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
102 boolean releaseHealthCheck, boolean lastRecentUsed) {
103 this.handler = checkNotNull(handler, "handler");
104 this.healthCheck = checkNotNull(healthCheck, "healthCheck");
105 this.releaseHealthCheck = releaseHealthCheck;
106 // Clone the original Bootstrap as we want to set our own handler
107 this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
108 this.bootstrap.handler(new ChannelInitializer<Channel>() {
109 @Override
110 protected void initChannel(Channel ch) throws Exception {
111 assert ch.eventLoop().inEventLoop();
112 handler.channelCreated(ch);
113 }
114 });
115 this.lastRecentUsed = lastRecentUsed;
116 }
117
118 /**
119 * Returns the {@link Bootstrap} this pool will use to open new connections.
120 *
121 * @return the {@link Bootstrap} this pool will use to open new connections
122 */
123 protected Bootstrap bootstrap() {
124 return bootstrap;
125 }
126
127 /**
128 * Returns the {@link ChannelPoolHandler} that will be notified for the different pool actions.
129 *
130 * @return the {@link ChannelPoolHandler} that will be notified for the different pool actions
131 */
132 protected ChannelPoolHandler handler() {
133 return handler;
134 }
135
136 /**
137 * Returns the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy.
138 *
139 * @return the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy
140 */
141 protected ChannelHealthChecker healthChecker() {
142 return healthCheck;
143 }
144
145 /**
146 * Indicates whether this pool will check the health of channels before offering them back into the pool.
147 *
148 * @return {@code true} if this pool will check the health of channels before offering them back into the pool, or
149 * {@code false} if channel health is only checked at acquisition time
150 */
151 protected boolean releaseHealthCheck() {
152 return releaseHealthCheck;
153 }
154
155 @Override
156 public final Future<Channel> acquire() {
157 return acquire(bootstrap.config().group().next().<Channel>newPromise());
158 }
159
160 @Override
161 public Future<Channel> acquire(final Promise<Channel> promise) {
162 return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
163 }
164
165 /**
166 * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
167 * @param promise the promise to provide acquire result.
168 * @return future for acquiring a channel.
169 */
170 private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
171 try {
172 final Channel ch = pollChannel();
173 if (ch == null) {
174 // No Channel left in the pool bootstrap a new Channel
175 Bootstrap bs = bootstrap.clone();
176 bs.attr(POOL_KEY, this);
177 ChannelFuture f = connectChannel(bs);
178 if (f.isDone()) {
179 notifyConnect(f, promise);
180 } else {
181 f.addListener((ChannelFutureListener) future -> notifyConnect(future, promise));
182 }
183 } else {
184 EventLoop loop = ch.eventLoop();
185 if (loop.inEventLoop()) {
186 doHealthCheck(ch, promise);
187 } else {
188 loop.execute(new Runnable() {
189 @Override
190 public void run() {
191 doHealthCheck(ch, promise);
192 }
193 });
194 }
195 }
196 } catch (Throwable cause) {
197 promise.tryFailure(cause);
198 }
199 return promise;
200 }
201
202 private void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
203 Channel channel = null;
204 try {
205 if (future.isSuccess()) {
206 channel = future.channel();
207 handler.channelAcquired(channel);
208 if (!promise.trySuccess(channel)) {
209 // Promise was completed in the meantime (like cancelled), just release the channel again
210 release(channel);
211 }
212 } else {
213 promise.tryFailure(future.cause());
214 }
215 } catch (Throwable cause) {
216 closeAndFail(channel, cause, promise);
217 }
218 }
219
220 private void doHealthCheck(final Channel channel, final Promise<Channel> promise) {
221 try {
222 assert channel.eventLoop().inEventLoop();
223 Future<Boolean> f = healthCheck.isHealthy(channel);
224 if (f.isDone()) {
225 notifyHealthCheck(f, channel, promise);
226 } else {
227 f.addListener((FutureListener<Boolean>) future -> notifyHealthCheck(future, channel, promise));
228 }
229 } catch (Throwable cause) {
230 closeAndFail(channel, cause, promise);
231 }
232 }
233
234 private void notifyHealthCheck(Future<Boolean> future, Channel channel, Promise<Channel> promise) {
235 try {
236 assert channel.eventLoop().inEventLoop();
237 if (future.isSuccess() && future.getNow()) {
238 channel.attr(POOL_KEY).set(this);
239 handler.channelAcquired(channel);
240 promise.setSuccess(channel);
241 } else {
242 closeChannel(channel);
243 acquireHealthyFromPoolOrNew(promise);
244 }
245 } catch (Throwable cause) {
246 closeAndFail(channel, cause, promise);
247 }
248 }
249
250 /**
251 * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may
252 * override this.
253 * <p>
254 * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
255 */
256 protected ChannelFuture connectChannel(Bootstrap bs) {
257 return bs.connect();
258 }
259
260 @Override
261 public final Future<Void> release(Channel channel) {
262 return release(channel, channel.eventLoop().<Void>newPromise());
263 }
264
265 @Override
266 public Future<Void> release(final Channel channel, final Promise<Void> promise) {
267 try {
268 checkNotNull(channel, "channel");
269 checkNotNull(promise, "promise");
270 EventLoop loop = channel.eventLoop();
271 if (loop.inEventLoop()) {
272 doReleaseChannel(channel, promise);
273 } else {
274 loop.execute(new Runnable() {
275 @Override
276 public void run() {
277 doReleaseChannel(channel, promise);
278 }
279 });
280 }
281 } catch (Throwable cause) {
282 closeAndFail(channel, cause, promise);
283 }
284 return promise;
285 }
286
287 private void doReleaseChannel(Channel channel, Promise<Void> promise) {
288 try {
289 assert channel.eventLoop().inEventLoop();
290 // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
291 if (channel.attr(POOL_KEY).getAndSet(null) != this) {
292 closeAndFail(channel,
293 // Better include a stacktrace here as this is an user error.
294 new IllegalArgumentException(
295 "Channel " + channel + " was not acquired from this ChannelPool"),
296 promise);
297 } else {
298 if (releaseHealthCheck) {
299 doHealthCheckOnRelease(channel, promise);
300 } else {
301 releaseAndOffer(channel, promise);
302 }
303 }
304 } catch (Throwable cause) {
305 closeAndFail(channel, cause, promise);
306 }
307 }
308
309 private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
310 final Future<Boolean> f = healthCheck.isHealthy(channel);
311 if (f.isDone()) {
312 releaseAndOfferIfHealthy(channel, promise, f);
313 } else {
314 f.addListener((FutureListener<Boolean>) future -> releaseAndOfferIfHealthy(channel, promise, f));
315 }
316 }
317
318 /**
319 * Adds the channel back to the pool only if the channel is healthy.
320 * @param channel the channel to put back to the pool
321 * @param promise offer operation promise.
322 * @param future the future that contains information fif channel is healthy or not.
323 * @throws Exception in case when failed to notify handler about release operation.
324 */
325 private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future) {
326 try {
327 if (future.getNow()) { //channel turns out to be healthy, offering and releasing it.
328 releaseAndOffer(channel, promise);
329 } else { //channel not healthy, just releasing it.
330 handler.channelReleased(channel);
331 promise.setSuccess(null);
332 }
333 } catch (Throwable cause) {
334 closeAndFail(channel, cause, promise);
335 }
336 }
337
338 private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
339 if (offerChannel(channel)) {
340 handler.channelReleased(channel);
341 promise.setSuccess(null);
342 } else {
343 closeAndFail(channel, new ChannelPoolFullException(), promise);
344 }
345 }
346
347 private void closeChannel(Channel channel) throws Exception {
348 channel.attr(POOL_KEY).getAndSet(null);
349 channel.close();
350 }
351
352 private void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
353 if (channel != null) {
354 try {
355 closeChannel(channel);
356 } catch (Throwable t) {
357 promise.tryFailure(t);
358 }
359 }
360 promise.tryFailure(cause);
361 }
362
363 /**
364 * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no
365 * {@link Channel} is ready to be reused.
366 *
367 * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
368 * implementations of these methods needs to be thread-safe!
369 */
370 protected Channel pollChannel() {
371 return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
372 }
373
374 /**
375 * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel}
376 * could be added, {@code false} otherwise.
377 *
378 * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
379 * implementations of these methods needs to be thread-safe!
380 */
381 protected boolean offerChannel(Channel channel) {
382 return deque.offer(channel);
383 }
384
385 @Override
386 public void close() {
387 for (;;) {
388 Channel channel = pollChannel();
389 if (channel == null) {
390 break;
391 }
392 // Just ignore any errors that are reported back from close().
393 channel.close().awaitUninterruptibly();
394 }
395 }
396
397 /**
398 * Closes the pool in an async manner.
399 *
400 * @return Future which represents completion of the close task
401 */
402 public Future<Void> closeAsync() {
403 // Execute close asynchronously in case this is being invoked on an eventloop to avoid blocking
404 return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
405 @Override
406 public Void call() throws Exception {
407 close();
408 return null;
409 }
410 });
411 }
412
413 private static final class ChannelPoolFullException extends IllegalStateException {
414
415 private ChannelPoolFullException() {
416 super("ChannelPool full");
417 }
418
419 // Suppress a warning since the method doesn't need synchronization
420 @Override
421 public Throwable fillInStackTrace() {
422 return this;
423 }
424 }
425 }