1 /* 2 * Copyright 2012 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.codec.frame; 17 18 import org.jboss.netty.buffer.ChannelBuffer; 19 import org.jboss.netty.buffer.ChannelBufferFactory; 20 import org.jboss.netty.buffer.ChannelBuffers; 21 import org.jboss.netty.buffer.CompositeChannelBuffer; 22 import org.jboss.netty.channel.Channel; 23 import org.jboss.netty.channel.ChannelHandler; 24 import org.jboss.netty.channel.ChannelHandlerContext; 25 import org.jboss.netty.channel.ChannelPipeline; 26 import org.jboss.netty.channel.ChannelStateEvent; 27 import org.jboss.netty.channel.ChannelUpstreamHandler; 28 import org.jboss.netty.channel.Channels; 29 import org.jboss.netty.channel.ExceptionEvent; 30 import org.jboss.netty.channel.LifeCycleAwareChannelHandler; 31 import org.jboss.netty.channel.MessageEvent; 32 import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 33 import org.jboss.netty.handler.codec.replay.ReplayingDecoder; 34 35 import java.net.SocketAddress; 36 37 /** 38 * Decodes the received {@link ChannelBuffer}s into a meaningful frame object. 39 * <p> 40 * In a stream-based transport such as TCP/IP, packets can be fragmented and 41 * reassembled during transmission even in a LAN environment. For example, 42 * let us assume you have received three packets: 43 * <pre> 44 * +-----+-----+-----+ 45 * | ABC | DEF | GHI | 46 * +-----+-----+-----+ 47 * </pre> 48 * because of the packet fragmentation, a server can receive them like the 49 * following: 50 * <pre> 51 * +----+-------+---+---+ 52 * | AB | CDEFG | H | I | 53 * +----+-------+---+---+ 54 * </pre> 55 * <p> 56 * {@link FrameDecoder} helps you defrag the received packets into one or more 57 * meaningful <strong>frames</strong> that could be easily understood by the 58 * application logic. In case of the example above, your {@link FrameDecoder} 59 * implementation could defrag the received packets like the following: 60 * <pre> 61 * +-----+-----+-----+ 62 * | ABC | DEF | GHI | 63 * +-----+-----+-----+ 64 * </pre> 65 * <p> 66 * The following code shows an example handler which decodes a frame whose 67 * first 4 bytes header represents the length of the frame, excluding the 68 * header. 69 * <pre> 70 * MESSAGE FORMAT 71 * ============== 72 * 73 * Offset: 0 4 (Length + 4) 74 * +--------+------------------------+ 75 * Fields: | Length | Actual message content | 76 * +--------+------------------------+ 77 * 78 * DECODER IMPLEMENTATION 79 * ====================== 80 * 81 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} { 82 * 83 * {@code @Override} 84 * protected Object decode({@link ChannelHandlerContext} ctx, 85 * {@link Channel channel}, 86 * {@link ChannelBuffer} buf) throws Exception { 87 * 88 * // Make sure if the length field was received. 89 * if (buf.readableBytes() < 4) { 90 * // The length field was not received yet - return null. 91 * // This method will be invoked again when more packets are 92 * // received and appended to the buffer. 93 * return <strong>null</strong>; 94 * } 95 * 96 * // The length field is in the buffer. 97 * 98 * // Mark the current buffer position before reading the length field 99 * // because the whole frame might not be in the buffer yet. 100 * // We will reset the buffer position to the marked position if 101 * // there's not enough bytes in the buffer. 102 * buf.markReaderIndex(); 103 * 104 * // Read the length field. 105 * int length = buf.readInt(); 106 * 107 * // Make sure if there's enough bytes in the buffer. 108 * if (buf.readableBytes() < length) { 109 * // The whole bytes were not received yet - return null. 110 * // This method will be invoked again when more packets are 111 * // received and appended to the buffer. 112 * 113 * // Reset to the marked position to read the length field again 114 * // next time. 115 * buf.resetReaderIndex(); 116 * 117 * return <strong>null</strong>; 118 * } 119 * 120 * // There's enough bytes in the buffer. Read it. 121 * {@link ChannelBuffer} frame = buf.readBytes(length); 122 * 123 * // Successfully decoded a frame. Return the decoded frame. 124 * return <strong>frame</strong>; 125 * } 126 * } 127 * </pre> 128 * 129 * <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3> 130 * <p> 131 * Please note that you can return an object of a different type than 132 * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()} 133 * implementation. For example, you could return a 134 * <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next 135 * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which 136 * contains a POJO rather than a {@link ChannelBuffer}. 137 * 138 * <h3>Replacing a decoder with another decoder in a pipeline</h3> 139 * <p> 140 * If you are going to write a protocol multiplexer, you will probably want to 141 * replace a {@link FrameDecoder} (protocol detector) with another 142 * {@link FrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder). 143 * It is not possible to achieve this simply by calling 144 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but 145 * some additional steps are required: 146 * <pre> 147 * public class FirstDecoder extends {@link FrameDecoder} { 148 * 149 * public FirstDecoder() { 150 * super(true); // Enable unfold 151 * } 152 * 153 * {@code @Override} 154 * protected Object decode({@link ChannelHandlerContext} ctx, 155 * {@link Channel} channel, 156 * {@link ChannelBuffer} buf) { 157 * ... 158 * // Decode the first message 159 * Object firstMessage = ...; 160 * 161 * // Add the second decoder 162 * ctx.getPipeline().addLast("second", new SecondDecoder()); 163 * 164 * // Remove the first decoder (me) 165 * ctx.getPipeline().remove(this); 166 * 167 * if (buf.readable()) { 168 * // Hand off the remaining data to the second decoder 169 * return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) }; 170 * } else { 171 * // Nothing to hand off 172 * return firstMessage; 173 * } 174 * } 175 * } 176 * </pre> 177 * 178 * @apiviz.landmark 179 */ 180 public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler { 181 182 public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; 183 184 private boolean unfold; 185 protected ChannelBuffer cumulation; 186 private volatile ChannelHandlerContext ctx; 187 private int copyThreshold; 188 private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS; 189 190 protected FrameDecoder() { 191 this(false); 192 } 193 194 protected FrameDecoder(boolean unfold) { 195 this.unfold = unfold; 196 } 197 198 public final boolean isUnfold() { 199 return unfold; 200 } 201 202 public final void setUnfold(boolean unfold) { 203 if (ctx == null) { 204 this.unfold = unfold; 205 } else { 206 throw new IllegalStateException( 207 "decoder properties cannot be changed once the decoder is added to a pipeline."); 208 } 209 } 210 211 /** 212 * See {@link #setMaxCumulationBufferCapacity(int)} for explaintation of this setting 213 * 214 */ 215 public final int getMaxCumulationBufferCapacity() { 216 return copyThreshold; 217 } 218 219 /** 220 * Set the maximal capacity of the internal cumulation ChannelBuffer to use 221 * before the {@link FrameDecoder} tries to minimize the memory usage by 222 * "byte copy". 223 * 224 * 225 * What you use here really depends on your application and need. Using 226 * {@link Integer#MAX_VALUE} will disable all byte copies but give you the 227 * cost of a higher memory usage if big {@link ChannelBuffer}'s will be 228 * received. 229 * 230 * By default a threshold of <code>0</code> is used, which means it will 231 * always copy to try to reduce memory usage 232 * 233 * 234 * @param copyThreshold 235 * the threshold (in bytes) or {@link Integer#MAX_VALUE} to 236 * disable it. The value must be at least 0 237 * @throws IllegalStateException 238 * get thrown if someone tries to change this setting after the 239 * Decoder was added to the {@link ChannelPipeline} 240 */ 241 public final void setMaxCumulationBufferCapacity(int copyThreshold) { 242 if (copyThreshold < 0) { 243 throw new IllegalArgumentException("maxCumulationBufferCapacity must be >= 0"); 244 } 245 if (ctx == null) { 246 this.copyThreshold = copyThreshold; 247 } else { 248 throw new IllegalStateException( 249 "decoder properties cannot be changed once the decoder is added to a pipeline."); 250 } 251 } 252 253 /** 254 * Returns the maximum number of components in the cumulation buffer. If the number of 255 * the components in the cumulation buffer exceeds this value, the components of the 256 * cumulation buffer are consolidated into a single component, involving memory copies. 257 * The default value of this property {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}. 258 */ 259 public final int getMaxCumulationBufferComponents() { 260 return maxCumulationBufferComponents; 261 } 262 263 /** 264 * Sets the maximum number of components in the cumulation buffer. If the number of 265 * the components in the cumulation buffer exceeds this value, the components of the 266 * cumulation buffer are consolidated into a single component, involving memory copies. 267 * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS} 268 * and its minimum allowed value is {@code 2}. 269 */ 270 public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) { 271 if (maxCumulationBufferComponents < 2) { 272 throw new IllegalArgumentException( 273 "maxCumulationBufferComponents: " + maxCumulationBufferComponents + 274 " (expected: >= 2)"); 275 } 276 277 if (ctx == null) { 278 this.maxCumulationBufferComponents = maxCumulationBufferComponents; 279 } else { 280 throw new IllegalStateException( 281 "decoder properties cannot be changed once the decoder is added to a pipeline."); 282 } 283 } 284 285 @Override 286 public void messageReceived( 287 ChannelHandlerContext ctx, MessageEvent e) throws Exception { 288 289 Object m = e.getMessage(); 290 if (!(m instanceof ChannelBuffer)) { 291 ctx.sendUpstream(e); 292 return; 293 } 294 295 ChannelBuffer input = (ChannelBuffer) m; 296 if (!input.readable()) { 297 return; 298 } 299 300 if (cumulation == null) { 301 try { 302 // the cumulation buffer is not created yet so just pass the input to callDecode(...) method 303 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); 304 } finally { 305 updateCumulation(ctx, input); 306 } 307 308 } else { 309 input = appendToCumulation(input); 310 try { 311 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); 312 } finally { 313 updateCumulation(ctx, input); 314 } 315 } 316 } 317 318 protected ChannelBuffer appendToCumulation(ChannelBuffer input) { 319 ChannelBuffer cumulation = this.cumulation; 320 assert cumulation.readable(); 321 if (cumulation instanceof CompositeChannelBuffer) { 322 // Make sure the resulting cumulation buffer has no more than the configured components. 323 CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation; 324 if (composite.numComponents() >= maxCumulationBufferComponents) { 325 cumulation = composite.copy(); 326 } 327 } 328 329 this.cumulation = input = ChannelBuffers.wrappedBuffer(cumulation, input); 330 return input; 331 } 332 333 protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) { 334 ChannelBuffer newCumulation; 335 int readableBytes = input.readableBytes(); 336 if (readableBytes > 0) { 337 int inputCapacity = input.capacity(); 338 339 // If input.readableBytes() == input.capacity() (i.e. input is full), 340 // there's nothing to save from creating a new cumulation buffer 341 // even if input.capacity() exceeds the threshold, because the new cumulation 342 // buffer will have the same capacity and content with input. 343 if (readableBytes < inputCapacity && inputCapacity > copyThreshold) { 344 // At least one byte was consumed by callDecode() and input.capacity() 345 // exceeded the threshold. 346 cumulation = newCumulation = newCumulationBuffer(ctx, input.readableBytes()); 347 cumulation.writeBytes(input); 348 } else { 349 // Nothing was consumed by callDecode() or input.capacity() did not 350 // exceed the threshold. 351 if (input.readerIndex() != 0) { 352 cumulation = newCumulation = input.slice(); 353 } else { 354 cumulation = newCumulation = input; 355 } 356 } 357 } else { 358 cumulation = newCumulation = null; 359 } 360 return newCumulation; 361 } 362 363 @Override 364 public void channelDisconnected( 365 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 366 cleanup(ctx, e); 367 } 368 369 @Override 370 public void channelClosed( 371 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 372 cleanup(ctx, e); 373 } 374 375 @Override 376 public void exceptionCaught( 377 ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 378 ctx.sendUpstream(e); 379 } 380 381 /** 382 * Decodes the received packets so far into a frame. 383 * 384 * @param ctx the context of this handler 385 * @param channel the current channel 386 * @param buffer the cumulative buffer of received packets so far. 387 * Note that the buffer might be empty, which means you 388 * should not make an assumption that the buffer contains 389 * at least one byte in your decoder implementation. 390 * 391 * @return the decoded frame if a full frame was received and decoded. 392 * {@code null} if there's not enough data in the buffer to decode a frame. 393 */ 394 protected abstract Object decode( 395 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception; 396 397 /** 398 * Decodes the received data so far into a frame when the channel is 399 * disconnected. 400 * 401 * @param ctx the context of this handler 402 * @param channel the current channel 403 * @param buffer the cumulative buffer of received packets so far. 404 * Note that the buffer might be empty, which means you 405 * should not make an assumption that the buffer contains 406 * at least one byte in your decoder implementation. 407 * 408 * @return the decoded frame if a full frame was received and decoded. 409 * {@code null} if there's not enough data in the buffer to decode a frame. 410 */ 411 protected Object decodeLast( 412 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { 413 return decode(ctx, channel, buffer); 414 } 415 416 private void callDecode( 417 ChannelHandlerContext context, Channel channel, 418 ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { 419 420 while (cumulation.readable()) { 421 int oldReaderIndex = cumulation.readerIndex(); 422 Object frame = decode(context, channel, cumulation); 423 if (frame == null) { 424 if (oldReaderIndex == cumulation.readerIndex()) { 425 // Seems like more data is required. 426 // Let us wait for the next notification. 427 break; 428 } else { 429 // Previous data has been discarded. 430 // Probably it is reading on. 431 continue; 432 } 433 } else if (oldReaderIndex == cumulation.readerIndex()) { 434 throw new IllegalStateException( 435 "decode() method must read at least one byte " + 436 "if it returned a frame (caused by: " + getClass() + ')'); 437 } 438 439 unfoldAndFireMessageReceived(context, remoteAddress, frame); 440 } 441 } 442 443 protected final void unfoldAndFireMessageReceived( 444 ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { 445 if (unfold) { 446 if (result instanceof Object[]) { 447 for (Object r: (Object[]) result) { 448 Channels.fireMessageReceived(context, r, remoteAddress); 449 } 450 } else if (result instanceof Iterable<?>) { 451 for (Object r: (Iterable<?>) result) { 452 Channels.fireMessageReceived(context, r, remoteAddress); 453 } 454 } else { 455 Channels.fireMessageReceived(context, result, remoteAddress); 456 } 457 } else { 458 Channels.fireMessageReceived(context, result, remoteAddress); 459 } 460 } 461 462 /** 463 * Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and 464 * {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)} 465 */ 466 protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) 467 throws Exception { 468 try { 469 ChannelBuffer cumulation = this.cumulation; 470 if (cumulation == null) { 471 return; 472 } 473 474 this.cumulation = null; 475 476 if (cumulation.readable()) { 477 // Make sure all frames are read before notifying a closed channel. 478 callDecode(ctx, ctx.getChannel(), cumulation, null); 479 } 480 481 // Call decodeLast() finally. Please note that decodeLast() is 482 // called even if there's nothing more to read from the buffer to 483 // notify a user that the connection was closed explicitly. 484 Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation); 485 if (partialFrame != null) { 486 unfoldAndFireMessageReceived(ctx, null, partialFrame); 487 } 488 } finally { 489 ctx.sendUpstream(e); 490 } 491 } 492 493 /** 494 * Create a new {@link ChannelBuffer} which is used for the cumulation. 495 * Sub-classes may override this. 496 * 497 * @param ctx {@link ChannelHandlerContext} for this handler 498 * @return buffer the {@link ChannelBuffer} which is used for cumulation 499 */ 500 protected ChannelBuffer newCumulationBuffer( 501 ChannelHandlerContext ctx, int minimumCapacity) { 502 ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); 503 return factory.getBuffer(Math.max(minimumCapacity, 256)); 504 } 505 506 /** 507 * Replace this {@link FrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}. All 508 * remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used 509 * as replacement 510 * 511 */ 512 public void replace(String handlerName, ChannelHandler handler) { 513 if (ctx == null) { 514 throw new IllegalStateException( 515 "Replace cann only be called once the FrameDecoder is added to the ChannelPipeline"); 516 } 517 ChannelPipeline pipeline = ctx.getPipeline(); 518 pipeline.addAfter(ctx.getName(), handlerName, handler); 519 520 try { 521 if (cumulation != null) { 522 Channels.fireMessageReceived(ctx, cumulation.readBytes(actualReadableBytes())); 523 } 524 } finally { 525 pipeline.remove(this); 526 } 527 528 } 529 530 /** 531 * Returns the actual number of readable bytes in the internal cumulative 532 * buffer of this decoder. You usually do not need to rely on this value 533 * to write a decoder. Use it only when you muse use it at your own risk. 534 * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. 535 */ 536 protected int actualReadableBytes() { 537 return internalBuffer().readableBytes(); 538 } 539 540 541 542 /** 543 * Returns the internal cumulative buffer of this decoder. You usually 544 * do not need to access the internal buffer directly to write a decoder. 545 * Use it only when you must use it at your own risk. 546 */ 547 protected ChannelBuffer internalBuffer() { 548 ChannelBuffer buf = cumulation; 549 if (buf == null) { 550 return ChannelBuffers.EMPTY_BUFFER; 551 } 552 return buf; 553 } 554 555 public void beforeAdd(ChannelHandlerContext ctx) throws Exception { 556 this.ctx = ctx; 557 } 558 559 public void afterAdd(ChannelHandlerContext ctx) throws Exception { 560 // Nothing to do.. 561 } 562 563 public void beforeRemove(ChannelHandlerContext ctx) throws Exception { 564 // Nothing to do.. 565 } 566 567 public void afterRemove(ChannelHandlerContext ctx) throws Exception { 568 // Nothing to do.. 569 } 570 571 }