1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http.websocketx;
17
18
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelFutureListener;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelOutboundHandler;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.handler.codec.MessageToMessageDecoder;
25 import io.netty.util.ReferenceCountUtil;
26 import io.netty.util.concurrent.Future;
27 import io.netty.util.concurrent.PromiseNotifier;
28
29 import java.net.SocketAddress;
30 import java.nio.channels.ClosedChannelException;
31 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33
34 abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocketFrame>
35 implements ChannelOutboundHandler {
36
37 private final boolean dropPongFrames;
38 private final WebSocketCloseStatus closeStatus;
39 private final long forceCloseTimeoutMillis;
40 private ChannelPromise closeSent;
41
42
43
44
45 WebSocketProtocolHandler() {
46 this(true);
47 }
48
49
50
51
52
53
54
55
56 WebSocketProtocolHandler(boolean dropPongFrames) {
57 this(dropPongFrames, null, 0L);
58 }
59
60 WebSocketProtocolHandler(boolean dropPongFrames,
61 WebSocketCloseStatus closeStatus,
62 long forceCloseTimeoutMillis) {
63 super(WebSocketFrame.class);
64 this.dropPongFrames = dropPongFrames;
65 this.closeStatus = closeStatus;
66 this.forceCloseTimeoutMillis = forceCloseTimeoutMillis;
67 }
68
69 @Override
70 protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
71 if (frame instanceof PingWebSocketFrame) {
72 frame.content().retain();
73 ctx.writeAndFlush(new PongWebSocketFrame(frame.content()));
74 readIfNeeded(ctx);
75 return;
76 }
77 if (frame instanceof PongWebSocketFrame && dropPongFrames) {
78 readIfNeeded(ctx);
79 return;
80 }
81
82 out.add(frame.retain());
83 }
84
85 private static void readIfNeeded(ChannelHandlerContext ctx) {
86 if (!ctx.channel().config().isAutoRead()) {
87 ctx.read();
88 }
89 }
90
91 @Override
92 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
93 if (closeStatus == null || !ctx.channel().isActive()) {
94 ctx.close(promise);
95 } else {
96 if (closeSent == null) {
97 write(ctx, new CloseWebSocketFrame(closeStatus), ctx.newPromise());
98 }
99 flush(ctx);
100 applyCloseSentTimeout(ctx);
101 closeSent.addListener(new ChannelFutureListener() {
102 @Override
103 public void operationComplete(ChannelFuture future) {
104 ctx.close(promise);
105 }
106 });
107 }
108 }
109
110 @Override
111 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
112 if (closeSent != null) {
113 ReferenceCountUtil.release(msg);
114 promise.setFailure(new ClosedChannelException());
115 } else if (msg instanceof CloseWebSocketFrame) {
116 closeSent(promise.unvoid());
117 ctx.write(msg).addListener(new PromiseNotifier<Void, ChannelFuture>(false, closeSent));
118 } else {
119 ctx.write(msg, promise);
120 }
121 }
122
123 void closeSent(ChannelPromise promise) {
124 closeSent = promise;
125 }
126
127 private void applyCloseSentTimeout(ChannelHandlerContext ctx) {
128 if (closeSent.isDone() || forceCloseTimeoutMillis < 0) {
129 return;
130 }
131
132 final Future<?> timeoutTask = ctx.executor().schedule(new Runnable() {
133 @Override
134 public void run() {
135 if (!closeSent.isDone()) {
136 closeSent.tryFailure(buildHandshakeException("send close frame timed out"));
137 }
138 }
139 }, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);
140
141 closeSent.addListener(new ChannelFutureListener() {
142 @Override
143 public void operationComplete(ChannelFuture future) {
144 timeoutTask.cancel(false);
145 }
146 });
147 }
148
149
150
151
152
153 protected WebSocketHandshakeException buildHandshakeException(String message) {
154 return new WebSocketHandshakeException(message);
155 }
156
157 @Override
158 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
159 ChannelPromise promise) throws Exception {
160 ctx.bind(localAddress, promise);
161 }
162
163 @Override
164 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
165 SocketAddress localAddress, ChannelPromise promise) throws Exception {
166 ctx.connect(remoteAddress, localAddress, promise);
167 }
168
169 @Override
170 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
171 throws Exception {
172 ctx.disconnect(promise);
173 }
174
175 @Override
176 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
177 ctx.deregister(promise);
178 }
179
180 @Override
181 public void read(ChannelHandlerContext ctx) throws Exception {
182 ctx.read();
183 }
184
185 @Override
186 public void flush(ChannelHandlerContext ctx) throws Exception {
187 ctx.flush();
188 }
189
190 @Override
191 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
192 ctx.fireExceptionCaught(cause);
193 ctx.close();
194 }
195 }