1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http.websocketx;
17
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.ChannelInboundHandlerAdapter;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.handler.codec.http.FullHttpResponse;
23 import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler.ClientHandshakeStateEvent;
24 import io.netty.util.concurrent.Future;
25
26 import java.util.concurrent.TimeUnit;
27
28 import static io.netty.util.internal.ObjectUtil.*;
29
30 class WebSocketClientProtocolHandshakeHandler extends ChannelInboundHandlerAdapter {
31 private static final long DEFAULT_HANDSHAKE_TIMEOUT_MS = 10000L;
32
33 private final WebSocketClientHandshaker handshaker;
34 private final long handshakeTimeoutMillis;
35 private ChannelHandlerContext ctx;
36 private ChannelPromise handshakePromise;
37
38 WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker) {
39 this(handshaker, DEFAULT_HANDSHAKE_TIMEOUT_MS);
40 }
41
42 WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker, long handshakeTimeoutMillis) {
43 this.handshaker = handshaker;
44 this.handshakeTimeoutMillis = checkPositive(handshakeTimeoutMillis, "handshakeTimeoutMillis");
45 }
46
47 @Override
48 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
49 this.ctx = ctx;
50 handshakePromise = ctx.newPromise();
51 }
52
53 @Override
54 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
55 ctx.fireChannelActive();
56 handshaker.handshake(ctx.channel()).addListener(future -> {
57 if (!future.isSuccess()) {
58 handshakePromise.tryFailure(future.cause());
59 ctx.fireExceptionCaught(future.cause());
60 } else {
61 ctx.fireUserEventTriggered(
62 ClientHandshakeStateEvent.HANDSHAKE_ISSUED);
63 }
64 });
65 applyHandshakeTimeout();
66 }
67
68 @Override
69 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
70 if (!handshakePromise.isDone()) {
71 handshakePromise.tryFailure(new WebSocketClientHandshakeException("channel closed with handshake " +
72 "in progress"));
73 }
74
75 super.channelInactive(ctx);
76 }
77
78 @Override
79 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
80 if (!(msg instanceof FullHttpResponse)) {
81 ctx.fireChannelRead(msg);
82 return;
83 }
84
85 FullHttpResponse response = (FullHttpResponse) msg;
86 try {
87 if (!handshaker.isHandshakeComplete()) {
88 handshaker.finishHandshake(ctx.channel(), response);
89 handshakePromise.trySuccess();
90 ctx.fireUserEventTriggered(
91 WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE);
92 ctx.pipeline().remove(this);
93 return;
94 }
95 throw new IllegalStateException("WebSocketClientHandshaker should have been non finished yet");
96 } finally {
97 response.release();
98 }
99 }
100
101 private void applyHandshakeTimeout() {
102 final ChannelPromise localHandshakePromise = handshakePromise;
103 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
104 return;
105 }
106
107 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
108 @Override
109 public void run() {
110 if (localHandshakePromise.isDone()) {
111 return;
112 }
113
114 if (localHandshakePromise.tryFailure(new WebSocketClientHandshakeException("handshake timed out"))) {
115 ctx.flush()
116 .fireUserEventTriggered(ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT)
117 .close();
118 }
119 }
120 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
121
122
123 localHandshakePromise.addListener(f -> timeoutFuture.cancel(false));
124 }
125
126
127
128
129
130
131 ChannelFuture getHandshakeFuture() {
132 return handshakePromise;
133 }
134 }