1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.stomp;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufUtil;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.MessageToMessageEncoder;
22
23 import java.util.List;
24 import java.util.Map.Entry;
25
26 import static io.netty.handler.codec.stomp.StompConstants.*;
27
28
29
30
31 public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe> {
32
33 @Override
34 protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
35 if (msg instanceof StompFrame) {
36 StompFrame stompFrame = (StompFrame) msg;
37 ByteBuf buf = encodeFullFrame(stompFrame, ctx);
38
39 out.add(convertFullFrame(stompFrame, buf));
40 } else if (msg instanceof StompHeadersSubframe) {
41 StompHeadersSubframe stompHeadersSubframe = (StompHeadersSubframe) msg;
42 ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(stompHeadersSubframe));
43 encodeHeaders(stompHeadersSubframe, buf);
44
45 out.add(convertHeadersSubFrame(stompHeadersSubframe, buf));
46 } else if (msg instanceof StompContentSubframe) {
47 StompContentSubframe stompContentSubframe = (StompContentSubframe) msg;
48 ByteBuf buf = encodeContent(stompContentSubframe, ctx);
49
50 out.add(convertContentSubFrame(stompContentSubframe, buf));
51 }
52 }
53
54
55
56
57
58
59
60 protected Object convertFullFrame(StompFrame original, ByteBuf encoded) {
61 return encoded;
62 }
63
64
65
66
67
68
69
70 protected Object convertHeadersSubFrame(StompHeadersSubframe original, ByteBuf encoded) {
71 return encoded;
72 }
73
74
75
76
77
78
79
80 protected Object convertContentSubFrame(StompContentSubframe original, ByteBuf encoded) {
81 return encoded;
82 }
83
84
85
86
87
88 protected int headersSubFrameSize(StompHeadersSubframe headersSubframe) {
89 int estimatedSize = headersSubframe.headers().size() * 34 + 48;
90 if (estimatedSize < 128) {
91 return 128;
92 } else if (estimatedSize < 256) {
93 return 256;
94 }
95
96 return estimatedSize;
97 }
98
99 private ByteBuf encodeFullFrame(StompFrame frame, ChannelHandlerContext ctx) {
100 int contentReadableBytes = frame.content().readableBytes();
101 ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(frame) + contentReadableBytes);
102 encodeHeaders(frame, buf);
103
104 if (contentReadableBytes > 0) {
105 buf.writeBytes(frame.content());
106 }
107
108 return buf.writeByte(NUL);
109 }
110
111 private static void encodeHeaders(StompHeadersSubframe frame, ByteBuf buf) {
112 ByteBufUtil.writeUtf8(buf, frame.command().toString());
113 buf.writeByte(StompConstants.LF);
114
115 for (Entry<CharSequence, CharSequence> entry : frame.headers()) {
116 ByteBufUtil.writeUtf8(buf, entry.getKey());
117 buf.writeByte(StompConstants.COLON);
118 ByteBufUtil.writeUtf8(buf, entry.getValue());
119 buf.writeByte(StompConstants.LF);
120 }
121
122 buf.writeByte(StompConstants.LF);
123 }
124
125 private static ByteBuf encodeContent(StompContentSubframe content, ChannelHandlerContext ctx) {
126 if (content instanceof LastStompContentSubframe) {
127 ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes() + 1);
128 buf.writeBytes(content.content());
129 buf.writeByte(StompConstants.NUL);
130 return buf;
131 }
132
133 return content.content().retain();
134 }
135 }