1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.stomp;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufUtil;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.MessageToMessageEncoder;
22 import io.netty.util.concurrent.FastThreadLocal;
23 import io.netty.util.internal.AppendableCharSequence;
24
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Map.Entry;
28
29 import static io.netty.handler.codec.stomp.StompConstants.NUL;
30 import static io.netty.handler.codec.stomp.StompHeaders.ACCEPT_VERSION;
31 import static io.netty.handler.codec.stomp.StompHeaders.ACK;
32 import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_LENGTH;
33 import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_TYPE;
34 import static io.netty.handler.codec.stomp.StompHeaders.DESTINATION;
35 import static io.netty.handler.codec.stomp.StompHeaders.HEART_BEAT;
36 import static io.netty.handler.codec.stomp.StompHeaders.HOST;
37 import static io.netty.handler.codec.stomp.StompHeaders.ID;
38 import static io.netty.handler.codec.stomp.StompHeaders.LOGIN;
39 import static io.netty.handler.codec.stomp.StompHeaders.MESSAGE;
40 import static io.netty.handler.codec.stomp.StompHeaders.MESSAGE_ID;
41 import static io.netty.handler.codec.stomp.StompHeaders.PASSCODE;
42 import static io.netty.handler.codec.stomp.StompHeaders.RECEIPT;
43 import static io.netty.handler.codec.stomp.StompHeaders.RECEIPT_ID;
44 import static io.netty.handler.codec.stomp.StompHeaders.SERVER;
45 import static io.netty.handler.codec.stomp.StompHeaders.SESSION;
46 import static io.netty.handler.codec.stomp.StompHeaders.SUBSCRIPTION;
47 import static io.netty.handler.codec.stomp.StompHeaders.TRANSACTION;
48 import static io.netty.handler.codec.stomp.StompHeaders.VERSION;
49
50
51
52
53 public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe> {
54
55 private static final int ESCAPE_HEADER_KEY_CACHE_LIMIT = 32;
56 private static final float DEFAULT_LOAD_FACTOR = 0.75f;
57 private static final FastThreadLocal<LinkedHashMap<CharSequence, CharSequence>> ESCAPE_HEADER_KEY_CACHE =
58 new FastThreadLocal<LinkedHashMap<CharSequence, CharSequence>>() {
59 @Override
60 protected LinkedHashMap<CharSequence, CharSequence> initialValue() throws Exception {
61 LinkedHashMap<CharSequence, CharSequence> cache = new LinkedHashMap<CharSequence, CharSequence>(
62 ESCAPE_HEADER_KEY_CACHE_LIMIT, DEFAULT_LOAD_FACTOR, true) {
63
64 @Override
65 protected boolean removeEldestEntry(Entry eldest) {
66 return size() > ESCAPE_HEADER_KEY_CACHE_LIMIT;
67 }
68 };
69
70 cache.put(ACCEPT_VERSION, ACCEPT_VERSION);
71 cache.put(HOST, HOST);
72 cache.put(LOGIN, LOGIN);
73 cache.put(PASSCODE, PASSCODE);
74 cache.put(HEART_BEAT, HEART_BEAT);
75 cache.put(VERSION, VERSION);
76 cache.put(SESSION, SESSION);
77 cache.put(SERVER, SERVER);
78 cache.put(DESTINATION, DESTINATION);
79 cache.put(ID, ID);
80 cache.put(ACK, ACK);
81 cache.put(TRANSACTION, TRANSACTION);
82 cache.put(RECEIPT, RECEIPT);
83 cache.put(MESSAGE_ID, MESSAGE_ID);
84 cache.put(SUBSCRIPTION, SUBSCRIPTION);
85 cache.put(RECEIPT_ID, RECEIPT_ID);
86 cache.put(MESSAGE, MESSAGE);
87 cache.put(CONTENT_LENGTH, CONTENT_LENGTH);
88 cache.put(CONTENT_TYPE, CONTENT_TYPE);
89
90 return cache;
91 }
92 };
93
94 public StompSubframeEncoder() {
95 super(StompSubframe.class);
96 }
97
98 @Override
99 protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
100 if (msg instanceof StompFrame) {
101 StompFrame stompFrame = (StompFrame) msg;
102 ByteBuf buf = encodeFullFrame(stompFrame, ctx);
103
104 out.add(convertFullFrame(stompFrame, buf));
105 } else if (msg instanceof StompHeadersSubframe) {
106 StompHeadersSubframe stompHeadersSubframe = (StompHeadersSubframe) msg;
107 ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(stompHeadersSubframe));
108 encodeHeaders(stompHeadersSubframe, buf);
109
110 out.add(convertHeadersSubFrame(stompHeadersSubframe, buf));
111 } else if (msg instanceof StompContentSubframe) {
112 StompContentSubframe stompContentSubframe = (StompContentSubframe) msg;
113 ByteBuf buf = encodeContent(stompContentSubframe, ctx);
114
115 out.add(convertContentSubFrame(stompContentSubframe, buf));
116 }
117 }
118
119
120
121
122
123
124
125 protected Object convertFullFrame(StompFrame original, ByteBuf encoded) {
126 return encoded;
127 }
128
129
130
131
132
133
134
135 protected Object convertHeadersSubFrame(StompHeadersSubframe original, ByteBuf encoded) {
136 return encoded;
137 }
138
139
140
141
142
143
144
145 protected Object convertContentSubFrame(StompContentSubframe original, ByteBuf encoded) {
146 return encoded;
147 }
148
149
150
151
152
153 protected int headersSubFrameSize(StompHeadersSubframe headersSubframe) {
154 int estimatedSize = headersSubframe.headers().size() * 34 + 48;
155 if (estimatedSize < 128) {
156 return 128;
157 }
158
159 return Math.max(estimatedSize, 256);
160 }
161
162 private ByteBuf encodeFullFrame(StompFrame frame, ChannelHandlerContext ctx) {
163 int contentReadableBytes = frame.content().readableBytes();
164 ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(frame) + contentReadableBytes);
165 encodeHeaders(frame, buf);
166
167 if (contentReadableBytes > 0) {
168 buf.writeBytes(frame.content());
169 }
170
171 return buf.writeByte(NUL);
172 }
173
174 private static void encodeHeaders(StompHeadersSubframe frame, ByteBuf buf) {
175 StompCommand command = frame.command();
176 ByteBufUtil.writeUtf8(buf, command.toString());
177 buf.writeByte(StompConstants.LF);
178
179 boolean shouldEscape = shouldEscape(command);
180 LinkedHashMap<CharSequence, CharSequence> cache = ESCAPE_HEADER_KEY_CACHE.get();
181 for (Entry<CharSequence, CharSequence> entry : frame.headers()) {
182 CharSequence headerKey = entry.getKey();
183 if (shouldEscape) {
184 CharSequence cachedHeaderKey = cache.get(headerKey);
185 if (cachedHeaderKey == null) {
186 cachedHeaderKey = escape(headerKey);
187 cache.put(headerKey, cachedHeaderKey);
188 }
189 headerKey = cachedHeaderKey;
190 }
191
192 ByteBufUtil.writeUtf8(buf, headerKey);
193 buf.writeByte(StompConstants.COLON);
194
195 CharSequence headerValue = shouldEscape? escape(entry.getValue()) : entry.getValue();
196 ByteBufUtil.writeUtf8(buf, headerValue);
197 buf.writeByte(StompConstants.LF);
198 }
199
200 buf.writeByte(StompConstants.LF);
201 }
202
203 private static ByteBuf encodeContent(StompContentSubframe content, ChannelHandlerContext ctx) {
204 if (content instanceof LastStompContentSubframe) {
205 ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes() + 1);
206 buf.writeBytes(content.content());
207 buf.writeByte(StompConstants.NUL);
208 return buf;
209 }
210
211 return content.content().retain();
212 }
213
214 private static boolean shouldEscape(StompCommand command) {
215 return command != StompCommand.CONNECT && command != StompCommand.CONNECTED;
216 }
217
218 private static CharSequence escape(CharSequence input) {
219 AppendableCharSequence builder = null;
220 for (int i = 0; i < input.length(); i++) {
221 char chr = input.charAt(i);
222 if (chr == '\\') {
223 builder = escapeBuilder(builder, input, i);
224 builder.append("\\\\");
225 } else if (chr == ':') {
226 builder = escapeBuilder(builder, input, i);
227 builder.append("\\c");
228 } else if (chr == '\n') {
229 builder = escapeBuilder(builder, input, i);
230 builder.append("\\n");
231 } else if (chr == '\r') {
232 builder = escapeBuilder(builder, input, i);
233 builder.append("\\r");
234 } else if (builder != null) {
235 builder.append(chr);
236 }
237 }
238
239 return builder != null? builder : input;
240 }
241
242 private static AppendableCharSequence escapeBuilder(AppendableCharSequence builder, CharSequence input,
243 int offset) {
244 if (builder != null) {
245 return builder;
246 }
247
248
249 return new AppendableCharSequence(input.length() + 8).append(input, 0, offset);
250 }
251 }