View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.http.websocketx;
17  
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.buffer.api.CompositeBuffer;
20  import io.netty5.channel.ChannelPipeline;
21  import io.netty5.handler.codec.MessageAggregator;
22  import io.netty5.handler.codec.TooLongFrameException;
23  
24  /**
25   * Handler that aggregate fragmented WebSocketFrame's.
26   *
27   * Be aware if PING/PONG/CLOSE frames are send in the middle of a fragmented {@link WebSocketFrame} they will
28   * just get forwarded to the next handler in the pipeline.
29   */
30  public class WebSocketFrameAggregator
31          extends MessageAggregator<WebSocketFrame, WebSocketFrame, ContinuationWebSocketFrame, WebSocketFrame> {
32  
33      /**
34       * Creates a new instance
35       *
36       * @param maxContentLength If the size of the aggregated frame exceeds this value,
37       *                         a {@link TooLongFrameException} is thrown.
38       */
39      public WebSocketFrameAggregator(int maxContentLength) {
40          super(maxContentLength);
41      }
42  
43      @Override
44      protected WebSocketFrame tryStartMessage(Object msg) {
45          return isStartMessage(msg) ? (WebSocketFrame) msg : null;
46      }
47  
48      @Override
49      protected ContinuationWebSocketFrame tryContentMessage(Object msg) {
50          return isContentMessage(msg) ? (ContinuationWebSocketFrame) msg : null;
51      }
52  
53      @Override
54      protected boolean isLastContentMessage(ContinuationWebSocketFrame msg) {
55          return isContentMessage(msg) && msg.isFinalFragment();
56      }
57  
58      @Override
59      protected boolean isAggregated(Object msg) throws Exception {
60          if (!(msg instanceof WebSocketFrame)) {
61              return false;
62          }
63          WebSocketFrame frame = (WebSocketFrame) msg;
64          if (frame.isFinalFragment()) {
65              return !isContentMessage(msg);
66          }
67  
68          return !isStartMessage(msg) && !isContentMessage(msg);
69      }
70  
71      @Override
72      protected boolean isContentLengthInvalid(WebSocketFrame start, int maxContentLength) {
73          return false;
74      }
75  
76      @Override
77      protected Object newContinueResponse(WebSocketFrame start, int maxContentLength, ChannelPipeline pipeline) {
78          return null;
79      }
80  
81      @Override
82      protected boolean closeAfterContinueResponse(Object msg) throws Exception {
83          throw new UnsupportedOperationException();
84      }
85  
86      @Override
87      protected boolean ignoreContentAfterContinueResponse(Object msg) throws Exception {
88          throw new UnsupportedOperationException();
89      }
90  
91      @Override
92      protected int lengthForContent(ContinuationWebSocketFrame msg) {
93          return msg.binaryData().readableBytes();
94      }
95  
96      @Override
97      protected int lengthForAggregation(WebSocketFrame msg) {
98          return msg.binaryData().readableBytes();
99      }
100 
101     @Override
102     protected WebSocketFrame beginAggregation(BufferAllocator allocator, WebSocketFrame start) {
103         if (start instanceof TextWebSocketFrame) {
104             final CompositeBuffer content = allocator.compose(start.binaryData().send());
105             return new TextWebSocketFrame(true, start.rsv(), content);
106         }
107 
108         if (start instanceof BinaryWebSocketFrame) {
109             final CompositeBuffer content = allocator.compose(start.binaryData().send());
110             return new BinaryWebSocketFrame(true, start.rsv(), content);
111         }
112 
113         // Should not reach here.
114         throw new Error();
115     }
116 
117     @Override
118     protected void aggregate(BufferAllocator allocator, WebSocketFrame aggregated, ContinuationWebSocketFrame content)
119             throws Exception {
120         final CompositeBuffer payload = (CompositeBuffer) aggregated.binaryData();
121         payload.extendWith(content.binaryData().send());
122     }
123 
124     private static boolean isStartMessage(Object msg) {
125         return msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame;
126     }
127 
128     private static boolean isContentMessage(Object msg) {
129         return msg instanceof ContinuationWebSocketFrame;
130     }
131 }