1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.codec.replay;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.Channel;
20 import org.jboss.netty.channel.ChannelHandler;
21 import org.jboss.netty.channel.ChannelHandlerContext;
22 import org.jboss.netty.channel.ChannelPipeline;
23 import org.jboss.netty.channel.ChannelStateEvent;
24 import org.jboss.netty.channel.MessageEvent;
25 import org.jboss.netty.handler.codec.frame.FrameDecoder;
26
27 import java.net.SocketAddress;
28
29 /**
 * A specialized variation of {@link FrameDecoder} which lets you implement
 * a decoder for non-blocking I/O as if you were writing it in the blocking
 * I/O paradigm.
32 * <p>
33 * The biggest difference between {@link ReplayingDecoder} and
34 * {@link FrameDecoder} is that {@link ReplayingDecoder} allows you to
 * implement the {@code decode()} and {@code decodeLast()} methods as if
 * all required bytes had already been received, rather than checking the
 * availability of the required bytes.  For example, the following
38 * {@link FrameDecoder} implementation:
39 * <pre>
40 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
41 *
42 * {@code @Override}
43 * protected Object decode({@link ChannelHandlerContext} ctx,
44 * {@link Channel} channel,
45 * {@link ChannelBuffer} buf) throws Exception {
46 *
 *     if (buf.readableBytes() &lt; 4) {
48 * return <strong>null</strong>;
49 * }
50 *
51 * buf.markReaderIndex();
52 * int length = buf.readInt();
53 *
 *     if (buf.readableBytes() &lt; length) {
55 * buf.resetReaderIndex();
56 * return <strong>null</strong>;
57 * }
58 *
59 * return buf.readBytes(length);
60 * }
61 * }
62 * </pre>
63 * is simplified like the following with {@link ReplayingDecoder}:
64 * <pre>
65 * public class IntegerHeaderFrameDecoder
 *      extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
67 *
68 * protected Object decode({@link ChannelHandlerContext} ctx,
69 * {@link Channel} channel,
70 * {@link ChannelBuffer} buf,
71 * {@link VoidEnum} state) throws Exception {
72 *
73 * return buf.readBytes(buf.readInt());
74 * }
75 * }
76 * </pre>
77 *
78 * <h3>How does this work?</h3>
79 * <p>
80 * {@link ReplayingDecoder} passes a specialized {@link ChannelBuffer}
81 * implementation which throws an {@link Error} of certain type when there's not
82 * enough data in the buffer. In the {@code IntegerHeaderFrameDecoder} above,
83 * you just assumed that there will be 4 or more bytes in the buffer when
 * you call {@code buf.readInt()}.  If there really are 4 bytes in the buffer,
85 * it will return the integer header as you expected. Otherwise, the
86 * {@link Error} will be raised and the control will be returned to
87 * {@link ReplayingDecoder}. If {@link ReplayingDecoder} catches the
88 * {@link Error}, then it will rewind the {@code readerIndex} of the buffer
89 * back to the 'initial' position (i.e. the beginning of the buffer) and call
90 * the {@code decode(..)} method again when more data is received into the
91 * buffer.
92 * <p>
93 * Please note that {@link ReplayingDecoder} always throws the same cached
94 * {@link Error} instance to avoid the overhead of creating a new {@link Error}
95 * and filling its stack trace for every throw.
96 *
97 * <h3>Limitations</h3>
98 * <p>
 * In exchange for this simplicity, {@link ReplayingDecoder} imposes a few
 * limitations on you:
101 * <ul>
102 * <li>Some buffer operations are prohibited.</li>
103 * <li>Performance can be worse if the network is slow and the message
104 * format is complicated unlike the example above. In this case, your
105 * decoder might have to decode the same part of the message over and over
106 * again.</li>
107 * <li>You must keep in mind that {@code decode(..)} method can be called many
108 * times to decode a single message. For example, the following code will
109 * not work:
 * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
 *
 *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
113 *
114 * {@code @Override}
115 * public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
116 *
117 * // A message contains 2 integers.
118 * values.offer(buffer.readInt());
119 * values.offer(buffer.readInt());
120 *
121 * // This assertion will fail intermittently since values.offer()
122 * // can be called more than two times!
123 * assert values.size() == 2;
124 * return values.poll() + values.poll();
125 * }
126 * }</pre>
127 * The correct implementation looks like the following, and you can also
128 * utilize the 'checkpoint' feature which is explained in detail in the
129 * next section.
 * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
 *
 *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
133 *
134 * {@code @Override}
135 * public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
136 *
137 * // Revert the state of the variable that might have been changed
138 * // since the last partial decode.
139 * values.clear();
140 *
141 * // A message contains 2 integers.
142 * values.offer(buffer.readInt());
143 * values.offer(buffer.readInt());
144 *
145 * // Now we know this assertion will never fail.
146 * assert values.size() == 2;
147 * return values.poll() + values.poll();
148 * }
149 * }</pre>
150 * </li>
151 * </ul>
152 *
153 * <h3>Improving the performance</h3>
154 * <p>
155 * Fortunately, the performance of a complex decoder implementation can be
156 * improved significantly with the {@code checkpoint()} method. The
157 * {@code checkpoint()} method updates the 'initial' position of the buffer so
158 * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer
159 * to the last position where you called the {@code checkpoint()} method.
160 *
161 * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4>
162 * <p>
163 * Although you can just use {@code checkpoint()} method and manage the state
164 * of the decoder by yourself, the easiest way to manage the state of the
165 * decoder is to create an {@link Enum} type which represents the current state
166 * of the decoder and to call {@code checkpoint(T)} method whenever the state
167 * changes. You can have as many states as you want depending on the
168 * complexity of the message you want to decode:
169 *
170 * <pre>
171 * public enum MyDecoderState {
172 * READ_LENGTH,
173 * READ_CONTENT;
174 * }
175 *
176 * public class IntegerHeaderFrameDecoder
 *      extends {@link ReplayingDecoder}&lt;<strong>MyDecoderState</strong>&gt; {
178 *
179 * private int length;
180 *
181 * public IntegerHeaderFrameDecoder() {
182 * // Set the initial state.
183 * <strong>super(MyDecoderState.READ_LENGTH);</strong>
184 * }
185 *
186 * {@code @Override}
187 * protected Object decode({@link ChannelHandlerContext} ctx,
188 * {@link Channel} channel,
189 * {@link ChannelBuffer} buf,
190 * <b>MyDecoderState</b> state) throws Exception {
191 * switch (state) {
192 * case READ_LENGTH:
193 * length = buf.readInt();
194 * <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong>
195 * case READ_CONTENT:
196 * ChannelBuffer frame = buf.readBytes(length);
197 * <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong>
198 * return frame;
199 * default:
200 * throw new Error("Shouldn't reach here.");
201 * }
202 * }
203 * }
204 * </pre>
205 *
206 * <h4>Calling {@code checkpoint()} with no parameter</h4>
207 * <p>
208 * An alternative way to manage the decoder state is to manage it by yourself.
209 * <pre>
210 * public class IntegerHeaderFrameDecoder
 *      extends {@link ReplayingDecoder}&lt;<strong>{@link VoidEnum}</strong>&gt; {
212 *
213 * <strong>private boolean readLength;</strong>
214 * private int length;
215 *
216 * {@code @Override}
217 * protected Object decode({@link ChannelHandlerContext} ctx,
218 * {@link Channel} channel,
219 * {@link ChannelBuffer} buf,
220 * {@link VoidEnum} state) throws Exception {
221 * if (!readLength) {
222 * length = buf.readInt();
223 * <strong>readLength = true;</strong>
224 * <strong>checkpoint();</strong>
225 * }
226 *
227 * if (readLength) {
228 * ChannelBuffer frame = buf.readBytes(length);
229 * <strong>readLength = false;</strong>
230 * <strong>checkpoint();</strong>
231 * return frame;
232 * }
233 * }
234 * }
235 * </pre>
236 *
237 * <h3>Replacing a decoder with another decoder in a pipeline</h3>
238 * <p>
239 * If you are going to write a protocol multiplexer, you will probably want to
240 * replace a {@link ReplayingDecoder} (protocol detector) with another
241 * {@link ReplayingDecoder} or {@link FrameDecoder} (actual protocol decoder).
242 * It is not possible to achieve this simply by calling
243 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
244 * some additional steps are required:
245 * <pre>
 * public class FirstDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
247 *
248 * public FirstDecoder() {
249 * super(true); // Enable unfold
250 * }
251 *
252 * {@code @Override}
253 * protected Object decode({@link ChannelHandlerContext} ctx,
254 * {@link Channel} ch,
255 * {@link ChannelBuffer} buf,
256 * {@link VoidEnum} state) {
257 * ...
258 * // Decode the first message
259 * Object firstMessage = ...;
260 *
261 * // Add the second decoder
262 * ctx.getPipeline().addLast("second", new SecondDecoder());
263 *
264 * // Remove the first decoder (me)
265 * ctx.getPipeline().remove(this);
266 *
267 * if (buf.readable()) {
268 * // Hand off the remaining data to the second decoder
269 * return new Object[] { firstMessage, buf.readBytes(<b>super.actualReadableBytes()</b>) };
270 * } else {
271 * // Nothing to hand off
272 * return firstMessage;
273 * }
 *     }
 * }
 * </pre>
276 *
277 * @param <T>
278 * the state type; use {@link VoidEnum} if state management is unused
279 *
280 * @apiviz.landmark
281 * @apiviz.has org.jboss.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws
282 */
283 public abstract class ReplayingDecoder<T extends Enum<T>>
284 extends FrameDecoder {
285
286 private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this);
287 private T state;
288 private int checkpoint;
289 private boolean needsCleanup;
290
291 /**
292 * Creates a new instance with no initial state (i.e: {@code null}).
293 */
294 protected ReplayingDecoder() {
295 this(null);
296 }
297
298 protected ReplayingDecoder(boolean unfold) {
299 this(null, unfold);
300 }
301
302 /**
303 * Creates a new instance with the specified initial state.
304 */
305 protected ReplayingDecoder(T initialState) {
306 this(initialState, false);
307 }
308
309 protected ReplayingDecoder(T initialState, boolean unfold) {
310 super(unfold);
311 state = initialState;
312 }
313
314 @Override
315 protected ChannelBuffer internalBuffer() {
316 return super.internalBuffer();
317 }
318
319 /**
320 * Stores the internal cumulative buffer's reader position.
321 */
322 protected void checkpoint() {
323 ChannelBuffer cumulation = this.cumulation;
324 if (cumulation != null) {
325 checkpoint = cumulation.readerIndex();
326 } else {
327 checkpoint = -1; // buffer not available (already cleaned up)
328 }
329 }
330
331 /**
332 * Stores the internal cumulative buffer's reader position and updates
333 * the current decoder state.
334 */
335 protected void checkpoint(T state) {
336 checkpoint();
337 setState(state);
338 }
339
340 /**
341 * Returns the current state of this decoder.
342 * @return the current state of this decoder
343 */
344 protected T getState() {
345 return state;
346 }
347
348 /**
349 * Sets the current state of this decoder.
350 * @return the old state of this decoder
351 */
352 protected T setState(T newState) {
353 T oldState = state;
354 state = newState;
355 return oldState;
356 }
357
358 /**
359 * Decodes the received packets so far into a frame.
360 *
361 * @param ctx the context of this handler
362 * @param channel the current channel
363 * @param buffer the cumulative buffer of received packets so far.
364 * Note that the buffer might be empty, which means you
365 * should not make an assumption that the buffer contains
366 * at least one byte in your decoder implementation.
367 * @param state the current decoder state ({@code null} if unused)
368 *
369 * @return the decoded frame
370 */
371 protected abstract Object decode(ChannelHandlerContext ctx,
372 Channel channel, ChannelBuffer buffer, T state) throws Exception;
373
374 /**
375 * Decodes the received data so far into a frame when the channel is
376 * disconnected.
377 *
378 * @param ctx the context of this handler
379 * @param channel the current channel
380 * @param buffer the cumulative buffer of received packets so far.
381 * Note that the buffer might be empty, which means you
382 * should not make an assumption that the buffer contains
383 * at least one byte in your decoder implementation.
384 * @param state the current decoder state ({@code null} if unused)
385 *
386 * @return the decoded frame
387 */
388 protected Object decodeLast(
389 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception {
390 return decode(ctx, channel, buffer, state);
391 }
392
393 /**
394 * Calls {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}. This method
395 * should be never used by {@link ReplayingDecoder} itself. But to be safe we should handle it
396 * anyway
397 */
398 @Override
399 protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
400 return decode(ctx, channel, buffer, state);
401 }
402
403 @Override
404 protected final Object decodeLast(
405 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
406 return decodeLast(ctx, channel, buffer, state);
407 }
408
409 @Override
410 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
411 throws Exception {
412
413 Object m = e.getMessage();
414 if (!(m instanceof ChannelBuffer)) {
415 ctx.sendUpstream(e);
416 return;
417 }
418
419 ChannelBuffer input = (ChannelBuffer) m;
420 if (!input.readable()) {
421 return;
422 }
423
424 needsCleanup = true;
425
426 if (cumulation == null) {
427 // the cumulation buffer is not created yet so just pass the input
428 // to callDecode(...) method
429 cumulation = input;
430
431 int oldReaderIndex = input.readerIndex();
432 int inputSize = input.readableBytes();
433
434 try {
435 callDecode(
436 ctx, e.getChannel(),
437 input, replayable,
438 e.getRemoteAddress());
439 } finally {
440 int readableBytes = input.readableBytes();
441 if (readableBytes > 0) {
442 int inputCapacity = input.capacity();
443 // check if readableBytes == capacity we can safe the copy as we will not be able to
444 // optimize memory usage anyway
445 boolean copy =
446 readableBytes != inputCapacity &&
447 inputCapacity > getMaxCumulationBufferCapacity();
448
449 // seems like there is something readable left in the input buffer
450 // or decoder wants a replay - create the cumulation buffer and
451 // copy the input into it
452 ChannelBuffer cumulation;
453 if (checkpoint > 0) {
454 int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
455 if (copy) {
456 this.cumulation = cumulation = newCumulationBuffer(ctx, bytesToPreserve);
457 cumulation.writeBytes(input, checkpoint, bytesToPreserve);
458 } else {
459 this.cumulation = input.slice(checkpoint, bytesToPreserve);
460 }
461 } else if (checkpoint == 0) {
462 if (copy) {
463 this.cumulation = cumulation = newCumulationBuffer(ctx, inputSize);
464 cumulation.writeBytes(input, oldReaderIndex, inputSize);
465 cumulation.readerIndex(input.readerIndex());
466 } else {
467 this.cumulation = cumulation = input.slice(oldReaderIndex, inputSize);
468 cumulation.readerIndex(input.readerIndex());
469 }
470 } else {
471 if (copy) {
472 this.cumulation = cumulation = newCumulationBuffer(ctx, input.readableBytes());
473 cumulation.writeBytes(input);
474 } else {
475 this.cumulation = input;
476 }
477 }
478 } else {
479 cumulation = null;
480 }
481 }
482 } else {
483 input = appendToCumulation(input);
484 try {
485 callDecode(ctx, e.getChannel(), input, replayable, e.getRemoteAddress());
486 } finally {
487 updateCumulation(ctx, input);
488 }
489 }
490 }
491
492 private void callDecode(
493 ChannelHandlerContext context, Channel channel,
494 ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
495 while (input.readable()) {
496 int oldReaderIndex = checkpoint = input.readerIndex();
497 Object result = null;
498 T oldState = state;
499 try {
500 result = decode(context, channel, replayableInput, state);
501 if (result == null) {
502 if (oldReaderIndex == input.readerIndex() && oldState == state) {
503 throw new IllegalStateException(
504 "null cannot be returned if no data is consumed and state didn't change.");
505 } else {
506 // Previous data has been discarded or caused state transition.
507 // Probably it is reading on.
508 continue;
509 }
510 }
511 } catch (ReplayError replay) {
512 // Return to the checkpoint (or oldPosition) and retry.
513 int checkpoint = this.checkpoint;
514 if (checkpoint >= 0) {
515 input.readerIndex(checkpoint);
516 } else {
517 // Called by cleanup() - no need to maintain the readerIndex
518 // anymore because the buffer has been released already.
519 }
520 }
521
522 if (result == null) {
523 // Seems like more data is required.
524 // Let us wait for the next notification.
525 break;
526 }
527
528 if (oldReaderIndex == input.readerIndex() && oldState == state) {
529 throw new IllegalStateException(
530 "decode() method must consume at least one byte " +
531 "if it returned a decoded message (caused by: " +
532 getClass() + ')');
533 }
534
535 // A successful decode
536 unfoldAndFireMessageReceived(context, remoteAddress, result);
537 }
538 }
539
540 @Override
541 protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
542 throws Exception {
543 try {
544 ChannelBuffer cumulation = this.cumulation;
545 if (!needsCleanup) {
546 return;
547 }
548
549 needsCleanup = false;
550 replayable.terminate();
551
552 if (cumulation != null && cumulation.readable()) {
553 // Make sure all data was read before notifying a closed channel.
554 callDecode(ctx, e.getChannel(), cumulation, replayable, null);
555 }
556
557 // Call decodeLast() finally. Please note that decodeLast() is
558 // called even if there's nothing more to read from the buffer to
559 // notify a user that the connection was closed explicitly.
560 Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state);
561
562 this.cumulation = null;
563
564 if (partiallyDecoded != null) {
565 unfoldAndFireMessageReceived(ctx, null, partiallyDecoded);
566 }
567 } catch (ReplayError replay) {
568 // Ignore
569 } finally {
570 ctx.sendUpstream(e);
571 }
572 }
573 }