View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.util.ReferenceCountUtil;
27  
28  import java.util.List;
29  
30  /**
31   * An abstract {@link ChannelHandler} that aggregates a series of message objects into a single aggregated message.
32   * <p>
33   * 'A series of messages' is composed of the following:
34   * <ul>
35   * <li>a single start message which optionally contains the first part of the content, and</li>
36   * <li>1 or more content messages.</li>
37   * </ul>
38   * The content of the aggregated message will be the merged content of the start message and its following content
39   * messages. If this aggregator encounters a content message where {@link #isLastContentMessage(ByteBufHolder)}
40   * return {@code true} for, the aggregator will finish the aggregation and produce the aggregated message and expect
41   * another start message.
42   * </p>
43   *
44   * @param <I> the type that covers both start message and content message
45   * @param <S> the type of the start message
46   * @param <C> the type of the content message (must be a subtype of {@link ByteBufHolder})
47   * @param <O> the type of the aggregated message (must be a subtype of {@code S} and {@link ByteBufHolder})
48   */
49  public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends ByteBufHolder>
50          extends MessageToMessageDecoder<I> {
51  
52      private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
53  
54      private final int maxContentLength;
55      private O currentMessage;
56      private boolean handlingOversizedMessage;
57  
58      private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
59      private ChannelHandlerContext ctx;
60      private ChannelFutureListener continueResponseWriteListener;
61  
62      /**
63       * Creates a new instance.
64       *
65       * @param maxContentLength
66       *        the maximum length of the aggregated content.
67       *        If the length of the aggregated content exceeds this value,
68       *        {@link #handleOversizedMessage(ChannelHandlerContext, Object)} will be called.
69       */
70      protected MessageAggregator(int maxContentLength) {
71          validateMaxContentLength(maxContentLength);
72          this.maxContentLength = maxContentLength;
73      }
74  
75      protected MessageAggregator(int maxContentLength, Class<? extends I> inboundMessageType) {
76          super(inboundMessageType);
77          validateMaxContentLength(maxContentLength);
78          this.maxContentLength = maxContentLength;
79      }
80  
81      private static void validateMaxContentLength(int maxContentLength) {
82          if (maxContentLength <= 0) {
83              throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength);
84          }
85      }
86  
87      @Override
88      public boolean acceptInboundMessage(Object msg) throws Exception {
89          // No need to match last and full types because they are subset of first and middle types.
90          if (!super.acceptInboundMessage(msg)) {
91              return false;
92          }
93  
94          @SuppressWarnings("unchecked")
95          I in = (I) msg;
96  
97          return (isContentMessage(in) || isStartMessage(in)) && !isAggregated(in);
98      }
99  
100     /**
101      * Returns {@code true} if and only if the specified message is a start message. Typically, this method is
102      * implemented as a single {@code return} statement with {@code instanceof}:
103      * <pre>
104      * return msg instanceof MyStartMessage;
105      * </pre>
106      */
107     protected abstract boolean isStartMessage(I msg) throws Exception;
108 
109     /**
110      * Returns {@code true} if and only if the specified message is a content message. Typically, this method is
111      * implemented as a single {@code return} statement with {@code instanceof}:
112      * <pre>
113      * return msg instanceof MyContentMessage;
114      * </pre>
115      */
116     protected abstract boolean isContentMessage(I msg) throws Exception;
117 
118     /**
119      * Returns {@code true} if and only if the specified message is the last content message. Typically, this method is
120      * implemented as a single {@code return} statement with {@code instanceof}:
121      * <pre>
122      * return msg instanceof MyLastContentMessage;
123      * </pre>
124      * or with {@code instanceof} and boolean field check:
125      * <pre>
126      * return msg instanceof MyContentMessage && msg.isLastFragment();
127      * </pre>
128      */
129     protected abstract boolean isLastContentMessage(C msg) throws Exception;
130 
131     /**
132      * Returns {@code true} if and only if the specified message is already aggregated.  If this method returns
133      * {@code true}, this handler will simply forward the message to the next handler as-is.
134      */
135     protected abstract boolean isAggregated(I msg) throws Exception;
136 
137     /**
138      * Returns the maximum allowed length of the aggregated message.
139      */
140     public final int maxContentLength() {
141         return maxContentLength;
142     }
143 
144     /**
145      * Returns the maximum number of components in the cumulation buffer.  If the number of
146      * the components in the cumulation buffer exceeds this value, the components of the
147      * cumulation buffer are consolidated into a single component, involving memory copies.
148      * The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
149      */
150     public final int maxCumulationBufferComponents() {
151         return maxCumulationBufferComponents;
152     }
153 
154     /**
155      * Sets the maximum number of components in the cumulation buffer.  If the number of
156      * the components in the cumulation buffer exceeds this value, the components of the
157      * cumulation buffer are consolidated into a single component, involving memory copies.
158      * The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
159      * and its minimum allowed value is {@code 2}.
160      */
161     public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
162         if (maxCumulationBufferComponents < 2) {
163             throw new IllegalArgumentException(
164                     "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
165                     " (expected: >= 2)");
166         }
167 
168         if (ctx == null) {
169             this.maxCumulationBufferComponents = maxCumulationBufferComponents;
170         } else {
171             throw new IllegalStateException(
172                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
173         }
174     }
175 
176     public final boolean isHandlingOversizedMessage() {
177         return handlingOversizedMessage;
178     }
179 
180     protected final ChannelHandlerContext ctx() {
181         if (ctx == null) {
182             throw new IllegalStateException("not added to a pipeline yet");
183         }
184         return ctx;
185     }
186 
187     @Override
188     protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
189         O currentMessage = this.currentMessage;
190 
191         if (isStartMessage(msg)) {
192             handlingOversizedMessage = false;
193             if (currentMessage != null) {
194                 throw new MessageAggregationException();
195             }
196 
197             @SuppressWarnings("unchecked")
198             S m = (S) msg;
199 
200             // if content length is set, preemptively close if it's too large
201             if (hasContentLength(m)) {
202                 if (contentLength(m) > maxContentLength) {
203                     // handle oversized message
204                     invokeHandleOversizedMessage(ctx, m);
205                     return;
206                 }
207             }
208 
209             // Send the continue response if necessary (e.g. 'Expect: 100-continue' header)
210             Object continueResponse = newContinueResponse(m);
211             if (continueResponse != null) {
212                 // Cache the write listener for reuse.
213                 ChannelFutureListener listener = continueResponseWriteListener;
214                 if (listener == null) {
215                     continueResponseWriteListener = listener = new ChannelFutureListener() {
216                         @Override
217                         public void operationComplete(ChannelFuture future) throws Exception {
218                             if (!future.isSuccess()) {
219                                 ctx.fireExceptionCaught(future.cause());
220                             }
221                         }
222                     };
223                 }
224                 ctx.writeAndFlush(continueResponse).addListener(listener);
225             }
226 
227             if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {
228                 O aggregated;
229                 if (m instanceof ByteBufHolder && ((ByteBufHolder) m).content().isReadable()) {
230                     aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
231                 } else {
232                     aggregated = beginAggregation(m, Unpooled.EMPTY_BUFFER);
233                 }
234                 finishAggregation(aggregated);
235                 out.add(aggregated);
236                 this.currentMessage = null;
237                 return;
238             }
239 
240             // A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
241             CompositeByteBuf content = ctx.alloc().compositeBuffer(maxCumulationBufferComponents);
242             if (m instanceof ByteBufHolder) {
243                 appendPartialContent(content, ((ByteBufHolder) m).content());
244             }
245             this.currentMessage = beginAggregation(m, content);
246 
247         } else if (isContentMessage(msg)) {
248             @SuppressWarnings("unchecked")
249             final C m = (C) msg;
250             final ByteBuf partialContent = ((ByteBufHolder) msg).content();
251             final boolean isLastContentMessage = isLastContentMessage(m);
252             if (handlingOversizedMessage) {
253                 if (isLastContentMessage) {
254                     this.currentMessage = null;
255                 }
256                 // already detect the too long frame so just discard the content
257                 return;
258             }
259 
260             if (currentMessage == null) {
261                 throw new MessageAggregationException();
262             }
263 
264             // Merge the received chunk into the content of the current message.
265             CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
266 
267             // Handle oversized message.
268             if (content.readableBytes() > maxContentLength - partialContent.readableBytes()) {
269                 // By convention, full message type extends first message type.
270                 @SuppressWarnings("unchecked")
271                 S s = (S) currentMessage;
272                 invokeHandleOversizedMessage(ctx, s);
273                 return;
274             }
275 
276             // Append the content of the chunk.
277             appendPartialContent(content, partialContent);
278 
279             // Give the subtypes a chance to merge additional information such as trailing headers.
280             aggregate(currentMessage, m);
281 
282             final boolean last;
283             if (m instanceof DecoderResultProvider) {
284                 DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult();
285                 if (!decoderResult.isSuccess()) {
286                     if (currentMessage instanceof DecoderResultProvider) {
287                         ((DecoderResultProvider) currentMessage).setDecoderResult(
288                                 DecoderResult.failure(decoderResult.cause()));
289                     }
290                     last = true;
291                 } else {
292                     last = isLastContentMessage;
293                 }
294             } else {
295                 last = isLastContentMessage;
296             }
297 
298             if (last) {
299                 finishAggregation(currentMessage);
300 
301                 // All done
302                 out.add(currentMessage);
303                 this.currentMessage = null;
304             }
305         } else {
306             throw new MessageAggregationException();
307         }
308     }
309 
310     private static void appendPartialContent(CompositeByteBuf content, ByteBuf partialContent) {
311         if (partialContent.isReadable()) {
312             partialContent.retain();
313             content.addComponent(partialContent);
314             content.writerIndex(content.writerIndex() + partialContent.readableBytes());
315         }
316     }
317 
318     /**
319      * Returns {@code true} if and only if the specified start message already contains the information about the
320      * length of the whole content.
321      */
322     protected abstract boolean hasContentLength(S start) throws Exception;
323 
324     /**
325      * Retrieves the length of the whole content from the specified start message. This method is invoked only when
326      * {@link #hasContentLength(Object)} returned {@code true}.
327      */
328     protected abstract long contentLength(S start) throws Exception;
329 
330     /**
331      * Returns the 'continue response' for the specified start message if necessary. For example, this method is
332      * useful to handle an HTTP 100-continue header.
333      *
334      * @return the 'continue response', or {@code null} if there's no message to send
335      */
336     protected abstract Object newContinueResponse(S start) throws Exception;
337 
338     /**
339      * Creates a new aggregated message from the specified start message and the specified content.  If the start
340      * message implements {@link ByteBufHolder}, its content is appended to the specified {@code content}.
341      * This aggregator will continue to append the received content to the specified {@code content}.
342      */
343     protected abstract O beginAggregation(S start, ByteBuf content) throws Exception;
344 
345     /**
346      * Transfers the information provided by the specified content message to the specified aggregated message.
347      * Note that the content of the specified content message has been appended to the content of the specified
348      * aggregated message already, so that you don't need to.  Use this method to transfer the additional information
349      * that the content message provides to {@code aggregated}.
350      */
351     protected void aggregate(O aggregated, C content) throws Exception { }
352 
353     /**
354      * Invoked when the specified {@code aggregated} message is about to be passed to the next handler in the pipeline.
355      */
356     protected void finishAggregation(O aggregated) throws Exception { }
357 
358     private void invokeHandleOversizedMessage(ChannelHandlerContext ctx, S oversized) throws Exception {
359         handlingOversizedMessage = true;
360         currentMessage = null;
361         try {
362             handleOversizedMessage(ctx, oversized);
363         } finally {
364             // Release the message in case it is a full one.
365             ReferenceCountUtil.release(oversized);
366         }
367     }
368 
369     /**
370      * Invoked when an incoming request exceeds the maximum content length.  The default behvaior is to trigger an
371      * {@code exceptionCaught()} event with a {@link TooLongFrameException}.
372      *
373      * @param ctx the {@link ChannelHandlerContext}
374      * @param oversized the accumulated message up to this point, whose type is {@code S} or {@code O}
375      */
376     protected void handleOversizedMessage(ChannelHandlerContext ctx, S oversized) throws Exception {
377         ctx.fireExceptionCaught(
378                 new TooLongFrameException("content length exceeded " + maxContentLength() + " bytes."));
379     }
380 
381     @Override
382     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
383         // release current message if it is not null as it may be a left-over
384         if (currentMessage != null) {
385             currentMessage.release();
386             currentMessage = null;
387         }
388 
389         super.channelInactive(ctx);
390     }
391 
392     @Override
393     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
394         this.ctx = ctx;
395     }
396 
397     @Override
398     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
399         super.handlerRemoved(ctx);
400 
401         // release current message if it is not null as it may be a left-over as there is not much more we can do in
402         // this case
403         if (currentMessage != null) {
404             currentMessage.release();
405             currentMessage = null;
406         }
407     }
408 }