View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.sctp;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInboundHandler;
23  import io.netty.channel.sctp.SctpMessage;
24  import io.netty.handler.codec.CodecException;
25  import io.netty.handler.codec.MessageToMessageDecoder;
26  import io.netty.util.collection.IntObjectHashMap;
27  import io.netty.util.collection.IntObjectMap;
28  
29  import java.util.ArrayList;
30  import java.util.List;
31  
32  import static io.netty.util.internal.ObjectUtil.checkPositive;
33  
34  /**
35   * {@link MessageToMessageDecoder} which will take care of handle fragmented {@link SctpMessage}s, so
36   * only <strong>complete</strong> {@link SctpMessage}s will be forwarded to the next
37   * {@link ChannelInboundHandler}.
38   */
39  public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMessage> {
40      private final IntObjectMap<List<ByteBuf>> incompleteSctpMessages = new IntObjectHashMap<>();
41      private final int maxIncompleteSctpMessages;
42      private final int maxFragments;
43  
44      public SctpMessageCompletionHandler() {
45          this(128, 128);
46      }
47  
48      /**
49       * Create a new instance.
50       *
51       * @param maxIncompleteSctpMessages the maximum number of incomplete sctp message inflight.
52       * @param maxFragments              the maximum number of fragments per sctp message.
53       */
54      public SctpMessageCompletionHandler(int maxIncompleteSctpMessages, int maxFragments) {
55          super(SctpMessage.class);
56          this.maxIncompleteSctpMessages = checkPositive(maxIncompleteSctpMessages, "maxIncompleteSctpMessages");
57          this.maxFragments = checkPositive(maxFragments, "maxFragments");
58      }
59  
60      @Override
61      protected void decode(ChannelHandlerContext ctx, SctpMessage msg, List<Object> out) throws Exception {
62          final ByteBuf byteBuf = msg.content();
63          final int protocolIdentifier = msg.protocolIdentifier();
64          final int streamIdentifier = msg.streamIdentifier();
65          final boolean isComplete = msg.isComplete();
66          final boolean isUnordered = msg.isUnordered();
67  
68          List<ByteBuf> frag = incompleteSctpMessages.get(streamIdentifier);
69          if (frag == null) {
70              // No previous fragments.
71              if (isComplete) {
72                  out.add(msg.retain());
73              } else {
74                  if (maxIncompleteSctpMessages <= incompleteSctpMessages.size()) {
75                      throw new CodecException(
76                              "Too many incomplete sctp messages in flight: " + maxIncompleteSctpMessages);
77                  }
78                  //first incomplete message
79                  frag = new ArrayList<>();
80                  frag.add(byteBuf.retain());
81                  incompleteSctpMessages.put(streamIdentifier, frag);
82              }
83          } else {
84              if (maxFragments <= frag.size()) {
85                  throw new CodecException("Too many fragments for sctp message: " + maxFragments);
86              }
87              frag.add(byteBuf.retain());
88              if (isComplete) {
89                  // Is complete so remove it.
90                  incompleteSctpMessages.remove(streamIdentifier);
91                  CompositeByteBuf composite = ctx.alloc().compositeBuffer();
92  
93                  for (int i = 0; i < frag.size(); i++) {
94                      composite.addComponent(true, frag.get(i));
95                  }
96                  // last message to complete
97                  SctpMessage assembledMsg = new SctpMessage(
98                          protocolIdentifier,
99                          streamIdentifier,
100                         isUnordered,
101                         composite);
102                 out.add(assembledMsg);
103             }
104         }
105     }
106 
107     @Override
108     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
109         for (List<ByteBuf> buffers: incompleteSctpMessages.values()) {
110             for (ByteBuf buffer: buffers) {
111                 buffer.release();
112             }
113         }
114         incompleteSctpMessages.clear();
115         super.handlerRemoved(ctx);
116     }
117 
118     @Override
119     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
120         super.exceptionCaught(ctx, cause);
121         ctx.close();
122     }
123 }