View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.sctp;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInboundHandler;
23  import io.netty.channel.sctp.SctpMessage;
24  import io.netty.handler.codec.MessageToMessageDecoder;
25  import io.netty.util.collection.IntObjectHashMap;
26  import io.netty.util.collection.IntObjectMap;
27  
28  import java.util.List;
29  
30  /**
31   * {@link MessageToMessageDecoder} which will take care of handle fragmented {@link SctpMessage}s, so
32   * only <strong>complete</strong> {@link SctpMessage}s will be forwarded to the next
33   * {@link ChannelInboundHandler}.
34   */
35  public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMessage> {
36      private final IntObjectMap<ByteBuf> fragments = new IntObjectHashMap<ByteBuf>();
37  
38      public SctpMessageCompletionHandler() {
39          super(SctpMessage.class);
40      }
41  
42      @Override
43      protected void decode(ChannelHandlerContext ctx, SctpMessage msg, List<Object> out) throws Exception {
44          final ByteBuf byteBuf = msg.content();
45          final int protocolIdentifier = msg.protocolIdentifier();
46          final int streamIdentifier = msg.streamIdentifier();
47          final boolean isComplete = msg.isComplete();
48          final boolean isUnordered = msg.isUnordered();
49  
50          ByteBuf frag = fragments.remove(streamIdentifier);
51          if (frag == null) {
52              frag = Unpooled.EMPTY_BUFFER;
53          }
54  
55          if (isComplete && !frag.isReadable()) {
56              //data chunk is not fragmented
57              out.add(msg);
58          } else if (!isComplete && frag.isReadable()) {
59              //more message to complete
60              fragments.put(streamIdentifier, Unpooled.wrappedBuffer(frag, byteBuf));
61          } else if (isComplete && frag.isReadable()) {
62              //last message to complete
63              SctpMessage assembledMsg = new SctpMessage(
64                      protocolIdentifier,
65                      streamIdentifier,
66                      isUnordered,
67                      Unpooled.wrappedBuffer(frag, byteBuf));
68              out.add(assembledMsg);
69          } else {
70              //first incomplete message
71              fragments.put(streamIdentifier, byteBuf);
72          }
73          byteBuf.retain();
74      }
75  
76      @Override
77      public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
78          for (ByteBuf buffer: fragments.values()) {
79              buffer.release();
80          }
81          fragments.clear();
82          super.handlerRemoved(ctx);
83      }
84  }