1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.sctp;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInboundHandler;
23 import io.netty.channel.sctp.SctpMessage;
24 import io.netty.handler.codec.CodecException;
25 import io.netty.handler.codec.MessageToMessageDecoder;
26 import io.netty.util.collection.IntObjectHashMap;
27 import io.netty.util.collection.IntObjectMap;
28
29 import java.util.ArrayList;
30 import java.util.List;
31
32 import static io.netty.util.internal.ObjectUtil.checkPositive;
33
34
35
36
37
38
39 public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMessage> {
40 private final IntObjectMap<List<ByteBuf>> incompleteSctpMessages = new IntObjectHashMap<>();
41 private final int maxIncompleteSctpMessages;
42 private final int maxFragments;
43
44 public SctpMessageCompletionHandler() {
45 this(128, 128);
46 }
47
48
49
50
51
52
53
54 public SctpMessageCompletionHandler(int maxIncompleteSctpMessages, int maxFragments) {
55 super(SctpMessage.class);
56 this.maxIncompleteSctpMessages = checkPositive(maxIncompleteSctpMessages, "maxIncompleteSctpMessages");
57 this.maxFragments = checkPositive(maxFragments, "maxFragments");
58 }
59
60 @Override
61 protected void decode(ChannelHandlerContext ctx, SctpMessage msg, List<Object> out) throws Exception {
62 final ByteBuf byteBuf = msg.content();
63 final int protocolIdentifier = msg.protocolIdentifier();
64 final int streamIdentifier = msg.streamIdentifier();
65 final boolean isComplete = msg.isComplete();
66 final boolean isUnordered = msg.isUnordered();
67
68 List<ByteBuf> frag = incompleteSctpMessages.get(streamIdentifier);
69 if (frag == null) {
70
71 if (isComplete) {
72 out.add(msg.retain());
73 } else {
74 if (maxIncompleteSctpMessages <= incompleteSctpMessages.size()) {
75 throw new CodecException(
76 "Too many incomplete sctp messages in flight: " + maxIncompleteSctpMessages);
77 }
78
79 frag = new ArrayList<>();
80 frag.add(byteBuf.retain());
81 incompleteSctpMessages.put(streamIdentifier, frag);
82 }
83 } else {
84 if (maxFragments <= frag.size()) {
85 throw new CodecException("Too many fragments for sctp message: " + maxFragments);
86 }
87 frag.add(byteBuf.retain());
88 if (isComplete) {
89
90 incompleteSctpMessages.remove(streamIdentifier);
91 CompositeByteBuf composite = ctx.alloc().compositeBuffer();
92
93 for (int i = 0; i < frag.size(); i++) {
94 composite.addComponent(true, frag.get(i));
95 }
96
97 SctpMessage assembledMsg = new SctpMessage(
98 protocolIdentifier,
99 streamIdentifier,
100 isUnordered,
101 composite);
102 out.add(assembledMsg);
103 }
104 }
105 }
106
107 @Override
108 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
109 for (List<ByteBuf> buffers: incompleteSctpMessages.values()) {
110 for (ByteBuf buffer: buffers) {
111 buffer.release();
112 }
113 }
114 incompleteSctpMessages.clear();
115 super.handlerRemoved(ctx);
116 }
117
118 @Override
119 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
120 super.exceptionCaught(ctx, cause);
121 ctx.close();
122 }
123 }