View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.pool;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.EventLoop;
24  import io.netty.util.AttributeKey;
25  import io.netty.util.concurrent.Future;
26  import io.netty.util.concurrent.FutureListener;
27  import io.netty.util.concurrent.Promise;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.ThrowableUtil;
30  
31  import java.util.Deque;
32  
33  import static io.netty.util.internal.ObjectUtil.*;
34  
35  /**
 * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
 * a {@link Channel} but none is currently available in the pool. No limit on the maximum number of concurrent
 * {@link Channel}s is enforced.
 *
 * By default this implementation hands out pooled {@link Channel}s in LIFO order; FIFO order can be selected
 * through the {@code lastRecentUsed} constructor argument.
40   *
41   */
42  public class SimpleChannelPool implements ChannelPool {
    // Channel attribute that records which pool a channel currently belongs to. It is set when a channel is
    // handed out and cleared on release, which lets a release reject channels not acquired from this pool.
    private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool");
    // Shared exception instance used to fail a release when offerChannel(...) refuses the channel.
    private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)");

    // Idle channels that are waiting to be reused.
    private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
    // Callback notified when channels are created, acquired and released.
    private final ChannelPoolHandler handler;
    // Strategy used to decide whether a channel is still usable.
    private final ChannelHealthChecker healthCheck;
    // Pool-private clone of the user supplied Bootstrap; used to open new connections.
    private final Bootstrap bootstrap;
    // When true, channel health is also verified before a channel is put back into the pool.
    private final boolean releaseHealthCheck;
    // true: hand out the most recently released channel first (LIFO); false: the oldest one first (FIFO).
    private final boolean lastRecentUsed;
53  
    /**
     * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE} health checker.
     *
     * @param bootstrap         the {@link Bootstrap} that is used for connections
     * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
     */
    public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) {
        this(bootstrap, handler, ChannelHealthChecker.ACTIVE);
    }
63  
    /**
     * Creates a new instance that also checks channel health when a {@link Channel} is released.
     *
     * @param bootstrap         the {@link Bootstrap} that is used for connections
     * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
     * @param healthCheck       the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
     *                          still healthy when obtained from the {@link ChannelPool}
     */
    public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
        // 'true' enables the health check on release.
        this(bootstrap, handler, healthCheck, true);
    }
75  
    /**
     * Creates a new instance that hands out pooled {@link Channel}s in LIFO order.
     *
     * @param bootstrap          the {@link Bootstrap} that is used for connections
     * @param handler            the {@link ChannelPoolHandler} that will be notified for the different pool actions
     * @param healthCheck        the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
     *                           still healthy when obtained from the {@link ChannelPool}
     * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true};
     *                           otherwise, channel health is only checked at acquisition time
     */
    public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
                             boolean releaseHealthCheck) {
        // 'true' selects LIFO ordering (lastRecentUsed).
        this(bootstrap, handler, healthCheck, releaseHealthCheck, true);
    }
90  
91      /**
92       * Creates a new instance.
93       *
94       * @param bootstrap          the {@link Bootstrap} that is used for connections
95       * @param handler            the {@link ChannelPoolHandler} that will be notified for the different pool actions
96       * @param healthCheck        the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
97       *                           still healthy when obtain from the {@link ChannelPool}
98       * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true};
99       *                           otherwise, channel health is only checked at acquisition time
100      * @param lastRecentUsed    {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
101      */
102     public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
103                              boolean releaseHealthCheck, boolean lastRecentUsed) {
104         this.handler = checkNotNull(handler, "handler");
105         this.healthCheck = checkNotNull(healthCheck, "healthCheck");
106         this.releaseHealthCheck = releaseHealthCheck;
107         // Clone the original Bootstrap as we want to set our own handler
108         this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
109         this.bootstrap.handler(new ChannelInitializer<Channel>() {
110             @Override
111             protected void initChannel(Channel ch) throws Exception {
112                 assert ch.eventLoop().inEventLoop();
113                 handler.channelCreated(ch);
114             }
115         });
116         this.lastRecentUsed = lastRecentUsed;
117     }
118 
    /**
     * Returns the {@link Bootstrap} this pool will use to open new connections.
     * <p>
     * This is the pool's own clone of the {@link Bootstrap} given to the constructor, with the pool's
     * {@link ChannelInitializer} installed as its handler.
     *
     * @return the {@link Bootstrap} this pool will use to open new connections
     */
    protected Bootstrap bootstrap() {
        return bootstrap;
    }
127 
    /**
     * Returns the {@link ChannelPoolHandler} that will be notified for the different pool actions.
     * This is the handler that was supplied when the pool was constructed.
     *
     * @return the {@link ChannelPoolHandler} that will be notified for the different pool actions
     */
    protected ChannelPoolHandler handler() {
        return handler;
    }
136 
    /**
     * Returns the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy.
     * This is the checker that was supplied when the pool was constructed.
     *
     * @return the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy
     */
    protected ChannelHealthChecker healthChecker() {
        return healthCheck;
    }
145 
    /**
     * Indicates whether this pool will check the health of channels before offering them back into the pool.
     * The value is fixed at construction time.
     *
     * @return {@code true} if this pool will check the health of channels before offering them back into the pool, or
     * {@code false} if channel health is only checked at acquisition time
     */
    protected boolean releaseHealthCheck() {
        return releaseHealthCheck;
    }
155 
156     @Override
157     public final Future<Channel> acquire() {
158         return acquire(bootstrap.group().next().<Channel>newPromise());
159     }
160 
161     @Override
162     public Future<Channel> acquire(final Promise<Channel> promise) {
163         checkNotNull(promise, "promise");
164         return acquireHealthyFromPoolOrNew(promise);
165     }
166 
167     /**
168      * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
169      * @param promise the promise to provide acquire result.
170      * @return future for acquiring a channel.
171      */
172     private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
173         try {
174             final Channel ch = pollChannel();
175             if (ch == null) {
176                 // No Channel left in the pool bootstrap a new Channel
177                 Bootstrap bs = bootstrap.clone();
178                 bs.attr(POOL_KEY, this);
179                 ChannelFuture f = connectChannel(bs);
180                 if (f.isDone()) {
181                     notifyConnect(f, promise);
182                 } else {
183                     f.addListener(new ChannelFutureListener() {
184                         @Override
185                         public void operationComplete(ChannelFuture future) throws Exception {
186                             notifyConnect(future, promise);
187                         }
188                     });
189                 }
190                 return promise;
191             }
192             EventLoop loop = ch.eventLoop();
193             if (loop.inEventLoop()) {
194                 doHealthCheck(ch, promise);
195             } else {
196                 loop.execute(new Runnable() {
197                     @Override
198                     public void run() {
199                         doHealthCheck(ch, promise);
200                     }
201                 });
202             }
203         } catch (Throwable cause) {
204             promise.tryFailure(cause);
205         }
206         return promise;
207     }
208 
209     private void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
210         if (future.isSuccess()) {
211             Channel channel = future.channel();
212             if (!promise.trySuccess(channel)) {
213                 // Promise was completed in the meantime (like cancelled), just release the channel again
214                 release(channel);
215             }
216         } else {
217             promise.tryFailure(future.cause());
218         }
219     }
220 
221     private void doHealthCheck(final Channel ch, final Promise<Channel> promise) {
222         assert ch.eventLoop().inEventLoop();
223 
224         Future<Boolean> f = healthCheck.isHealthy(ch);
225         if (f.isDone()) {
226             notifyHealthCheck(f, ch, promise);
227         } else {
228             f.addListener(new FutureListener<Boolean>() {
229                 @Override
230                 public void operationComplete(Future<Boolean> future) throws Exception {
231                     notifyHealthCheck(future, ch, promise);
232                 }
233             });
234         }
235     }
236 
237     private void notifyHealthCheck(Future<Boolean> future, Channel ch, Promise<Channel> promise) {
238         assert ch.eventLoop().inEventLoop();
239 
240         if (future.isSuccess()) {
241             if (future.getNow() == Boolean.TRUE) {
242                 try {
243                     ch.attr(POOL_KEY).set(this);
244                     handler.channelAcquired(ch);
245                     promise.setSuccess(ch);
246                 } catch (Throwable cause) {
247                     closeAndFail(ch, cause, promise);
248                 }
249             } else {
250                 closeChannel(ch);
251                 acquireHealthyFromPoolOrNew(promise);
252             }
253         } else {
254             closeChannel(ch);
255             acquireHealthyFromPoolOrNew(promise);
256         }
257     }
258 
    /**
     * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may
     * override this.
     * <p>
     * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
     *
     * @param bs the cloned {@link Bootstrap} to connect with
     * @return the {@link ChannelFuture} of the connect attempt
     */
    protected ChannelFuture connectChannel(Bootstrap bs) {
        return bs.connect();
    }
268 
269     @Override
270     public final Future<Void> release(Channel channel) {
271         return release(channel, channel.eventLoop().<Void>newPromise());
272     }
273 
274     @Override
275     public Future<Void> release(final Channel channel, final Promise<Void> promise) {
276         checkNotNull(channel, "channel");
277         checkNotNull(promise, "promise");
278         try {
279             EventLoop loop = channel.eventLoop();
280             if (loop.inEventLoop()) {
281                 doReleaseChannel(channel, promise);
282             } else {
283                 loop.execute(new Runnable() {
284                     @Override
285                     public void run() {
286                         doReleaseChannel(channel, promise);
287                     }
288                 });
289             }
290         } catch (Throwable cause) {
291             closeAndFail(channel, cause, promise);
292         }
293         return promise;
294     }
295 
296     private void doReleaseChannel(Channel channel, Promise<Void> promise) {
297         assert channel.eventLoop().inEventLoop();
298         // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
299         if (channel.attr(POOL_KEY).getAndSet(null) != this) {
300             closeAndFail(channel,
301                          // Better include a stacktrace here as this is an user error.
302                          new IllegalArgumentException(
303                                  "Channel " + channel + " was not acquired from this ChannelPool"),
304                          promise);
305         } else {
306             try {
307                 if (releaseHealthCheck) {
308                     doHealthCheckOnRelease(channel, promise);
309                 } else {
310                     releaseAndOffer(channel, promise);
311                 }
312             } catch (Throwable cause) {
313                 closeAndFail(channel, cause, promise);
314             }
315         }
316     }
317 
318     private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
319         final Future<Boolean> f = healthCheck.isHealthy(channel);
320         if (f.isDone()) {
321             releaseAndOfferIfHealthy(channel, promise, f);
322         } else {
323             f.addListener(new FutureListener<Boolean>() {
324                 @Override
325                 public void operationComplete(Future<Boolean> future) throws Exception {
326                     releaseAndOfferIfHealthy(channel, promise, f);
327                 }
328             });
329         }
330     }
331 
332     /**
333      * Adds the channel back to the pool only if the channel is healthy.
334      * @param channel the channel to put back to the pool
335      * @param promise offer operation promise.
336      * @param future the future that contains information fif channel is healthy or not.
337      * @throws Exception in case when failed to notify handler about release operation.
338      */
339     private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future)
340             throws Exception {
341         if (future.getNow()) { //channel turns out to be healthy, offering and releasing it.
342             releaseAndOffer(channel, promise);
343         } else { //channel not healthy, just releasing it.
344             handler.channelReleased(channel);
345             promise.setSuccess(null);
346         }
347     }
348 
349     private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
350         if (offerChannel(channel)) {
351             handler.channelReleased(channel);
352             promise.setSuccess(null);
353         } else {
354             closeAndFail(channel, FULL_EXCEPTION, promise);
355         }
356     }
357 
    /**
     * Clears the pool ownership marker of the channel and then requests the channel to be closed.
     * The result of the close operation is not observed.
     *
     * @param channel the channel to close.
     */
    private static void closeChannel(Channel channel) {
        channel.attr(POOL_KEY).getAndSet(null);
        channel.close();
    }
362 
    /**
     * Closes the channel via {@link #closeChannel(Channel)} and then tries to fail the promise with the given cause.
     * {@code tryFailure} is used, so a promise that is already complete is left as it is.
     *
     * @param channel the channel to close.
     * @param cause   the failure to report.
     * @param promise the promise to fail.
     */
    private static void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
        closeChannel(channel);
        promise.tryFailure(cause);
    }
367 
368     /**
369      * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no
370      * {@link Channel} is ready to be reused.
371      *
372      * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
373      * implementations of these methods needs to be thread-safe!
374      */
375     protected Channel pollChannel() {
376         return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
377     }
378 
379     /**
380      * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel}
381      * could be added, {@code false} otherwise.
382      *
383      * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
384      * implementations of these methods needs to be thread-safe!
385      */
386     protected boolean offerChannel(Channel channel) {
387         return deque.offer(channel);
388     }
389 
390     @Override
391     public void close() {
392         for (;;) {
393             Channel channel = pollChannel();
394             if (channel == null) {
395                 break;
396             }
397             channel.close();
398         }
399     }
400 }