1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.sctp;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInboundHandler;
23 import io.netty.channel.sctp.SctpMessage;
24 import io.netty.handler.codec.MessageToMessageDecoder;
25 import io.netty.util.collection.IntObjectHashMap;
26 import io.netty.util.collection.IntObjectMap;
27
28 import java.util.List;
29
30
31
32
33
34
35 public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMessage> {
36 private final IntObjectMap<ByteBuf> fragments = new IntObjectHashMap<ByteBuf>();
37
38 public SctpMessageCompletionHandler() {
39 super(SctpMessage.class);
40 }
41
42 @Override
43 protected void decode(ChannelHandlerContext ctx, SctpMessage msg, List<Object> out) throws Exception {
44 final ByteBuf byteBuf = msg.content();
45 final int protocolIdentifier = msg.protocolIdentifier();
46 final int streamIdentifier = msg.streamIdentifier();
47 final boolean isComplete = msg.isComplete();
48 final boolean isUnordered = msg.isUnordered();
49
50 ByteBuf frag = fragments.remove(streamIdentifier);
51 if (frag == null) {
52 frag = Unpooled.EMPTY_BUFFER;
53 }
54
55 if (isComplete && !frag.isReadable()) {
56
57 out.add(msg);
58 } else if (!isComplete && frag.isReadable()) {
59
60 fragments.put(streamIdentifier, Unpooled.wrappedBuffer(frag, byteBuf));
61 } else if (isComplete && frag.isReadable()) {
62
63 SctpMessage assembledMsg = new SctpMessage(
64 protocolIdentifier,
65 streamIdentifier,
66 isUnordered,
67 Unpooled.wrappedBuffer(frag, byteBuf));
68 out.add(assembledMsg);
69 } else {
70
71 fragments.put(streamIdentifier, byteBuf);
72 }
73 byteBuf.retain();
74 }
75
76 @Override
77 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
78 for (ByteBuf buffer: fragments.values()) {
79 buffer.release();
80 }
81 fragments.clear();
82 super.handlerRemoved(ctx);
83 }
84 }