View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.pool;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.EventLoop;
24  import io.netty.util.AttributeKey;
25  import io.netty.util.concurrent.Future;
26  import io.netty.util.concurrent.FutureListener;
27  import io.netty.util.concurrent.GlobalEventExecutor;
28  import io.netty.util.concurrent.Promise;
29  import io.netty.util.internal.PlatformDependent;
30  
31  import java.util.Deque;
32  import java.util.concurrent.Callable;
33  
34  import static io.netty.util.internal.ObjectUtil.*;
35  
36  /**
37   * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
38   * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced.
39   *
40   * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}.
41   *
42   */
43  public class SimpleChannelPool implements ChannelPool {
44      private static final AttributeKey<SimpleChannelPool> POOL_KEY =
45          AttributeKey.newInstance("io.netty.channel.pool.SimpleChannelPool");
46      private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
47      private final ChannelPoolHandler handler;
48      private final ChannelHealthChecker healthCheck;
49      private final Bootstrap bootstrap;
50      private final boolean releaseHealthCheck;
51      private final boolean lastRecentUsed;
52  
53      /**
54       * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
55       *
56       * @param bootstrap         the {@link Bootstrap} that is used for connections
57       * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
58       */
59      public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) {
60          this(bootstrap, handler, ChannelHealthChecker.ACTIVE);
61      }
62  
63      /**
64       * Creates a new instance.
65       *
66       * @param bootstrap         the {@link Bootstrap} that is used for connections
67       * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
68       * @param healthCheck       the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
69       *                          still healthy when obtain from the {@link ChannelPool}
70       */
71      public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
72          this(bootstrap, handler, healthCheck, true);
73      }
74  
75      /**
76       * Creates a new instance.
77       *
78       * @param bootstrap          the {@link Bootstrap} that is used for connections
79       * @param handler            the {@link ChannelPoolHandler} that will be notified for the different pool actions
80       * @param healthCheck        the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
81       *                           still healthy when obtain from the {@link ChannelPool}
82       * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true};
83       *                           otherwise, channel health is only checked at acquisition time
84       */
85      public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
86                               boolean releaseHealthCheck) {
87          this(bootstrap, handler, healthCheck, releaseHealthCheck, true);
88      }
89  
90      /**
91       * Creates a new instance.
92       *
93       * @param bootstrap          the {@link Bootstrap} that is used for connections
94       * @param handler            the {@link ChannelPoolHandler} that will be notified for the different pool actions
95       * @param healthCheck        the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
96       *                           still healthy when obtain from the {@link ChannelPool}
97       * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true};
98       *                           otherwise, channel health is only checked at acquisition time
99       * @param lastRecentUsed    {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
100      */
101     public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
102                              boolean releaseHealthCheck, boolean lastRecentUsed) {
103         this.handler = checkNotNull(handler, "handler");
104         this.healthCheck = checkNotNull(healthCheck, "healthCheck");
105         this.releaseHealthCheck = releaseHealthCheck;
106         // Clone the original Bootstrap as we want to set our own handler
107         this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
108         this.bootstrap.handler(new ChannelInitializer<Channel>() {
109             @Override
110             protected void initChannel(Channel ch) throws Exception {
111                 assert ch.eventLoop().inEventLoop();
112                 handler.channelCreated(ch);
113             }
114         });
115         this.lastRecentUsed = lastRecentUsed;
116     }
117 
118     /**
119      * Returns the {@link Bootstrap} this pool will use to open new connections.
120      *
121      * @return the {@link Bootstrap} this pool will use to open new connections
122      */
123     protected Bootstrap bootstrap() {
124         return bootstrap;
125     }
126 
127     /**
128      * Returns the {@link ChannelPoolHandler} that will be notified for the different pool actions.
129      *
130      * @return the {@link ChannelPoolHandler} that will be notified for the different pool actions
131      */
132     protected ChannelPoolHandler handler() {
133         return handler;
134     }
135 
136     /**
137      * Returns the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy.
138      *
139      * @return the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy
140      */
141     protected ChannelHealthChecker healthChecker() {
142         return healthCheck;
143     }
144 
145     /**
146      * Indicates whether this pool will check the health of channels before offering them back into the pool.
147      *
148      * @return {@code true} if this pool will check the health of channels before offering them back into the pool, or
149      * {@code false} if channel health is only checked at acquisition time
150      */
151     protected boolean releaseHealthCheck() {
152         return releaseHealthCheck;
153     }
154 
155     @Override
156     public final Future<Channel> acquire() {
157         return acquire(bootstrap.config().group().next().<Channel>newPromise());
158     }
159 
160     @Override
161     public Future<Channel> acquire(final Promise<Channel> promise) {
162         return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
163     }
164 
165     /**
166      * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
167      * @param promise the promise to provide acquire result.
168      * @return future for acquiring a channel.
169      */
170     private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
171         try {
172             final Channel ch = pollChannel();
173             if (ch == null) {
174                 // No Channel left in the pool bootstrap a new Channel
175                 Bootstrap bs = bootstrap.clone();
176                 bs.attr(POOL_KEY, this);
177                 ChannelFuture f = connectChannel(bs);
178                 if (f.isDone()) {
179                     notifyConnect(f, promise);
180                 } else {
181                     f.addListener((ChannelFutureListener) future -> notifyConnect(future, promise));
182                 }
183             } else {
184                 EventLoop loop = ch.eventLoop();
185                 if (loop.inEventLoop()) {
186                     doHealthCheck(ch, promise);
187                 } else {
188                     loop.execute(new Runnable() {
189                         @Override
190                         public void run() {
191                             doHealthCheck(ch, promise);
192                         }
193                     });
194                 }
195             }
196         } catch (Throwable cause) {
197             promise.tryFailure(cause);
198         }
199         return promise;
200     }
201 
202     private void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
203         Channel channel = null;
204         try {
205             if (future.isSuccess()) {
206                 channel = future.channel();
207                 handler.channelAcquired(channel);
208                 if (!promise.trySuccess(channel)) {
209                     // Promise was completed in the meantime (like cancelled), just release the channel again
210                     release(channel);
211                 }
212             } else {
213                 promise.tryFailure(future.cause());
214             }
215         } catch (Throwable cause) {
216             closeAndFail(channel, cause, promise);
217         }
218     }
219 
220     private void doHealthCheck(final Channel channel, final Promise<Channel> promise) {
221         try {
222             assert channel.eventLoop().inEventLoop();
223             Future<Boolean> f = healthCheck.isHealthy(channel);
224             if (f.isDone()) {
225                 notifyHealthCheck(f, channel, promise);
226             } else {
227                 f.addListener((FutureListener<Boolean>) future -> notifyHealthCheck(future, channel, promise));
228             }
229         } catch (Throwable cause) {
230             closeAndFail(channel, cause, promise);
231         }
232     }
233 
234     private void notifyHealthCheck(Future<Boolean> future, Channel channel, Promise<Channel> promise) {
235         try {
236             assert channel.eventLoop().inEventLoop();
237             if (future.isSuccess() && future.getNow()) {
238                 channel.attr(POOL_KEY).set(this);
239                 handler.channelAcquired(channel);
240                 promise.setSuccess(channel);
241             } else {
242                 closeChannel(channel);
243                 acquireHealthyFromPoolOrNew(promise);
244             }
245         } catch (Throwable cause) {
246             closeAndFail(channel, cause, promise);
247         }
248     }
249 
250     /**
251      * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may
252      * override this.
253      * <p>
254      * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
255      */
256     protected ChannelFuture connectChannel(Bootstrap bs) {
257         return bs.connect();
258     }
259 
260     @Override
261     public final Future<Void> release(Channel channel) {
262         return release(channel, channel.eventLoop().<Void>newPromise());
263     }
264 
265     @Override
266     public Future<Void> release(final Channel channel, final Promise<Void> promise) {
267         try {
268             checkNotNull(channel, "channel");
269             checkNotNull(promise, "promise");
270             EventLoop loop = channel.eventLoop();
271             if (loop.inEventLoop()) {
272                 doReleaseChannel(channel, promise);
273             } else {
274                 loop.execute(new Runnable() {
275                     @Override
276                     public void run() {
277                         doReleaseChannel(channel, promise);
278                     }
279                 });
280             }
281         } catch (Throwable cause) {
282             closeAndFail(channel, cause, promise);
283         }
284         return promise;
285     }
286 
287     private void doReleaseChannel(Channel channel, Promise<Void> promise) {
288         try {
289             assert channel.eventLoop().inEventLoop();
290             // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
291             if (channel.attr(POOL_KEY).getAndSet(null) != this) {
292                 closeAndFail(channel,
293                              // Better include a stacktrace here as this is an user error.
294                              new IllegalArgumentException(
295                                      "Channel " + channel + " was not acquired from this ChannelPool"),
296                              promise);
297             } else {
298                 if (releaseHealthCheck) {
299                     doHealthCheckOnRelease(channel, promise);
300                 } else {
301                     releaseAndOffer(channel, promise);
302                 }
303             }
304         } catch (Throwable cause) {
305             closeAndFail(channel, cause, promise);
306         }
307     }
308 
309     private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
310         final Future<Boolean> f = healthCheck.isHealthy(channel);
311         if (f.isDone()) {
312             releaseAndOfferIfHealthy(channel, promise, f);
313         } else {
314             f.addListener((FutureListener<Boolean>) future -> releaseAndOfferIfHealthy(channel, promise, f));
315         }
316     }
317 
318     /**
319      * Adds the channel back to the pool only if the channel is healthy.
320      * @param channel the channel to put back to the pool
321      * @param promise offer operation promise.
322      * @param future the future that contains information fif channel is healthy or not.
323      * @throws Exception in case when failed to notify handler about release operation.
324      */
325     private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future) {
326         try {
327             if (future.getNow()) { //channel turns out to be healthy, offering and releasing it.
328                 releaseAndOffer(channel, promise);
329             } else { //channel not healthy, just releasing it.
330                 handler.channelReleased(channel);
331                 promise.setSuccess(null);
332             }
333         } catch (Throwable cause) {
334             closeAndFail(channel, cause, promise);
335         }
336     }
337 
338     private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
339         if (offerChannel(channel)) {
340             handler.channelReleased(channel);
341             promise.setSuccess(null);
342         } else {
343             closeAndFail(channel, new ChannelPoolFullException(), promise);
344         }
345     }
346 
347     private void closeChannel(Channel channel) throws Exception {
348         channel.attr(POOL_KEY).getAndSet(null);
349         channel.close();
350     }
351 
352     private void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
353         if (channel != null) {
354             try {
355                 closeChannel(channel);
356             } catch (Throwable t) {
357                 promise.tryFailure(t);
358             }
359         }
360         promise.tryFailure(cause);
361     }
362 
363     /**
364      * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no
365      * {@link Channel} is ready to be reused.
366      *
367      * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
368      * implementations of these methods needs to be thread-safe!
369      */
370     protected Channel pollChannel() {
371         return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
372     }
373 
374     /**
375      * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel}
376      * could be added, {@code false} otherwise.
377      *
378      * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
379      * implementations of these methods needs to be thread-safe!
380      */
381     protected boolean offerChannel(Channel channel) {
382         return deque.offer(channel);
383     }
384 
385     @Override
386     public void close() {
387         for (;;) {
388             Channel channel = pollChannel();
389             if (channel == null) {
390                 break;
391             }
392             // Just ignore any errors that are reported back from close().
393             channel.close().awaitUninterruptibly();
394         }
395     }
396 
397     /**
398      * Closes the pool in an async manner.
399      *
400      * @return Future which represents completion of the close task
401      */
402     public Future<Void> closeAsync() {
403         // Execute close asynchronously in case this is being invoked on an eventloop to avoid blocking
404         return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
405             @Override
406             public Void call() throws Exception {
407                 close();
408                 return null;
409             }
410         });
411     }
412 
413     private static final class ChannelPoolFullException extends IllegalStateException {
414 
415         private ChannelPoolFullException() {
416             super("ChannelPool full");
417         }
418 
419         // Suppress a warning since the method doesn't need synchronization
420         @Override
421         public Throwable fillInStackTrace() {
422             return this;
423         }
424     }
425 }