1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.codec.frame;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBufferFactory;
20 import org.jboss.netty.buffer.ChannelBuffers;
21 import org.jboss.netty.buffer.CompositeChannelBuffer;
22 import org.jboss.netty.channel.Channel;
23 import org.jboss.netty.channel.ChannelHandler;
24 import org.jboss.netty.channel.ChannelHandlerContext;
25 import org.jboss.netty.channel.ChannelPipeline;
26 import org.jboss.netty.channel.ChannelStateEvent;
27 import org.jboss.netty.channel.ChannelUpstreamHandler;
28 import org.jboss.netty.channel.Channels;
29 import org.jboss.netty.channel.ExceptionEvent;
30 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
31 import org.jboss.netty.channel.MessageEvent;
32 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33 import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
34
35 import java.net.SocketAddress;
36
37 /**
38 * Decodes the received {@link ChannelBuffer}s into a meaningful frame object.
39 * <p>
40 * In a stream-based transport such as TCP/IP, packets can be fragmented and
41 * reassembled during transmission even in a LAN environment. For example,
42 * let us assume you have received three packets:
43 * <pre>
44 * +-----+-----+-----+
45 * | ABC | DEF | GHI |
46 * +-----+-----+-----+
47 * </pre>
48 * because of the packet fragmentation, a server can receive them like the
49 * following:
50 * <pre>
51 * +----+-------+---+---+
52 * | AB | CDEFG | H | I |
53 * +----+-------+---+---+
54 * </pre>
55 * <p>
56 * {@link FrameDecoder} helps you defrag the received packets into one or more
57 * meaningful <strong>frames</strong> that could be easily understood by the
58 * application logic. In case of the example above, your {@link FrameDecoder}
59 * implementation could defrag the received packets like the following:
60 * <pre>
61 * +-----+-----+-----+
62 * | ABC | DEF | GHI |
63 * +-----+-----+-----+
64 * </pre>
65 * <p>
66 * The following code shows an example handler which decodes a frame whose
67 * first 4 bytes header represents the length of the frame, excluding the
68 * header.
69 * <pre>
70 * MESSAGE FORMAT
71 * ==============
72 *
73 * Offset: 0 4 (Length + 4)
74 * +--------+------------------------+
75 * Fields: | Length | Actual message content |
76 * +--------+------------------------+
77 *
78 * DECODER IMPLEMENTATION
79 * ======================
80 *
81 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
82 *
83 * {@code @Override}
84 * protected Object decode({@link ChannelHandlerContext} ctx,
85 * {@link Channel channel},
86 * {@link ChannelBuffer} buf) throws Exception {
87 *
88 * // Make sure if the length field was received.
89 * if (buf.readableBytes() < 4) {
90 * // The length field was not received yet - return null.
91 * // This method will be invoked again when more packets are
92 * // received and appended to the buffer.
93 * return <strong>null</strong>;
94 * }
95 *
96 * // The length field is in the buffer.
97 *
98 * // Mark the current buffer position before reading the length field
99 * // because the whole frame might not be in the buffer yet.
100 * // We will reset the buffer position to the marked position if
101 * // there's not enough bytes in the buffer.
102 * buf.markReaderIndex();
103 *
104 * // Read the length field.
105 * int length = buf.readInt();
106 *
107 * // Make sure if there's enough bytes in the buffer.
108 * if (buf.readableBytes() < length) {
109 * // The whole bytes were not received yet - return null.
110 * // This method will be invoked again when more packets are
111 * // received and appended to the buffer.
112 *
113 * // Reset to the marked position to read the length field again
114 * // next time.
115 * buf.resetReaderIndex();
116 *
117 * return <strong>null</strong>;
118 * }
119 *
120 * // There's enough bytes in the buffer. Read it.
121 * {@link ChannelBuffer} frame = buf.readBytes(length);
122 *
123 * // Successfully decoded a frame. Return the decoded frame.
124 * return <strong>frame</strong>;
125 * }
126 * }
127 * </pre>
128 *
129 * <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3>
130 * <p>
131 * Please note that you can return an object of a different type than
132 * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()}
133 * implementation. For example, you could return a
134 * <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next
135 * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which
136 * contains a POJO rather than a {@link ChannelBuffer}.
137 *
138 * <h3>Replacing a decoder with another decoder in a pipeline</h3>
139 * <p>
140 * If you are going to write a protocol multiplexer, you will probably want to
141 * replace a {@link FrameDecoder} (protocol detector) with another
142 * {@link FrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder).
143 * It is not possible to achieve this simply by calling
144 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
145 * some additional steps are required:
146 * <pre>
147 * public class FirstDecoder extends {@link FrameDecoder} {
148 *
149 * public FirstDecoder() {
150 * super(true); // Enable unfold
151 * }
152 *
153 * {@code @Override}
154 * protected Object decode({@link ChannelHandlerContext} ctx,
155 * {@link Channel} channel,
156 * {@link ChannelBuffer} buf) {
157 * ...
158 * // Decode the first message
159 * Object firstMessage = ...;
160 *
161 * // Add the second decoder
162 * ctx.getPipeline().addLast("second", new SecondDecoder());
163 *
164 * // Remove the first decoder (me)
165 * ctx.getPipeline().remove(this);
166 *
167 * if (buf.readable()) {
168 * // Hand off the remaining data to the second decoder
169 * return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };
170 * } else {
171 * // Nothing to hand off
172 * return firstMessage;
173 * }
174 * }
175 * }
176 * </pre>
177 *
178 * @apiviz.landmark
179 */
180 public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
181
182 public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
183
184 private boolean unfold;
185 protected ChannelBuffer cumulation;
186 private volatile ChannelHandlerContext ctx;
187 private int copyThreshold;
188 private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
189
190 protected FrameDecoder() {
191 this(false);
192 }
193
194 protected FrameDecoder(boolean unfold) {
195 this.unfold = unfold;
196 }
197
198 public final boolean isUnfold() {
199 return unfold;
200 }
201
202 public final void setUnfold(boolean unfold) {
203 if (ctx == null) {
204 this.unfold = unfold;
205 } else {
206 throw new IllegalStateException(
207 "decoder properties cannot be changed once the decoder is added to a pipeline.");
208 }
209 }
210
211 /**
212 * See {@link #setMaxCumulationBufferCapacity(int)} for explaintation of this setting
213 *
214 */
215 public final int getMaxCumulationBufferCapacity() {
216 return copyThreshold;
217 }
218
219 /**
220 * Set the maximal capacity of the internal cumulation ChannelBuffer to use
221 * before the {@link FrameDecoder} tries to minimize the memory usage by
222 * "byte copy".
223 *
224 *
225 * What you use here really depends on your application and need. Using
226 * {@link Integer#MAX_VALUE} will disable all byte copies but give you the
227 * cost of a higher memory usage if big {@link ChannelBuffer}'s will be
228 * received.
229 *
230 * By default a threshold of {@code 0} is used, which means it will
231 * always copy to try to reduce memory usage
232 *
233 *
234 * @param copyThreshold
235 * the threshold (in bytes) or {@link Integer#MAX_VALUE} to
236 * disable it. The value must be at least 0
237 * @throws IllegalStateException
238 * get thrown if someone tries to change this setting after the
239 * Decoder was added to the {@link ChannelPipeline}
240 */
241 public final void setMaxCumulationBufferCapacity(int copyThreshold) {
242 if (copyThreshold < 0) {
243 throw new IllegalArgumentException("maxCumulationBufferCapacity must be >= 0");
244 }
245 if (ctx == null) {
246 this.copyThreshold = copyThreshold;
247 } else {
248 throw new IllegalStateException(
249 "decoder properties cannot be changed once the decoder is added to a pipeline.");
250 }
251 }
252
253 /**
254 * Returns the maximum number of components in the cumulation buffer. If the number of
255 * the components in the cumulation buffer exceeds this value, the components of the
256 * cumulation buffer are consolidated into a single component, involving memory copies.
257 * The default value of this property {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
258 */
259 public final int getMaxCumulationBufferComponents() {
260 return maxCumulationBufferComponents;
261 }
262
263 /**
264 * Sets the maximum number of components in the cumulation buffer. If the number of
265 * the components in the cumulation buffer exceeds this value, the components of the
266 * cumulation buffer are consolidated into a single component, involving memory copies.
267 * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
268 * and its minimum allowed value is {@code 2}.
269 */
270 public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
271 if (maxCumulationBufferComponents < 2) {
272 throw new IllegalArgumentException(
273 "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
274 " (expected: >= 2)");
275 }
276
277 if (ctx == null) {
278 this.maxCumulationBufferComponents = maxCumulationBufferComponents;
279 } else {
280 throw new IllegalStateException(
281 "decoder properties cannot be changed once the decoder is added to a pipeline.");
282 }
283 }
284
285 @Override
286 public void messageReceived(
287 ChannelHandlerContext ctx, MessageEvent e) throws Exception {
288
289 Object m = e.getMessage();
290 if (!(m instanceof ChannelBuffer)) {
291 ctx.sendUpstream(e);
292 return;
293 }
294
295 ChannelBuffer input = (ChannelBuffer) m;
296 if (!input.readable()) {
297 return;
298 }
299
300 if (cumulation == null) {
301 try {
302 // the cumulation buffer is not created yet so just pass the input to callDecode(...) method
303 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
304 } finally {
305 updateCumulation(ctx, input);
306 }
307 } else {
308 input = appendToCumulation(input);
309 try {
310 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
311 } finally {
312 updateCumulation(ctx, input);
313 }
314 }
315 }
316
317 protected ChannelBuffer appendToCumulation(ChannelBuffer input) {
318 ChannelBuffer cumulation = this.cumulation;
319 assert cumulation.readable();
320 if (cumulation instanceof CompositeChannelBuffer) {
321 // Make sure the resulting cumulation buffer has no more than the configured components.
322 CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
323 if (composite.numComponents() >= maxCumulationBufferComponents) {
324 cumulation = composite.copy();
325 }
326 }
327
328 this.cumulation = input = ChannelBuffers.wrappedBuffer(cumulation, input);
329 return input;
330 }
331
332 protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) {
333 ChannelBuffer newCumulation;
334 int readableBytes = input.readableBytes();
335 if (readableBytes > 0) {
336 int inputCapacity = input.capacity();
337
338 // If input.readableBytes() == input.capacity() (i.e. input is full),
339 // there's nothing to save from creating a new cumulation buffer
340 // even if input.capacity() exceeds the threshold, because the new cumulation
341 // buffer will have the same capacity and content with input.
342 if (readableBytes < inputCapacity && inputCapacity > copyThreshold) {
343 // At least one byte was consumed by callDecode() and input.capacity()
344 // exceeded the threshold.
345 cumulation = newCumulation = newCumulationBuffer(ctx, input.readableBytes());
346 cumulation.writeBytes(input);
347 } else {
348 // Nothing was consumed by callDecode() or input.capacity() did not
349 // exceed the threshold.
350 if (input.readerIndex() != 0) {
351 cumulation = newCumulation = input.slice();
352 } else {
353 cumulation = newCumulation = input;
354 }
355 }
356 } else {
357 cumulation = newCumulation = null;
358 }
359 return newCumulation;
360 }
361
362 @Override
363 public void channelDisconnected(
364 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
365 cleanup(ctx, e);
366 }
367
368 @Override
369 public void channelClosed(
370 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
371 cleanup(ctx, e);
372 }
373
374 @Override
375 public void exceptionCaught(
376 ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
377 ctx.sendUpstream(e);
378 }
379
380 /**
381 * Decodes the received packets so far into a frame.
382 *
383 * If an sub-class wants to extract a frame out of the buffer it should use
384 * the {@link #extractFrame(ChannelBuffer, int, int)} method,
385 * to make optimizations easier later.
386 *
387 * @param ctx the context of this handler
388 * @param channel the current channel
389 * @param buffer the cumulative buffer of received packets so far.
390 * Note that the buffer might be empty, which means you
391 * should not make an assumption that the buffer contains
392 * at least one byte in your decoder implementation.
393 *
394 * @return the decoded frame if a full frame was received and decoded.
395 * {@code null} if there's not enough data in the buffer to decode a frame.
396 */
397 protected abstract Object decode(
398 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception;
399
400 /**
401 * Decodes the received data so far into a frame when the channel is
402 * disconnected.
403 *
404 * @param ctx the context of this handler
405 * @param channel the current channel
406 * @param buffer the cumulative buffer of received packets so far.
407 * Note that the buffer might be empty, which means you
408 * should not make an assumption that the buffer contains
409 * at least one byte in your decoder implementation.
410 *
411 * @return the decoded frame if a full frame was received and decoded.
412 * {@code null} if there's not enough data in the buffer to decode a frame.
413 */
414 protected Object decodeLast(
415 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
416 return decode(ctx, channel, buffer);
417 }
418
419 private void callDecode(
420 ChannelHandlerContext context, Channel channel,
421 ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
422
423 while (cumulation.readable()) {
424 int oldReaderIndex = cumulation.readerIndex();
425 Object frame = decode(context, channel, cumulation);
426 if (frame == null) {
427 if (oldReaderIndex == cumulation.readerIndex()) {
428 // Seems like more data is required.
429 // Let us wait for the next notification.
430 break;
431 } else {
432 // Previous data has been discarded.
433 // Probably it is reading on.
434 continue;
435 }
436 }
437 if (oldReaderIndex == cumulation.readerIndex()) {
438 throw new IllegalStateException(
439 "decode() method must read at least one byte " +
440 "if it returned a frame (caused by: " + getClass() + ')');
441 }
442
443 unfoldAndFireMessageReceived(context, remoteAddress, frame);
444 }
445 }
446
447 protected final void unfoldAndFireMessageReceived(
448 ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
449 if (unfold) {
450 if (result instanceof Object[]) {
451 for (Object r: (Object[]) result) {
452 Channels.fireMessageReceived(context, r, remoteAddress);
453 }
454 } else if (result instanceof Iterable<?>) {
455 for (Object r: (Iterable<?>) result) {
456 Channels.fireMessageReceived(context, r, remoteAddress);
457 }
458 } else {
459 Channels.fireMessageReceived(context, result, remoteAddress);
460 }
461 } else {
462 Channels.fireMessageReceived(context, result, remoteAddress);
463 }
464 }
465
466 /**
467 * Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and
468 * {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)}
469 */
470 protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
471 throws Exception {
472 try {
473 ChannelBuffer cumulation = this.cumulation;
474 if (cumulation == null) {
475 return;
476 }
477
478 this.cumulation = null;
479
480 if (cumulation.readable()) {
481 // Make sure all frames are read before notifying a closed channel.
482 callDecode(ctx, ctx.getChannel(), cumulation, null);
483 }
484
485 // Call decodeLast() finally. Please note that decodeLast() is
486 // called even if there's nothing more to read from the buffer to
487 // notify a user that the connection was closed explicitly.
488 Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);
489 if (partialFrame != null) {
490 unfoldAndFireMessageReceived(ctx, null, partialFrame);
491 }
492 } finally {
493 ctx.sendUpstream(e);
494 }
495 }
496
497 /**
498 * Create a new {@link ChannelBuffer} which is used for the cumulation.
499 * Sub-classes may override this.
500 *
501 * @param ctx {@link ChannelHandlerContext} for this handler
502 * @return buffer the {@link ChannelBuffer} which is used for cumulation
503 */
504 protected ChannelBuffer newCumulationBuffer(
505 ChannelHandlerContext ctx, int minimumCapacity) {
506 ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
507 return factory.getBuffer(Math.max(minimumCapacity, 256));
508 }
509
510 /**
511 * Replace this {@link FrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}. All
512 * remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used
513 * as replacement
514 *
515 */
516 public void replace(String handlerName, ChannelHandler handler) {
517 if (ctx == null) {
518 throw new IllegalStateException(
519 "Replace cann only be called once the FrameDecoder is added to the ChannelPipeline");
520 }
521 ChannelPipeline pipeline = ctx.getPipeline();
522 pipeline.addAfter(ctx.getName(), handlerName, handler);
523
524 try {
525 if (cumulation != null) {
526 Channels.fireMessageReceived(ctx, cumulation.readBytes(actualReadableBytes()));
527 }
528 } finally {
529 pipeline.remove(this);
530 }
531 }
532
533 /**
534 * Returns the actual number of readable bytes in the internal cumulative
535 * buffer of this decoder. You usually do not need to rely on this value
536 * to write a decoder. Use it only when you muse use it at your own risk.
537 * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
538 */
539 protected int actualReadableBytes() {
540 return internalBuffer().readableBytes();
541 }
542
543 /**
544 * Returns the internal cumulative buffer of this decoder. You usually
545 * do not need to access the internal buffer directly to write a decoder.
546 * Use it only when you must use it at your own risk.
547 */
548 protected ChannelBuffer internalBuffer() {
549 ChannelBuffer buf = cumulation;
550 if (buf == null) {
551 return ChannelBuffers.EMPTY_BUFFER;
552 }
553 return buf;
554 }
555
556 /**
557 * Extract a Frame of the specified buffer. By default this implementation
558 * will return a extract the sub-region of the buffer and create a new one.
559 * If an sub-class want to extract a frame from the buffer it should use this method
560 * by default.
561 *
562 * <strong>Be sure that this method MUST not modify the readerIndex of the given buffer</strong>
563 *
564 */
565 protected ChannelBuffer extractFrame(ChannelBuffer buffer, int index, int length) {
566 ChannelBuffer frame = buffer.factory().getBuffer(length);
567 frame.writeBytes(buffer, index, length);
568 return frame;
569 }
570
571 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
572 this.ctx = ctx;
573 }
574
575 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
576 // Nothing to do..
577 }
578
579 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
580 // Nothing to do..
581 }
582
583 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
584 // Nothing to do..
585 }
586
587 }