View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.pool;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.EventLoop;
24  import io.netty.util.AttributeKey;
25  import io.netty.util.concurrent.Future;
26  import io.netty.util.concurrent.FutureListener;
27  import io.netty.util.concurrent.GlobalEventExecutor;
28  import io.netty.util.concurrent.Promise;
29  import io.netty.util.internal.PlatformDependent;
30  
31  import java.util.Deque;
32  import java.util.concurrent.Callable;
33  
34  import static io.netty.util.internal.ObjectUtil.*;
35  
36  /**
37   * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
38   * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced.
39   *
40   * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}.
41   *
42   */
43  public class SimpleChannelPool implements ChannelPool {
44      private static final AttributeKey<SimpleChannelPool> POOL_KEY =
45          AttributeKey.newInstance("io.netty.channel.pool.SimpleChannelPool");
46      private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
47      private final ChannelPoolHandler handler;
48      private final ChannelHealthChecker healthCheck;
49      private final Bootstrap bootstrap;
50      private final boolean releaseHealthCheck;
51      private final boolean lastRecentUsed;
52  
53      /**
54       * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
55       *
56       * @param bootstrap         the {@link Bootstrap} that is used for connections
57       * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
58       */
59      public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) {
60          this(bootstrap, handler, ChannelHealthChecker.ACTIVE);
61      }
62  
63      /**
64       * Creates a new instance.
65       *
66       * @param bootstrap         the {@link Bootstrap} that is used for connections
67       * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
68       * @param healthCheck       the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
69       *                          still healthy when obtain from the {@link ChannelPool}
70       */
71      public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
72          this(bootstrap, handler, healthCheck, true);
73      }
74  
75      /**
76       * Creates a new instance.
77       *
78       * @param bootstrap          the {@link Bootstrap} that is used for connections
79       * @param handler            the {@link ChannelPoolHandler} that will be notified for the different pool actions
80       * @param healthCheck        the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
81       *                           still healthy when obtain from the {@link ChannelPool}
82       * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true};
83       *                           otherwise, channel health is only checked at acquisition time
84       */
85      public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
86                               boolean releaseHealthCheck) {
87          this(bootstrap, handler, healthCheck, releaseHealthCheck, true);
88      }
89  
90      /**
91       * Creates a new instance.
92       *
93       * @param bootstrap          the {@link Bootstrap} that is used for connections
94       * @param handler            the {@link ChannelPoolHandler} that will be notified for the different pool actions
95       * @param healthCheck        the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
96       *                           still healthy when obtain from the {@link ChannelPool}
97       * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true};
98       *                           otherwise, channel health is only checked at acquisition time
99       * @param lastRecentUsed    {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
100      */
101     public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
102                              boolean releaseHealthCheck, boolean lastRecentUsed) {
103         this.handler = checkNotNull(handler, "handler");
104         this.healthCheck = checkNotNull(healthCheck, "healthCheck");
105         this.releaseHealthCheck = releaseHealthCheck;
106         // Clone the original Bootstrap as we want to set our own handler
107         this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
108         this.bootstrap.handler(new ChannelInitializer<Channel>() {
109             @Override
110             protected void initChannel(Channel ch) throws Exception {
111                 assert ch.eventLoop().inEventLoop();
112                 handler.channelCreated(ch);
113             }
114         });
115         this.lastRecentUsed = lastRecentUsed;
116     }
117 
118     /**
119      * Returns the {@link Bootstrap} this pool will use to open new connections.
120      *
121      * @return the {@link Bootstrap} this pool will use to open new connections
122      */
123     protected Bootstrap bootstrap() {
124         return bootstrap;
125     }
126 
127     /**
128      * Returns the {@link ChannelPoolHandler} that will be notified for the different pool actions.
129      *
130      * @return the {@link ChannelPoolHandler} that will be notified for the different pool actions
131      */
132     protected ChannelPoolHandler handler() {
133         return handler;
134     }
135 
136     /**
137      * Returns the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy.
138      *
139      * @return the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy
140      */
141     protected ChannelHealthChecker healthChecker() {
142         return healthCheck;
143     }
144 
145     /**
146      * Indicates whether this pool will check the health of channels before offering them back into the pool.
147      *
148      * @return {@code true} if this pool will check the health of channels before offering them back into the pool, or
149      * {@code false} if channel health is only checked at acquisition time
150      */
151     protected boolean releaseHealthCheck() {
152         return releaseHealthCheck;
153     }
154 
155     @Override
156     public final Future<Channel> acquire() {
157         return acquire(bootstrap.config().group().next().<Channel>newPromise());
158     }
159 
160     @Override
161     public Future<Channel> acquire(final Promise<Channel> promise) {
162         return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
163     }
164 
165     /**
166      * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
167      * @param promise the promise to provide acquire result.
168      * @return future for acquiring a channel.
169      */
170     private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
171         try {
172             final Channel ch = pollChannel();
173             if (ch == null) {
174                 // No Channel left in the pool bootstrap a new Channel
175                 Bootstrap bs = bootstrap.clone();
176                 bs.attr(POOL_KEY, this);
177                 ChannelFuture f = connectChannel(bs);
178                 if (f.isDone()) {
179                     notifyConnect(f, promise);
180                 } else {
181                     f.addListener(new ChannelFutureListener() {
182                         @Override
183                         public void operationComplete(ChannelFuture future) throws Exception {
184                             notifyConnect(future, promise);
185                         }
186                     });
187                 }
188             } else {
189                 EventLoop loop = ch.eventLoop();
190                 if (loop.inEventLoop()) {
191                     doHealthCheck(ch, promise);
192                 } else {
193                     loop.execute(new Runnable() {
194                         @Override
195                         public void run() {
196                             doHealthCheck(ch, promise);
197                         }
198                     });
199                 }
200             }
201         } catch (Throwable cause) {
202             promise.tryFailure(cause);
203         }
204         return promise;
205     }
206 
207     private void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
208         Channel channel = null;
209         try {
210             if (future.isSuccess()) {
211                 channel = future.channel();
212                 handler.channelAcquired(channel);
213                 if (!promise.trySuccess(channel)) {
214                     // Promise was completed in the meantime (like cancelled), just release the channel again
215                     release(channel);
216                 }
217             } else {
218                 promise.tryFailure(future.cause());
219             }
220         } catch (Throwable cause) {
221             closeAndFail(channel, cause, promise);
222         }
223     }
224 
225     private void doHealthCheck(final Channel channel, final Promise<Channel> promise) {
226         try {
227             assert channel.eventLoop().inEventLoop();
228             Future<Boolean> f = healthCheck.isHealthy(channel);
229             if (f.isDone()) {
230                 notifyHealthCheck(f, channel, promise);
231             } else {
232                 f.addListener(new FutureListener<Boolean>() {
233                     @Override
234                     public void operationComplete(Future<Boolean> future) {
235                         notifyHealthCheck(future, channel, promise);
236                     }
237                 });
238             }
239         } catch (Throwable cause) {
240             closeAndFail(channel, cause, promise);
241         }
242     }
243 
244     private void notifyHealthCheck(Future<Boolean> future, Channel channel, Promise<Channel> promise) {
245         try {
246             assert channel.eventLoop().inEventLoop();
247             if (future.isSuccess() && future.getNow()) {
248                 channel.attr(POOL_KEY).set(this);
249                 handler.channelAcquired(channel);
250                 promise.setSuccess(channel);
251             } else {
252                 closeChannel(channel);
253                 acquireHealthyFromPoolOrNew(promise);
254             }
255         } catch (Throwable cause) {
256             closeAndFail(channel, cause, promise);
257         }
258     }
259 
260     /**
261      * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may
262      * override this.
263      * <p>
264      * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
265      */
266     protected ChannelFuture connectChannel(Bootstrap bs) {
267         return bs.connect();
268     }
269 
270     @Override
271     public final Future<Void> release(Channel channel) {
272         return release(channel, channel.eventLoop().<Void>newPromise());
273     }
274 
275     @Override
276     public Future<Void> release(final Channel channel, final Promise<Void> promise) {
277         try {
278             checkNotNull(channel, "channel");
279             checkNotNull(promise, "promise");
280             EventLoop loop = channel.eventLoop();
281             if (loop.inEventLoop()) {
282                 doReleaseChannel(channel, promise);
283             } else {
284                 loop.execute(new Runnable() {
285                     @Override
286                     public void run() {
287                         doReleaseChannel(channel, promise);
288                     }
289                 });
290             }
291         } catch (Throwable cause) {
292             closeAndFail(channel, cause, promise);
293         }
294         return promise;
295     }
296 
297     private void doReleaseChannel(Channel channel, Promise<Void> promise) {
298         try {
299             assert channel.eventLoop().inEventLoop();
300             // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
301             if (channel.attr(POOL_KEY).getAndSet(null) != this) {
302                 closeAndFail(channel,
303                              // Better include a stacktrace here as this is an user error.
304                              new IllegalArgumentException(
305                                      "Channel " + channel + " was not acquired from this ChannelPool"),
306                              promise);
307             } else {
308                 if (releaseHealthCheck) {
309                     doHealthCheckOnRelease(channel, promise);
310                 } else {
311                     releaseAndOffer(channel, promise);
312                 }
313             }
314         } catch (Throwable cause) {
315             closeAndFail(channel, cause, promise);
316         }
317     }
318 
319     private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
320         final Future<Boolean> f = healthCheck.isHealthy(channel);
321         if (f.isDone()) {
322             releaseAndOfferIfHealthy(channel, promise, f);
323         } else {
324             f.addListener(new FutureListener<Boolean>() {
325                 @Override
326                 public void operationComplete(Future<Boolean> future) throws Exception {
327                     releaseAndOfferIfHealthy(channel, promise, f);
328                 }
329             });
330         }
331     }
332 
333     /**
334      * Adds the channel back to the pool only if the channel is healthy.
335      * @param channel the channel to put back to the pool
336      * @param promise offer operation promise.
337      * @param future the future that contains information fif channel is healthy or not.
338      * @throws Exception in case when failed to notify handler about release operation.
339      */
340     private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future) {
341         try {
342             if (future.getNow()) { //channel turns out to be healthy, offering and releasing it.
343                 releaseAndOffer(channel, promise);
344             } else { //channel not healthy, just releasing it.
345                 handler.channelReleased(channel);
346                 promise.setSuccess(null);
347             }
348         } catch (Throwable cause) {
349             closeAndFail(channel, cause, promise);
350         }
351     }
352 
353     private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
354         if (offerChannel(channel)) {
355             handler.channelReleased(channel);
356             promise.setSuccess(null);
357         } else {
358             closeAndFail(channel, new ChannelPoolFullException(), promise);
359         }
360     }
361 
362     private void closeChannel(Channel channel) throws Exception {
363         channel.attr(POOL_KEY).getAndSet(null);
364         channel.close();
365     }
366 
367     private void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
368         if (channel != null) {
369             try {
370                 closeChannel(channel);
371             } catch (Throwable t) {
372                 promise.tryFailure(t);
373             }
374         }
375         promise.tryFailure(cause);
376     }
377 
378     /**
379      * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no
380      * {@link Channel} is ready to be reused.
381      *
382      * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
383      * implementations of these methods needs to be thread-safe!
384      */
385     protected Channel pollChannel() {
386         return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
387     }
388 
389     /**
390      * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel}
391      * could be added, {@code false} otherwise.
392      *
393      * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
394      * implementations of these methods needs to be thread-safe!
395      */
396     protected boolean offerChannel(Channel channel) {
397         return deque.offer(channel);
398     }
399 
400     @Override
401     public void close() {
402         for (;;) {
403             Channel channel = pollChannel();
404             if (channel == null) {
405                 break;
406             }
407             // Just ignore any errors that are reported back from close().
408             channel.close().awaitUninterruptibly();
409         }
410     }
411 
412     /**
413      * Closes the pool in an async manner.
414      *
415      * @return Future which represents completion of the close task
416      */
417     public Future<Void> closeAsync() {
418         // Execute close asynchronously in case this is being invoked on an eventloop to avoid blocking
419         return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
420             @Override
421             public Void call() throws Exception {
422                 close();
423                 return null;
424             }
425         });
426     }
427 
428     private static final class ChannelPoolFullException extends IllegalStateException {
429 
430         private ChannelPoolFullException() {
431             super("ChannelPool full");
432         }
433 
434         // Suppress a warning since the method doesn't need synchronization
435         @Override
436         public Throwable fillInStackTrace() {
437             return this;
438         }
439     }
440 }