1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http.websocketx;
17
18
19 import io.netty5.channel.ChannelHandlerContext;
20 import io.netty5.channel.ChannelOption;
21 import io.netty5.handler.codec.MessageToMessageDecoder;
22 import io.netty5.util.Resource;
23 import io.netty5.util.concurrent.Future;
24 import io.netty5.util.concurrent.Promise;
25
26 import java.nio.channels.ClosedChannelException;
27 import java.util.concurrent.TimeUnit;
28
29 abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocketFrame> {
30
31 private final boolean dropPongFrames;
32 private final WebSocketCloseStatus closeStatus;
33 private final long forceCloseTimeoutMillis;
34 private Promise<Void> closeSent;
35
36
37
38
39 WebSocketProtocolHandler() {
40 this(true);
41 }
42
43
44
45
46
47
48
49
50 WebSocketProtocolHandler(boolean dropPongFrames) {
51 this(dropPongFrames, null, 0L);
52 }
53
54 WebSocketProtocolHandler(boolean dropPongFrames,
55 WebSocketCloseStatus closeStatus,
56 long forceCloseTimeoutMillis) {
57 this.dropPongFrames = dropPongFrames;
58 this.closeStatus = closeStatus;
59 this.forceCloseTimeoutMillis = forceCloseTimeoutMillis;
60 }
61
62 @Override
63 protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
64 throw new UnsupportedOperationException("WebSocketProtocolHandler use decodeAndClose().");
65 }
66
67 @Override
68 protected void decodeAndClose(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
69 if (frame instanceof PingWebSocketFrame) {
70
71
72 try (frame) {
73 ctx.writeAndFlush(new PongWebSocketFrame(frame.binaryData().send()));
74 }
75 readIfNeeded(ctx);
76 return;
77 }
78 if (frame instanceof PongWebSocketFrame && dropPongFrames) {
79 readIfNeeded(ctx);
80 return;
81 }
82
83 ctx.fireChannelRead(frame);
84 }
85
86 private static void readIfNeeded(ChannelHandlerContext ctx) {
87 if (!ctx.channel().getOption(ChannelOption.AUTO_READ)) {
88 ctx.read();
89 }
90 }
91
92 @Override
93 public Future<Void> close(final ChannelHandlerContext ctx) {
94 if (closeStatus == null || !ctx.channel().isActive()) {
95 return ctx.close();
96 }
97 final Future<Void> future = closeSent == null ?
98 write(ctx, new CloseWebSocketFrame(ctx.bufferAllocator(), closeStatus)) : closeSent.asFuture();
99
100 flush(ctx);
101 applyCloseSentTimeout(ctx);
102 Promise<Void> promise = ctx.newPromise();
103 future.addListener(f -> ctx.close().cascadeTo(promise));
104 return promise.asFuture();
105 }
106
107 @Override
108 public Future<Void> write(final ChannelHandlerContext ctx, Object msg) {
109 if (closeSent != null) {
110 Resource.dispose(msg);
111 return ctx.newFailedFuture(new ClosedChannelException());
112 }
113 if (msg instanceof CloseWebSocketFrame) {
114 Promise<Void> promise = ctx.newPromise();
115 closeSent(promise);
116 ctx.write(msg).cascadeTo(closeSent);
117 return promise.asFuture();
118 }
119 return ctx.write(msg);
120 }
121
122 void closeSent(Promise<Void> promise) {
123 closeSent = promise;
124 }
125
126 private void applyCloseSentTimeout(ChannelHandlerContext ctx) {
127 if (closeSent.isDone() || forceCloseTimeoutMillis < 0) {
128 return;
129 }
130
131 Future<?> timeoutTask = ctx.executor().schedule(() -> {
132 if (!closeSent.isDone()) {
133 closeSent.tryFailure(buildHandshakeException("send close frame timed out"));
134 }
135 }, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);
136
137 closeSent.asFuture().addListener(future -> timeoutTask.cancel());
138 }
139
140
141
142
143
144 protected WebSocketHandshakeException buildHandshakeException(String message) {
145 return new WebSocketHandshakeException(message);
146 }
147
148 @Override
149 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
150 ctx.fireChannelExceptionCaught(cause);
151 ctx.close();
152 }
153 }