1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http.websocketx;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.CompositeByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.MessageToMessageDecoder;
22 import io.netty.handler.codec.TooLongFrameException;
23
24 import java.util.List;
25
26
27
28
29
30
31
32 public class WebSocketFrameAggregator extends MessageToMessageDecoder<WebSocketFrame> {
33 private final int maxFrameSize;
34 private WebSocketFrame currentFrame;
35 private boolean tooLongFrameFound;
36
37
38
39
40
41
42
43 public WebSocketFrameAggregator(int maxFrameSize) {
44 if (maxFrameSize < 1) {
45 throw new IllegalArgumentException("maxFrameSize must be > 0");
46 }
47 this.maxFrameSize = maxFrameSize;
48 }
49
50 @Override
51 protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
52 if (currentFrame == null) {
53 tooLongFrameFound = false;
54 if (msg.isFinalFragment()) {
55 out.add(msg.retain());
56 return;
57 }
58 ByteBuf buf = ctx.alloc().compositeBuffer().addComponent(msg.content().retain());
59 buf.writerIndex(buf.writerIndex() + msg.content().readableBytes());
60
61 if (msg instanceof TextWebSocketFrame) {
62 currentFrame = new TextWebSocketFrame(true, msg.rsv(), buf);
63 } else if (msg instanceof BinaryWebSocketFrame) {
64 currentFrame = new BinaryWebSocketFrame(true, msg.rsv(), buf);
65 } else {
66 buf.release();
67 throw new IllegalStateException(
68 "WebSocket frame was not of type TextWebSocketFrame or BinaryWebSocketFrame");
69 }
70 return;
71 }
72 if (msg instanceof ContinuationWebSocketFrame) {
73 if (tooLongFrameFound) {
74 if (msg.isFinalFragment()) {
75 currentFrame = null;
76 }
77 return;
78 }
79 CompositeByteBuf content = (CompositeByteBuf) currentFrame.content();
80 if (content.readableBytes() > maxFrameSize - msg.content().readableBytes()) {
81
82 currentFrame.release();
83 tooLongFrameFound = true;
84 throw new TooLongFrameException(
85 "WebSocketFrame length exceeded " + content +
86 " bytes.");
87 }
88 content.addComponent(msg.content().retain());
89 content.writerIndex(content.writerIndex() + msg.content().readableBytes());
90
91 if (msg.isFinalFragment()) {
92 WebSocketFrame currentFrame = this.currentFrame;
93 this.currentFrame = null;
94 out.add(currentFrame);
95 return;
96 } else {
97 return;
98 }
99 }
100
101
102 out.add(msg.retain());
103 }
104
105 @Override
106 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
107 super.channelInactive(ctx);
108
109 if (currentFrame != null) {
110 currentFrame.release();
111 currentFrame = null;
112 }
113 }
114
115 @Override
116 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
117 super.handlerRemoved(ctx);
118
119
120 if (currentFrame != null) {
121 currentFrame.release();
122 currentFrame = null;
123 }
124 }
125 }