View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.stomp;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufUtil;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.MessageToMessageEncoder;
22  import io.netty.util.concurrent.FastThreadLocal;
23  import io.netty.util.internal.AppendableCharSequence;
24  
25  import java.util.LinkedHashMap;
26  import java.util.List;
27  import java.util.Map.Entry;
28  
29  import static io.netty.handler.codec.stomp.StompConstants.NUL;
30  import static io.netty.handler.codec.stomp.StompHeaders.ACCEPT_VERSION;
31  import static io.netty.handler.codec.stomp.StompHeaders.ACK;
32  import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_LENGTH;
33  import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_TYPE;
34  import static io.netty.handler.codec.stomp.StompHeaders.DESTINATION;
35  import static io.netty.handler.codec.stomp.StompHeaders.HEART_BEAT;
36  import static io.netty.handler.codec.stomp.StompHeaders.HOST;
37  import static io.netty.handler.codec.stomp.StompHeaders.ID;
38  import static io.netty.handler.codec.stomp.StompHeaders.LOGIN;
39  import static io.netty.handler.codec.stomp.StompHeaders.MESSAGE;
40  import static io.netty.handler.codec.stomp.StompHeaders.MESSAGE_ID;
41  import static io.netty.handler.codec.stomp.StompHeaders.PASSCODE;
42  import static io.netty.handler.codec.stomp.StompHeaders.RECEIPT;
43  import static io.netty.handler.codec.stomp.StompHeaders.RECEIPT_ID;
44  import static io.netty.handler.codec.stomp.StompHeaders.SERVER;
45  import static io.netty.handler.codec.stomp.StompHeaders.SESSION;
46  import static io.netty.handler.codec.stomp.StompHeaders.SUBSCRIPTION;
47  import static io.netty.handler.codec.stomp.StompHeaders.TRANSACTION;
48  import static io.netty.handler.codec.stomp.StompHeaders.VERSION;
49  
50  /**
51   * Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}.
52   */
53  public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe> {
54  
55      private static final int ESCAPE_HEADER_KEY_CACHE_LIMIT = 32;
56      private static final float DEFAULT_LOAD_FACTOR = 0.75f;
57      private static final FastThreadLocal<LinkedHashMap<CharSequence, CharSequence>> ESCAPE_HEADER_KEY_CACHE =
58              new FastThreadLocal<LinkedHashMap<CharSequence, CharSequence>>() {
59                  @Override
60                  protected LinkedHashMap<CharSequence, CharSequence> initialValue() throws Exception {
61                      LinkedHashMap<CharSequence, CharSequence> cache = new LinkedHashMap<CharSequence, CharSequence>(
62                              ESCAPE_HEADER_KEY_CACHE_LIMIT, DEFAULT_LOAD_FACTOR, true) {
63  
64                          @Override
65                          protected boolean removeEldestEntry(Entry eldest) {
66                              return size() > ESCAPE_HEADER_KEY_CACHE_LIMIT;
67                          }
68                      };
69  
70                      cache.put(ACCEPT_VERSION, ACCEPT_VERSION);
71                      cache.put(HOST, HOST);
72                      cache.put(LOGIN, LOGIN);
73                      cache.put(PASSCODE, PASSCODE);
74                      cache.put(HEART_BEAT, HEART_BEAT);
75                      cache.put(VERSION, VERSION);
76                      cache.put(SESSION, SESSION);
77                      cache.put(SERVER, SERVER);
78                      cache.put(DESTINATION, DESTINATION);
79                      cache.put(ID, ID);
80                      cache.put(ACK, ACK);
81                      cache.put(TRANSACTION, TRANSACTION);
82                      cache.put(RECEIPT, RECEIPT);
83                      cache.put(MESSAGE_ID, MESSAGE_ID);
84                      cache.put(SUBSCRIPTION, SUBSCRIPTION);
85                      cache.put(RECEIPT_ID, RECEIPT_ID);
86                      cache.put(MESSAGE, MESSAGE);
87                      cache.put(CONTENT_LENGTH, CONTENT_LENGTH);
88                      cache.put(CONTENT_TYPE, CONTENT_TYPE);
89  
90                      return cache;
91                  }
92              };
93  
94      public StompSubframeEncoder() {
95          super(StompSubframe.class);
96      }
97  
98      @Override
99      protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
100         if (msg instanceof StompFrame) {
101             StompFrame stompFrame = (StompFrame) msg;
102             ByteBuf buf = encodeFullFrame(stompFrame, ctx);
103 
104             out.add(convertFullFrame(stompFrame, buf));
105         } else if (msg instanceof StompHeadersSubframe) {
106             StompHeadersSubframe stompHeadersSubframe = (StompHeadersSubframe) msg;
107             ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(stompHeadersSubframe));
108             encodeHeaders(stompHeadersSubframe, buf);
109 
110             out.add(convertHeadersSubFrame(stompHeadersSubframe, buf));
111         } else if (msg instanceof StompContentSubframe) {
112             StompContentSubframe stompContentSubframe = (StompContentSubframe) msg;
113             ByteBuf buf = encodeContent(stompContentSubframe, ctx);
114 
115             out.add(convertContentSubFrame(stompContentSubframe, buf));
116         }
117     }
118 
119     /**
120      * An extension method to convert a STOMP encoded buffer to a different message type
121      * based on an original {@link StompFrame} full frame.
122      *
123      * <p>By default an encoded buffer is returned as is.
124      */
125     protected Object convertFullFrame(StompFrame original, ByteBuf encoded) {
126         return encoded;
127     }
128 
129     /**
130      * An extension method to convert a STOMP encoded buffer to a different message type
131      * based on an original {@link StompHeadersSubframe} headers sub frame.
132      *
133      * <p>By default an encoded buffer is returned as is.
134      */
135     protected Object convertHeadersSubFrame(StompHeadersSubframe original, ByteBuf encoded) {
136         return encoded;
137     }
138 
139     /**
140      * An extension method to convert a STOMP encoded buffer to a different message type
141      * based on an original {@link StompHeadersSubframe} content sub frame.
142      *
143      * <p>By default an encoded buffer is returned as is.
144      */
145     protected Object convertContentSubFrame(StompContentSubframe original, ByteBuf encoded) {
146         return encoded;
147     }
148 
149     /**
150      * Returns a heuristic size for headers (32 bytes per header line) + (2 bytes for colon and eol) + (additional
151      * command buffer).
152      */
153     protected int headersSubFrameSize(StompHeadersSubframe headersSubframe) {
154         int estimatedSize = headersSubframe.headers().size() * 34 + 48;
155         if (estimatedSize < 128) {
156             return 128;
157         }
158 
159         return Math.max(estimatedSize, 256);
160     }
161 
162     private ByteBuf encodeFullFrame(StompFrame frame, ChannelHandlerContext ctx) {
163         int contentReadableBytes = frame.content().readableBytes();
164         ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(frame) + contentReadableBytes);
165         encodeHeaders(frame, buf);
166 
167         if (contentReadableBytes > 0) {
168             buf.writeBytes(frame.content());
169         }
170 
171         return buf.writeByte(NUL);
172     }
173 
174     private static void encodeHeaders(StompHeadersSubframe frame, ByteBuf buf) {
175         StompCommand command = frame.command();
176         ByteBufUtil.writeUtf8(buf, command.toString());
177         buf.writeByte(StompConstants.LF);
178 
179         boolean shouldEscape = shouldEscape(command);
180         LinkedHashMap<CharSequence, CharSequence> cache = ESCAPE_HEADER_KEY_CACHE.get();
181         for (Entry<CharSequence, CharSequence> entry : frame.headers()) {
182             CharSequence headerKey = entry.getKey();
183             if (shouldEscape) {
184                 CharSequence cachedHeaderKey = cache.get(headerKey);
185                 if (cachedHeaderKey == null) {
186                     cachedHeaderKey = escape(headerKey);
187                     cache.put(headerKey, cachedHeaderKey);
188                 }
189                 headerKey = cachedHeaderKey;
190             }
191 
192             ByteBufUtil.writeUtf8(buf, headerKey);
193             buf.writeByte(StompConstants.COLON);
194 
195             CharSequence headerValue = shouldEscape? escape(entry.getValue()) : entry.getValue();
196             ByteBufUtil.writeUtf8(buf, headerValue);
197             buf.writeByte(StompConstants.LF);
198         }
199 
200         buf.writeByte(StompConstants.LF);
201     }
202 
203     private static ByteBuf encodeContent(StompContentSubframe content, ChannelHandlerContext ctx) {
204         if (content instanceof LastStompContentSubframe) {
205             ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes() + 1);
206             buf.writeBytes(content.content());
207             buf.writeByte(StompConstants.NUL);
208             return buf;
209         }
210 
211         return content.content().retain();
212     }
213 
214     private static boolean shouldEscape(StompCommand command) {
215         return command != StompCommand.CONNECT && command != StompCommand.CONNECTED;
216     }
217 
218     private static CharSequence escape(CharSequence input) {
219         AppendableCharSequence builder = null;
220         for (int i = 0; i < input.length(); i++) {
221             char chr = input.charAt(i);
222             if (chr == '\\') {
223                 builder = escapeBuilder(builder, input, i);
224                 builder.append("\\\\");
225             } else if (chr == ':') {
226                 builder = escapeBuilder(builder, input, i);
227                 builder.append("\\c");
228             } else if (chr == '\n') {
229                 builder = escapeBuilder(builder, input, i);
230                 builder.append("\\n");
231             } else if (chr == '\r') {
232                 builder = escapeBuilder(builder, input, i);
233                 builder.append("\\r");
234             } else if (builder != null) {
235                 builder.append(chr);
236             }
237         }
238 
239         return builder != null? builder : input;
240     }
241 
242     private static AppendableCharSequence escapeBuilder(AppendableCharSequence builder, CharSequence input,
243                                                         int offset) {
244         if (builder != null) {
245             return builder;
246         }
247 
248         // Add extra overhead to the input char sequence to avoid resizing during escaping.
249         return new AppendableCharSequence(input.length() + 8).append(input, 0, offset);
250     }
251 }