View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.ReferenceCounted;
24  
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.List;
28  
29  import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
30  import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
31  import static io.netty.util.AsciiString.containsAllContentEqualsIgnoreCase;
32  import static io.netty.util.AsciiString.containsContentEqualsIgnoreCase;
33  import static io.netty.util.internal.ObjectUtil.checkNotNull;
34  import static io.netty.util.internal.StringUtil.COMMA;
35  
36  /**
37   * A server-side handler that receives HTTP requests and optionally performs a protocol switch if
38   * the requested protocol is supported. Once an upgrade is performed, this handler removes itself
39   * from the pipeline.
40   */
41  public class HttpServerUpgradeHandler extends HttpObjectAggregator {
42  
43      /**
44       * The source codec that is used in the pipeline initially.
45       */
46      public interface SourceCodec {
47          /**
48           * Removes this codec (i.e. all associated handlers) from the pipeline.
49           */
50          void upgradeFrom(ChannelHandlerContext ctx);
51      }
52  
53      /**
54       * A codec that the source can be upgraded to.
55       */
56      public interface UpgradeCodec {
57          /**
58           * Gets all protocol-specific headers required by this protocol for a successful upgrade.
59           * Any supplied header will be required to appear in the {@link HttpHeaderNames#CONNECTION} header as well.
60           */
61          Collection<CharSequence> requiredUpgradeHeaders();
62  
63          /**
64           * Prepares the {@code upgradeHeaders} for a protocol update based upon the contents of {@code upgradeRequest}.
65           * This method returns a boolean value to proceed or abort the upgrade in progress. If {@code false} is
66           * returned, the upgrade is aborted and the {@code upgradeRequest} will be passed through the inbound pipeline
67           * as if no upgrade was performed. If {@code true} is returned, the upgrade will proceed to the next
68           * step which invokes {@link #upgradeTo}. When returning {@code true}, you can add headers to
69           * the {@code upgradeHeaders} so that they are added to the 101 Switching protocols response.
70           */
71          boolean prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
72                                      HttpHeaders upgradeHeaders);
73  
74          /**
75           * Performs an HTTP protocol upgrade from the source codec. This method is responsible for
76           * adding all handlers required for the new protocol.
77           *
78           * @param ctx the context for the current handler.
79           * @param upgradeRequest the request that triggered the upgrade to this protocol.
80           */
81          void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest);
82      }
83  
84      /**
85       * Creates a new {@link UpgradeCodec} for the requested protocol name.
86       */
87      public interface UpgradeCodecFactory {
88          /**
89           * Invoked by {@link HttpServerUpgradeHandler} for all the requested protocol names in the order of
90           * the client preference. The first non-{@code null} {@link UpgradeCodec} returned by this method
91           * will be selected.
92           *
93           * @return a new {@link UpgradeCodec}, or {@code null} if the specified protocol name is not supported
94           */
95          UpgradeCodec newUpgradeCodec(CharSequence protocol);
96      }
97  
98      /**
99       * User event that is fired to notify about the completion of an HTTP upgrade
100      * to another protocol. Contains the original upgrade request so that the response
101      * (if required) can be sent using the new protocol.
102      */
103     public static final class UpgradeEvent implements ReferenceCounted {
104         private final CharSequence protocol;
105         private final FullHttpRequest upgradeRequest;
106 
107         UpgradeEvent(CharSequence protocol, FullHttpRequest upgradeRequest) {
108             this.protocol = protocol;
109             this.upgradeRequest = upgradeRequest;
110         }
111 
112         /**
113          * The protocol that the channel has been upgraded to.
114          */
115         public CharSequence protocol() {
116             return protocol;
117         }
118 
119         /**
120          * Gets the request that triggered the protocol upgrade.
121          */
122         public FullHttpRequest upgradeRequest() {
123             return upgradeRequest;
124         }
125 
126         @Override
127         public int refCnt() {
128             return upgradeRequest.refCnt();
129         }
130 
131         @Override
132         public UpgradeEvent retain() {
133             upgradeRequest.retain();
134             return this;
135         }
136 
137         @Override
138         public UpgradeEvent retain(int increment) {
139             upgradeRequest.retain(increment);
140             return this;
141         }
142 
143         @Override
144         public UpgradeEvent touch() {
145             upgradeRequest.touch();
146             return this;
147         }
148 
149         @Override
150         public UpgradeEvent touch(Object hint) {
151             upgradeRequest.touch(hint);
152             return this;
153         }
154 
155         @Override
156         public boolean release() {
157             return upgradeRequest.release();
158         }
159 
160         @Override
161         public boolean release(int decrement) {
162             return upgradeRequest.release(decrement);
163         }
164 
165         @Override
166         public String toString() {
167             return "UpgradeEvent [protocol=" + protocol + ", upgradeRequest=" + upgradeRequest + ']';
168         }
169     }
170 
    // Codec in place before the upgrade; upgradeFrom(ctx) is invoked on it when an upgrade succeeds.
    private final SourceCodec sourceCodec;
    // Asked for an UpgradeCodec for each protocol listed in the request's Upgrade header.
    private final UpgradeCodecFactory upgradeCodecFactory;
    // Factories handed to the 101 Switching Protocols response for its headers and trailers.
    private final HttpHeadersFactory headersFactory;
    private final HttpHeadersFactory trailersFactory;
    // When true, the handler removes itself after the first request that did not result in an upgrade.
    private final boolean removeAfterFirstRequest;
    // Set once a request with an Upgrade header has been accepted for handling by decode(...).
    private boolean handlingUpgrade;
    // Set to true when an upgrade request is detected and cleared by beginAggregation(...); decode(...)
    // reads it to notice that aggregation of the request never started.
    private boolean failedAggregationStart;
178 
179     /**
180      * Constructs the upgrader with the supported codecs.
181      * <p>
182      * The handler instantiated by this constructor will reject an upgrade request with non-empty content.
183      * It should not be a concern because an upgrade request is most likely a GET request.
184      * If you have a client that sends a non-GET upgrade request, please consider using
185      * {@link #HttpServerUpgradeHandler(SourceCodec, UpgradeCodecFactory, int)} to specify the maximum
186      * length of the content of an upgrade request.
187      * </p>
188      *
189      * @param sourceCodec the codec that is being used initially
190      * @param upgradeCodecFactory the factory that creates a new upgrade codec
191      *                            for one of the requested upgrade protocols
192      */
193     public HttpServerUpgradeHandler(SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory) {
194         this(sourceCodec, upgradeCodecFactory, 0,
195                 DefaultHttpHeadersFactory.headersFactory(), DefaultHttpHeadersFactory.trailersFactory());
196     }
197 
198     /**
199      * Constructs the upgrader with the supported codecs.
200      *
201      * @param sourceCodec the codec that is being used initially
202      * @param upgradeCodecFactory the factory that creates a new upgrade codec
203      *                            for one of the requested upgrade protocols
204      * @param maxContentLength the maximum length of the content of an upgrade request
205      */
206     public HttpServerUpgradeHandler(
207             SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory, int maxContentLength) {
208         this(sourceCodec, upgradeCodecFactory, maxContentLength,
209                 DefaultHttpHeadersFactory.headersFactory(), DefaultHttpHeadersFactory.trailersFactory());
210     }
211 
212     /**
213      * Constructs the upgrader with the supported codecs.
214      *
215      * @param sourceCodec the codec that is being used initially
216      * @param upgradeCodecFactory the factory that creates a new upgrade codec
217      *                            for one of the requested upgrade protocols
218      * @param maxContentLength the maximum length of the content of an upgrade request
219      * @param validateHeaders validate the header names and values of the upgrade response.
220      */
221     public HttpServerUpgradeHandler(SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory,
222                                     int maxContentLength, boolean validateHeaders) {
223         this(sourceCodec, upgradeCodecFactory, maxContentLength,
224                 DefaultHttpHeadersFactory.headersFactory().withValidation(validateHeaders),
225                 DefaultHttpHeadersFactory.trailersFactory().withValidation(validateHeaders));
226     }
227 
228     /**
229      * Constructs the upgrader with the supported codecs.
230      *
231      * @param sourceCodec the codec that is being used initially
232      * @param upgradeCodecFactory the factory that creates a new upgrade codec
233      *                            for one of the requested upgrade protocols
234      * @param maxContentLength the maximum length of the content of an upgrade request
235      * @param headersFactory The {@link HttpHeadersFactory} to use for headers.
236      * The recommended default factory is {@link DefaultHttpHeadersFactory#headersFactory()}.
237      * @param trailersFactory The {@link HttpHeadersFactory} to use for trailers.
238      * The recommended default factory is {@link DefaultHttpHeadersFactory#trailersFactory()}.
239      */
240     public HttpServerUpgradeHandler(
241             SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory, int maxContentLength,
242             HttpHeadersFactory headersFactory, HttpHeadersFactory trailersFactory) {
243         this(sourceCodec, upgradeCodecFactory, maxContentLength, headersFactory, trailersFactory, false);
244     }
245 
246     /**
247      * Constructs the upgrader with the supported codecs.
248      *
249      * @param sourceCodec the codec that is being used initially
250      * @param upgradeCodecFactory the factory that creates a new upgrade codec
251      *                            for one of the requested upgrade protocols
252      * @param maxContentLength the maximum length of the content of an upgrade request
253      * @param headersFactory The {@link HttpHeadersFactory} to use for headers.
254      * The recommended default factory is {@link DefaultHttpHeadersFactory#headersFactory()}.
255      * @param trailersFactory The {@link HttpHeadersFactory} to use for trailers.
256      * The recommended default factory is {@link DefaultHttpHeadersFactory#trailersFactory()}.
257      * @param removeAfterFirstRequest {@code true} if the handler should remove itself after the first request was
258      *                                processed (even if it was not an upgrade request), {@code false} otherwise.
259      */
260     public HttpServerUpgradeHandler(
261             SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory, int maxContentLength,
262             HttpHeadersFactory headersFactory, HttpHeadersFactory trailersFactory, boolean removeAfterFirstRequest) {
263         super(maxContentLength);
264 
265         this.sourceCodec = checkNotNull(sourceCodec, "sourceCodec");
266         this.upgradeCodecFactory = checkNotNull(upgradeCodecFactory, "upgradeCodecFactory");
267         this.headersFactory = checkNotNull(headersFactory, "headersFactory");
268         this.trailersFactory = checkNotNull(trailersFactory, "trailersFactory");
269         this.removeAfterFirstRequest = removeAfterFirstRequest;
270     }
271 
272     @Override
273     protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
274             throws Exception {
275 
276         if (!handlingUpgrade) {
277             // Not handling an upgrade request yet. Check if we received a new upgrade request.
278             if (msg instanceof HttpRequest) {
279                 HttpRequest req = (HttpRequest) msg;
280                 if (req.headers().contains(HttpHeaderNames.UPGRADE) &&
281                     shouldHandleUpgradeRequest(req)) {
282                     handlingUpgrade = true;
283                     failedAggregationStart = true; // reset if beginAggregation is called
284                 } else {
285                     if (removeAfterFirstRequest) {
286                         // This is not an upgrade request, just remove this handler.
287                         ctx.pipeline().remove(this);
288                     }
289                     ReferenceCountUtil.retain(msg);
290                     ctx.fireChannelRead(msg);
291                     return;
292                 }
293             } else {
294                 ReferenceCountUtil.retain(msg);
295                 ctx.fireChannelRead(msg);
296                 return;
297             }
298         }
299 
300         FullHttpRequest fullRequest;
301         if (msg instanceof FullHttpRequest) {
302             fullRequest = (FullHttpRequest) msg;
303             ReferenceCountUtil.retain(msg);
304             out.add(msg);
305         } else {
306             // Call the base class to handle the aggregation of the full request.
307             super.decode(ctx, msg, out);
308             if (out.isEmpty()) {
309                 if (msg instanceof LastHttpContent || failedAggregationStart) {
310                     // request failed to aggregate, try with the next request
311                     handlingUpgrade = false;
312                     releaseCurrentMessage();
313                 }
314 
315                 // The full request hasn't been created yet, still awaiting more data.
316                 return;
317             }
318 
319             // Finished aggregating the full request, get it from the output list.
320             assert out.size() == 1;
321             handlingUpgrade = false;
322             fullRequest = (FullHttpRequest) out.get(0);
323         }
324 
325         if (upgrade(ctx, fullRequest)) {
326             // The upgrade was successful, remove the message from the output list
327             // so that it's not propagated to the next handler. This request will
328             // be propagated as a user event instead.
329             out.clear();
330         } else if (removeAfterFirstRequest) {
331             // We handle the first request and were not able to upgrade, just remove this handler.
332             ctx.pipeline().remove(this);
333         }
334 
335         // The upgrade did not succeed, just allow the full request to propagate to the
336         // next handler.
337     }
338 
339     @Override
340     protected FullHttpMessage beginAggregation(HttpMessage start, ByteBuf content) throws Exception {
341         failedAggregationStart = false;
342         return super.beginAggregation(start, content);
343     }
344 
345     /**
346      * Determines whether the specified upgrade {@link HttpRequest} should be handled by this handler or not.
347      * This method will be invoked only when the request contains an {@code Upgrade} header.
348      * It always returns {@code true} by default, which means any request with an {@code Upgrade} header
349      * will be handled. You can override this method to ignore certain {@code Upgrade} headers, for example:
350      * <pre>{@code
351      * @Override
352      * protected boolean isUpgradeRequest(HttpRequest req) {
353      *   // Do not handle WebSocket upgrades.
354      *   return !req.headers().contains(HttpHeaderNames.UPGRADE, "websocket", false);
355      * }
356      * }</pre>
357      */
358     protected boolean shouldHandleUpgradeRequest(HttpRequest req) {
359         return true;
360     }
361 
362     /**
363      * Attempts to upgrade to the protocol(s) identified by the {@link HttpHeaderNames#UPGRADE} header (if provided
364      * in the request).
365      *
366      * @param ctx the context for this handler.
367      * @param request the HTTP request.
368      * @return {@code true} if the upgrade occurred, otherwise {@code false}.
369      */
370     private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) {
371         // Select the best protocol based on those requested in the UPGRADE header.
372         final List<CharSequence> requestedProtocols = splitHeader(request.headers().get(HttpHeaderNames.UPGRADE));
373         final int numRequestedProtocols = requestedProtocols.size();
374         UpgradeCodec upgradeCodec = null;
375         CharSequence upgradeProtocol = null;
376         for (int i = 0; i < numRequestedProtocols; i ++) {
377             final CharSequence p = requestedProtocols.get(i);
378             final UpgradeCodec c = upgradeCodecFactory.newUpgradeCodec(p);
379             if (c != null) {
380                 upgradeProtocol = p;
381                 upgradeCodec = c;
382                 break;
383             }
384         }
385 
386         if (upgradeCodec == null) {
387             // None of the requested protocols are supported, don't upgrade.
388             return false;
389         }
390 
391         // Make sure the CONNECTION header is present.
392         List<String> connectionHeaderValues = request.headers().getAll(HttpHeaderNames.CONNECTION);
393 
394         if (connectionHeaderValues == null || connectionHeaderValues.isEmpty()) {
395             return false;
396         }
397 
398         final StringBuilder concatenatedConnectionValue = new StringBuilder(connectionHeaderValues.size() * 10);
399         for (CharSequence connectionHeaderValue : connectionHeaderValues) {
400             concatenatedConnectionValue.append(connectionHeaderValue).append(COMMA);
401         }
402         concatenatedConnectionValue.setLength(concatenatedConnectionValue.length() - 1);
403 
404         // Make sure the CONNECTION header contains UPGRADE as well as all protocol-specific headers.
405         Collection<CharSequence> requiredHeaders = upgradeCodec.requiredUpgradeHeaders();
406         List<CharSequence> values = splitHeader(concatenatedConnectionValue);
407         if (!containsContentEqualsIgnoreCase(values, HttpHeaderNames.UPGRADE) ||
408                 !containsAllContentEqualsIgnoreCase(values, requiredHeaders)) {
409             return false;
410         }
411 
412         // Ensure that all required protocol-specific headers are found in the request.
413         for (CharSequence requiredHeader : requiredHeaders) {
414             if (!request.headers().contains(requiredHeader)) {
415                 return false;
416             }
417         }
418 
419         // Prepare and send the upgrade response. Wait for this write to complete before upgrading,
420         // since we need the old codec in-place to properly encode the response.
421         final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeProtocol);
422         if (!upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse.headers())) {
423             return false;
424         }
425 
426         // Create the user event to be fired once the upgrade completes.
427         final UpgradeEvent event = new UpgradeEvent(upgradeProtocol, request);
428 
429         // After writing the upgrade response we immediately prepare the
430         // pipeline for the next protocol to avoid a race between completion
431         // of the write future and receiving data before the pipeline is
432         // restructured.
433         try {
434             final ChannelFuture writeComplete = ctx.writeAndFlush(upgradeResponse);
435             // Perform the upgrade to the new protocol.
436             sourceCodec.upgradeFrom(ctx);
437             upgradeCodec.upgradeTo(ctx, request);
438 
439             // Remove this handler from the pipeline.
440             ctx.pipeline().remove(HttpServerUpgradeHandler.this);
441 
442             // Notify that the upgrade has occurred. Retain the event to offset
443             // the release() in the finally block.
444             ctx.fireUserEventTriggered(event.retain());
445 
446             // Add the listener last to avoid firing upgrade logic after
447             // the channel is already closed since the listener may fire
448             // immediately if the write failed eagerly.
449             writeComplete.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
450         } finally {
451             // Release the event if the upgrade event wasn't fired.
452             event.release();
453         }
454         return true;
455     }
456 
457     /**
458      * Creates the 101 Switching Protocols response message.
459      */
460     private FullHttpResponse createUpgradeResponse(CharSequence upgradeProtocol) {
461         DefaultFullHttpResponse res = new DefaultFullHttpResponse(
462                 HTTP_1_1, SWITCHING_PROTOCOLS, Unpooled.EMPTY_BUFFER, headersFactory, trailersFactory);
463         res.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE);
464         res.headers().add(HttpHeaderNames.UPGRADE, upgradeProtocol);
465         return res;
466     }
467 
468     /**
469      * Splits a comma-separated header value. The returned set is case-insensitive and contains each
470      * part with whitespace removed.
471      */
472     private static List<CharSequence> splitHeader(CharSequence header) {
473         final StringBuilder builder = new StringBuilder(header.length());
474         final List<CharSequence> protocols = new ArrayList<CharSequence>(4);
475         for (int i = 0; i < header.length(); ++i) {
476             char c = header.charAt(i);
477             if (Character.isWhitespace(c)) {
478                 // Don't include any whitespace.
479                 continue;
480             }
481             if (c == ',') {
482                 // Add the string and reset the builder for the next protocol.
483                 protocols.add(builder.toString());
484                 builder.setLength(0);
485             } else {
486                 builder.append(c);
487             }
488         }
489 
490         // Add the last protocol
491         if (builder.length() > 0) {
492             protocols.add(builder.toString());
493         }
494 
495         return protocols;
496     }
497 }