1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http.websocketx;
17
18 import io.netty5.channel.ChannelHandler;
19 import io.netty5.channel.ChannelHandlerContext;
20 import io.netty5.handler.codec.http.FullHttpResponse;
21 import io.netty5.util.concurrent.Future;
22 import io.netty5.util.concurrent.Promise;
23
24 import java.util.concurrent.TimeUnit;
25
26 import static io.netty5.util.internal.ObjectUtil.checkPositive;
27
28 class WebSocketClientProtocolHandshakeHandler implements ChannelHandler {
29
30 private static final long DEFAULT_HANDSHAKE_TIMEOUT_MS = 10000L;
31
32 private final WebSocketClientHandshaker handshaker;
33 private final long handshakeTimeoutMillis;
34 private ChannelHandlerContext ctx;
35 private Promise<Void> handshakePromise;
36
37 WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker) {
38 this(handshaker, DEFAULT_HANDSHAKE_TIMEOUT_MS);
39 }
40
41 WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker, long handshakeTimeoutMillis) {
42 this.handshaker = handshaker;
43 this.handshakeTimeoutMillis = checkPositive(handshakeTimeoutMillis, "handshakeTimeoutMillis");
44 }
45
46 @Override
47 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
48 this.ctx = ctx;
49 handshakePromise = ctx.newPromise();
50 }
51
52 @Override
53 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
54 ctx.fireChannelActive();
55 handshaker.handshake(ctx.channel()).addListener(future -> {
56 if (future.isFailed()) {
57 handshakePromise.tryFailure(future.cause());
58 ctx.fireChannelExceptionCaught(future.cause());
59 }
60 });
61 applyHandshakeTimeout();
62 }
63
64 @Override
65 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
66 if (!handshakePromise.isDone()) {
67 handshakePromise.tryFailure(new WebSocketClientHandshakeException("channel closed with handshake " +
68 "in progress"));
69 }
70
71 ctx.fireChannelInactive();
72 }
73
74 @Override
75 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
76 if (!(msg instanceof FullHttpResponse)) {
77 ctx.fireChannelRead(msg);
78 return;
79 }
80
81 try (FullHttpResponse response = (FullHttpResponse) msg) {
82 if (!handshaker.isHandshakeComplete()) {
83 handshaker.finishHandshake(ctx.channel(), response);
84 handshakePromise.trySuccess(null);
85 ctx.fireChannelInboundEvent(new WebSocketClientHandshakeCompletionEvent(handshaker.version()));
86 ctx.pipeline().remove(this);
87 return;
88 }
89 throw new IllegalStateException("WebSocketClientHandshaker should have been non finished yet");
90 }
91 }
92
93 private void applyHandshakeTimeout() {
94 final Promise<Void> localHandshakePromise = handshakePromise;
95 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
96 return;
97 }
98
99 final Future<?> timeoutFuture = ctx.executor().schedule(() -> {
100 if (localHandshakePromise.isDone()) {
101 return;
102 }
103
104 WebSocketHandshakeException exception = new WebSocketHandshakeTimeoutException(
105 "handshake timed out after " + handshakeTimeoutMillis + "ms");
106 if (localHandshakePromise.tryFailure(exception)) {
107 ctx.flush()
108 .fireChannelInboundEvent(new WebSocketClientHandshakeCompletionEvent(exception))
109 .close();
110 }
111 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
112
113
114 localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel());
115 }
116
117
118
119
120
121
122 Future<Void> getHandshakeFuture() {
123 return handshakePromise.asFuture();
124 }
125 }