View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.sctp;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.ChannelHandler;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.sctp.SctpMessage;
24  import io.netty.handler.codec.MessageToMessageDecoder;
25  
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  
30  /**
31   * {@link MessageToMessageDecoder} which will take care of handle fragmented {@link SctpMessage}s, so
32   * only <strong>complete</strong> {@link SctpMessage}s will be forwarded to the next
33   * {@link ChannelHandler}.
34   */
35  public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMessage> {
36      private final Map<Integer, ByteBuf> fragments = new HashMap<Integer, ByteBuf>();
37  
38      @Override
39      protected void decode(ChannelHandlerContext ctx, SctpMessage msg, List<Object> out) throws Exception {
40          final ByteBuf byteBuf = msg.content();
41          final int protocolIdentifier = msg.protocolIdentifier();
42          final int streamIdentifier = msg.streamIdentifier();
43          final boolean isComplete = msg.isComplete();
44  
45          ByteBuf frag;
46          if (fragments.containsKey(streamIdentifier)) {
47              frag = fragments.remove(streamIdentifier);
48          } else {
49              frag = Unpooled.EMPTY_BUFFER;
50          }
51  
52          if (isComplete && !frag.isReadable()) {
53              //data chunk is not fragmented
54              out.add(msg);
55          } else if (!isComplete && frag.isReadable()) {
56              //more message to complete
57              fragments.put(streamIdentifier, Unpooled.wrappedBuffer(frag, byteBuf));
58          } else if (isComplete && frag.isReadable()) {
59              //last message to complete
60              fragments.remove(streamIdentifier);
61              SctpMessage assembledMsg = new SctpMessage(
62                      protocolIdentifier,
63                      streamIdentifier,
64                      Unpooled.wrappedBuffer(frag, byteBuf));
65              out.add(assembledMsg);
66          } else {
67              //first incomplete message
68              fragments.put(streamIdentifier, byteBuf);
69          }
70          byteBuf.retain();
71      }
72  }