1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.pool;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.EventLoop;
24 import io.netty.util.AttributeKey;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.FutureListener;
27 import io.netty.util.concurrent.Promise;
28 import io.netty.util.internal.PlatformDependent;
29 import io.netty.util.internal.ThrowableUtil;
30
31 import java.util.Deque;
32
33 import static io.netty.util.internal.ObjectUtil.*;
34
35
36
37
38
39
40
41
42 public class SimpleChannelPool implements ChannelPool {
43 private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool");
44 private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
45 new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)");
46
47 private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
48 private final ChannelPoolHandler handler;
49 private final ChannelHealthChecker healthCheck;
50 private final Bootstrap bootstrap;
51 private final boolean releaseHealthCheck;
52 private final boolean lastRecentUsed;
53
54
55
56
57
58
59
60 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) {
61 this(bootstrap, handler, ChannelHealthChecker.ACTIVE);
62 }
63
64
65
66
67
68
69
70
71
72 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
73 this(bootstrap, handler, healthCheck, true);
74 }
75
76
77
78
79
80
81
82
83
84
85
86 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
87 boolean releaseHealthCheck) {
88 this(bootstrap, handler, healthCheck, releaseHealthCheck, true);
89 }
90
91
92
93
94
95
96
97
98
99
100
101
102 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
103 boolean releaseHealthCheck, boolean lastRecentUsed) {
104 this.handler = checkNotNull(handler, "handler");
105 this.healthCheck = checkNotNull(healthCheck, "healthCheck");
106 this.releaseHealthCheck = releaseHealthCheck;
107
108 this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
109 this.bootstrap.handler(new ChannelInitializer<Channel>() {
110 @Override
111 protected void initChannel(Channel ch) throws Exception {
112 assert ch.eventLoop().inEventLoop();
113 handler.channelCreated(ch);
114 }
115 });
116 this.lastRecentUsed = lastRecentUsed;
117 }
118
119
120
121
122
123
124 protected Bootstrap bootstrap() {
125 return bootstrap;
126 }
127
128
129
130
131
132
133 protected ChannelPoolHandler handler() {
134 return handler;
135 }
136
137
138
139
140
141
142 protected ChannelHealthChecker healthChecker() {
143 return healthCheck;
144 }
145
146
147
148
149
150
151
152 protected boolean releaseHealthCheck() {
153 return releaseHealthCheck;
154 }
155
156 @Override
157 public final Future<Channel> acquire() {
158 return acquire(bootstrap.group().next().<Channel>newPromise());
159 }
160
161 @Override
162 public Future<Channel> acquire(final Promise<Channel> promise) {
163 checkNotNull(promise, "promise");
164 return acquireHealthyFromPoolOrNew(promise);
165 }
166
167
168
169
170
171
172 private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
173 try {
174 final Channel ch = pollChannel();
175 if (ch == null) {
176
177 Bootstrap bs = bootstrap.clone();
178 bs.attr(POOL_KEY, this);
179 ChannelFuture f = connectChannel(bs);
180 if (f.isDone()) {
181 notifyConnect(f, promise);
182 } else {
183 f.addListener(new ChannelFutureListener() {
184 @Override
185 public void operationComplete(ChannelFuture future) throws Exception {
186 notifyConnect(future, promise);
187 }
188 });
189 }
190 return promise;
191 }
192 EventLoop loop = ch.eventLoop();
193 if (loop.inEventLoop()) {
194 doHealthCheck(ch, promise);
195 } else {
196 loop.execute(new Runnable() {
197 @Override
198 public void run() {
199 doHealthCheck(ch, promise);
200 }
201 });
202 }
203 } catch (Throwable cause) {
204 promise.tryFailure(cause);
205 }
206 return promise;
207 }
208
209 private void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
210 if (future.isSuccess()) {
211 Channel channel = future.channel();
212 if (!promise.trySuccess(channel)) {
213
214 release(channel);
215 }
216 } else {
217 promise.tryFailure(future.cause());
218 }
219 }
220
221 private void doHealthCheck(final Channel ch, final Promise<Channel> promise) {
222 assert ch.eventLoop().inEventLoop();
223
224 Future<Boolean> f = healthCheck.isHealthy(ch);
225 if (f.isDone()) {
226 notifyHealthCheck(f, ch, promise);
227 } else {
228 f.addListener(new FutureListener<Boolean>() {
229 @Override
230 public void operationComplete(Future<Boolean> future) throws Exception {
231 notifyHealthCheck(future, ch, promise);
232 }
233 });
234 }
235 }
236
237 private void notifyHealthCheck(Future<Boolean> future, Channel ch, Promise<Channel> promise) {
238 assert ch.eventLoop().inEventLoop();
239
240 if (future.isSuccess()) {
241 if (future.getNow() == Boolean.TRUE) {
242 try {
243 ch.attr(POOL_KEY).set(this);
244 handler.channelAcquired(ch);
245 promise.setSuccess(ch);
246 } catch (Throwable cause) {
247 closeAndFail(ch, cause, promise);
248 }
249 } else {
250 closeChannel(ch);
251 acquireHealthyFromPoolOrNew(promise);
252 }
253 } else {
254 closeChannel(ch);
255 acquireHealthyFromPoolOrNew(promise);
256 }
257 }
258
259
260
261
262
263
264
265 protected ChannelFuture connectChannel(Bootstrap bs) {
266 return bs.connect();
267 }
268
269 @Override
270 public final Future<Void> release(Channel channel) {
271 return release(channel, channel.eventLoop().<Void>newPromise());
272 }
273
274 @Override
275 public Future<Void> release(final Channel channel, final Promise<Void> promise) {
276 checkNotNull(channel, "channel");
277 checkNotNull(promise, "promise");
278 try {
279 EventLoop loop = channel.eventLoop();
280 if (loop.inEventLoop()) {
281 doReleaseChannel(channel, promise);
282 } else {
283 loop.execute(new Runnable() {
284 @Override
285 public void run() {
286 doReleaseChannel(channel, promise);
287 }
288 });
289 }
290 } catch (Throwable cause) {
291 closeAndFail(channel, cause, promise);
292 }
293 return promise;
294 }
295
296 private void doReleaseChannel(Channel channel, Promise<Void> promise) {
297 assert channel.eventLoop().inEventLoop();
298
299 if (channel.attr(POOL_KEY).getAndSet(null) != this) {
300 closeAndFail(channel,
301
302 new IllegalArgumentException(
303 "Channel " + channel + " was not acquired from this ChannelPool"),
304 promise);
305 } else {
306 try {
307 if (releaseHealthCheck) {
308 doHealthCheckOnRelease(channel, promise);
309 } else {
310 releaseAndOffer(channel, promise);
311 }
312 } catch (Throwable cause) {
313 closeAndFail(channel, cause, promise);
314 }
315 }
316 }
317
318 private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
319 final Future<Boolean> f = healthCheck.isHealthy(channel);
320 if (f.isDone()) {
321 releaseAndOfferIfHealthy(channel, promise, f);
322 } else {
323 f.addListener(new FutureListener<Boolean>() {
324 @Override
325 public void operationComplete(Future<Boolean> future) throws Exception {
326 releaseAndOfferIfHealthy(channel, promise, f);
327 }
328 });
329 }
330 }
331
332
333
334
335
336
337
338
339 private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future)
340 throws Exception {
341 if (future.getNow()) {
342 releaseAndOffer(channel, promise);
343 } else {
344 handler.channelReleased(channel);
345 promise.setSuccess(null);
346 }
347 }
348
349 private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
350 if (offerChannel(channel)) {
351 handler.channelReleased(channel);
352 promise.setSuccess(null);
353 } else {
354 closeAndFail(channel, FULL_EXCEPTION, promise);
355 }
356 }
357
358 private static void closeChannel(Channel channel) {
359 channel.attr(POOL_KEY).getAndSet(null);
360 channel.close();
361 }
362
363 private static void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
364 closeChannel(channel);
365 promise.tryFailure(cause);
366 }
367
368
369
370
371
372
373
374
375 protected Channel pollChannel() {
376 return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
377 }
378
379
380
381
382
383
384
385
386 protected boolean offerChannel(Channel channel) {
387 return deque.offer(channel);
388 }
389
390 @Override
391 public void close() {
392 for (;;) {
393 Channel channel = pollChannel();
394 if (channel == null) {
395 break;
396 }
397 channel.close();
398 }
399 }
400 }