1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http.websocketx;
17
18 import io.netty5.buffer.api.BufferAllocator;
19 import io.netty5.buffer.api.CompositeBuffer;
20 import io.netty5.channel.ChannelPipeline;
21 import io.netty5.handler.codec.MessageAggregator;
22 import io.netty5.handler.codec.TooLongFrameException;
23
24
25
26
27
28
29
30 public class WebSocketFrameAggregator
31 extends MessageAggregator<WebSocketFrame, WebSocketFrame, ContinuationWebSocketFrame, WebSocketFrame> {
32
33
34
35
36
37
38
39 public WebSocketFrameAggregator(int maxContentLength) {
40 super(maxContentLength);
41 }
42
43 @Override
44 protected WebSocketFrame tryStartMessage(Object msg) {
45 return isStartMessage(msg) ? (WebSocketFrame) msg : null;
46 }
47
48 @Override
49 protected ContinuationWebSocketFrame tryContentMessage(Object msg) {
50 return isContentMessage(msg) ? (ContinuationWebSocketFrame) msg : null;
51 }
52
53 @Override
54 protected boolean isLastContentMessage(ContinuationWebSocketFrame msg) {
55 return isContentMessage(msg) && msg.isFinalFragment();
56 }
57
58 @Override
59 protected boolean isAggregated(Object msg) throws Exception {
60 if (!(msg instanceof WebSocketFrame)) {
61 return false;
62 }
63 WebSocketFrame frame = (WebSocketFrame) msg;
64 if (frame.isFinalFragment()) {
65 return !isContentMessage(msg);
66 }
67
68 return !isStartMessage(msg) && !isContentMessage(msg);
69 }
70
71 @Override
72 protected boolean isContentLengthInvalid(WebSocketFrame start, int maxContentLength) {
73 return false;
74 }
75
76 @Override
77 protected Object newContinueResponse(WebSocketFrame start, int maxContentLength, ChannelPipeline pipeline) {
78 return null;
79 }
80
81 @Override
82 protected boolean closeAfterContinueResponse(Object msg) throws Exception {
83 throw new UnsupportedOperationException();
84 }
85
86 @Override
87 protected boolean ignoreContentAfterContinueResponse(Object msg) throws Exception {
88 throw new UnsupportedOperationException();
89 }
90
91 @Override
92 protected int lengthForContent(ContinuationWebSocketFrame msg) {
93 return msg.binaryData().readableBytes();
94 }
95
96 @Override
97 protected int lengthForAggregation(WebSocketFrame msg) {
98 return msg.binaryData().readableBytes();
99 }
100
101 @Override
102 protected WebSocketFrame beginAggregation(BufferAllocator allocator, WebSocketFrame start) {
103 if (start instanceof TextWebSocketFrame) {
104 final CompositeBuffer content = allocator.compose(start.binaryData().send());
105 return new TextWebSocketFrame(true, start.rsv(), content);
106 }
107
108 if (start instanceof BinaryWebSocketFrame) {
109 final CompositeBuffer content = allocator.compose(start.binaryData().send());
110 return new BinaryWebSocketFrame(true, start.rsv(), content);
111 }
112
113
114 throw new Error();
115 }
116
117 @Override
118 protected void aggregate(BufferAllocator allocator, WebSocketFrame aggregated, ContinuationWebSocketFrame content)
119 throws Exception {
120 final CompositeBuffer payload = (CompositeBuffer) aggregated.binaryData();
121 payload.extendWith(content.binaryData().send());
122 }
123
124 private static boolean isStartMessage(Object msg) {
125 return msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame;
126 }
127
128 private static boolean isContentMessage(Object msg) {
129 return msg instanceof ContinuationWebSocketFrame;
130 }
131 }