View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http;
16  
17  import io.netty.channel.ChannelFuture;
18  import io.netty.channel.ChannelFutureListener;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.AsciiString;
21  import io.netty.util.ReferenceCountUtil;
22  import io.netty.util.ReferenceCounted;
23  
24  import java.util.Collection;
25  import java.util.LinkedHashMap;
26  import java.util.LinkedHashSet;
27  import java.util.List;
28  import java.util.Locale;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TreeSet;
32  
33  import static io.netty.handler.codec.http.HttpResponseStatus.*;
34  import static io.netty.handler.codec.http.HttpVersion.*;
35  
36  /**
37   * A server-side handler that receives HTTP requests and optionally performs a protocol switch if
38   * the requested protocol is supported. Once an upgrade is performed, this handler removes itself
39   * from the pipeline.
40   */
41  public class HttpServerUpgradeHandler extends HttpObjectAggregator {
42  
43      /**
44       * The source codec that is used in the pipeline initially.
45       */
46      public interface SourceCodec {
47          /**
48           * Removes this codec (i.e. all associated handlers) from the pipeline.
49           */
50          void upgradeFrom(ChannelHandlerContext ctx);
51      }
52  
53      /**
54       * A codec that the source can be upgraded to.
55       */
56      public interface UpgradeCodec {
57          /**
58           * Returns the name of the protocol supported by this codec, as indicated by the
59           * {@link HttpHeaderNames#UPGRADE} header.
60           */
61          String protocol();
62  
63          /**
64           * Gets all protocol-specific headers required by this protocol for a successful upgrade.
65           * Any supplied header will be required to appear in the {@link HttpHeaderNames#CONNECTION} header as well.
66           */
67          Collection<String> requiredUpgradeHeaders();
68  
69          /**
70           * Adds any headers to the 101 Switching protocols response that are appropriate for this protocol.
71           */
72          void prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
73                  FullHttpResponse upgradeResponse);
74  
75          /**
76           * Performs an HTTP protocol upgrade from the source codec. This method is responsible for
77           * adding all handlers required for the new protocol.
78           *
79           * @param ctx the context for the current handler.
80           * @param upgradeRequest the request that triggered the upgrade to this protocol. The
81           *            upgraded protocol is responsible for sending the response.
82           * @param upgradeResponse a 101 Switching Protocols response that is populated with the
83           *            {@link HttpHeaderNames#CONNECTION} and {@link HttpHeaderNames#UPGRADE} headers.
84           *            The protocol is required to send this before sending any other frames back to the client.
85           *            The headers may be augmented as necessary by the protocol before sending.
86           */
87          void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest, FullHttpResponse upgradeResponse);
88      }
89  
90      /**
91       * User event that is fired to notify about the completion of an HTTP upgrade
92       * to another protocol. Contains the original upgrade request so that the response
93       * (if required) can be sent using the new protocol.
94       */
95      public static final class UpgradeEvent implements ReferenceCounted {
96          private final String protocol;
97          private final FullHttpRequest upgradeRequest;
98  
99          private UpgradeEvent(String protocol, FullHttpRequest upgradeRequest) {
100             this.protocol = protocol;
101             this.upgradeRequest = upgradeRequest;
102         }
103 
104         /**
105          * The protocol that the channel has been upgraded to.
106          */
107         public String protocol() {
108             return protocol;
109         }
110 
111         /**
112          * Gets the request that triggered the protocol upgrade.
113          */
114         public FullHttpRequest upgradeRequest() {
115             return upgradeRequest;
116         }
117 
118         @Override
119         public int refCnt() {
120             return upgradeRequest.refCnt();
121         }
122 
123         @Override
124         public UpgradeEvent retain() {
125             upgradeRequest.retain();
126             return this;
127         }
128 
129         @Override
130         public UpgradeEvent retain(int increment) {
131             upgradeRequest.retain(increment);
132             return this;
133         }
134 
135         @Override
136         public UpgradeEvent touch() {
137             upgradeRequest.touch();
138             return this;
139         }
140 
141         @Override
142         public UpgradeEvent touch(Object hint) {
143             upgradeRequest.touch(hint);
144             return this;
145         }
146 
147         @Override
148         public boolean release() {
149             return upgradeRequest.release();
150         }
151 
152         @Override
153         public boolean release(int decrement) {
154             return upgradeRequest.release();
155         }
156 
157         @Override
158         public String toString() {
159             return "UpgradeEvent [protocol=" + protocol + ", upgradeRequest=" + upgradeRequest + ']';
160         }
161     }
162 
163     private final Map<String, UpgradeCodec> upgradeCodecMap;
164     private final SourceCodec sourceCodec;
165     private boolean handlingUpgrade;
166 
167     /**
168      * Constructs the upgrader with the supported codecs.
169      *
170      * @param sourceCodec the codec that is being used initially.
171      * @param upgradeCodecs the codecs (in order of preference) that this server supports
172      *            upgrading to from the source codec.
173      * @param maxContentLength the maximum length of the aggregated content.
174      */
175     public HttpServerUpgradeHandler(SourceCodec sourceCodec,
176             Collection<UpgradeCodec> upgradeCodecs, int maxContentLength) {
177         super(maxContentLength);
178         if (sourceCodec == null) {
179             throw new NullPointerException("sourceCodec");
180         }
181         if (upgradeCodecs == null) {
182             throw new NullPointerException("upgradeCodecs");
183         }
184         this.sourceCodec = sourceCodec;
185         upgradeCodecMap = new LinkedHashMap<String, UpgradeCodec>(upgradeCodecs.size());
186         for (UpgradeCodec upgradeCodec : upgradeCodecs) {
187             String name = upgradeCodec.protocol().toUpperCase(Locale.US);
188             upgradeCodecMap.put(name, upgradeCodec);
189         }
190     }
191 
192     @Override
193     protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
194             throws Exception {
195         // Determine if we're already handling an upgrade request or just starting a new one.
196         handlingUpgrade |= isUpgradeRequest(msg);
197         if (!handlingUpgrade) {
198             // Not handling an upgrade request, just pass it to the next handler.
199             ReferenceCountUtil.retain(msg);
200             out.add(msg);
201             return;
202         }
203 
204         FullHttpRequest fullRequest;
205         if (msg instanceof FullHttpRequest) {
206             fullRequest = (FullHttpRequest) msg;
207             ReferenceCountUtil.retain(msg);
208             out.add(msg);
209         } else {
210             // Call the base class to handle the aggregation of the full request.
211             super.decode(ctx, msg, out);
212             if (out.isEmpty()) {
213                 // The full request hasn't been created yet, still awaiting more data.
214                 return;
215             }
216 
217             // Finished aggregating the full request, get it from the output list.
218             assert out.size() == 1;
219             handlingUpgrade = false;
220             fullRequest = (FullHttpRequest) out.get(0);
221         }
222 
223         if (upgrade(ctx, fullRequest)) {
224             // The upgrade was successful, remove the message from the output list
225             // so that it's not propagated to the next handler. This request will
226             // be propagated as a user event instead.
227             out.clear();
228         }
229 
230         // The upgrade did not succeed, just allow the full request to propagate to the
231         // next handler.
232     }
233 
234     /**
235      * Determines whether or not the message is an HTTP upgrade request.
236      */
237     private static boolean isUpgradeRequest(HttpObject msg) {
238         return msg instanceof HttpRequest && ((HttpRequest) msg).headers().get(HttpHeaderNames.UPGRADE) != null;
239     }
240 
241     /**
242      * Attempts to upgrade to the protocol(s) identified by the {@link HttpHeaderNames#UPGRADE} header (if provided
243      * in the request).
244      *
245      * @param ctx the context for this handler.
246      * @param request the HTTP request.
247      * @return {@code true} if the upgrade occurred, otherwise {@code false}.
248      */
249     private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) {
250         // Select the best protocol based on those requested in the UPGRADE header.
251         CharSequence upgradeHeader = request.headers().get(HttpHeaderNames.UPGRADE);
252         final UpgradeCodec upgradeCodec = selectUpgradeCodec(upgradeHeader);
253         if (upgradeCodec == null) {
254             // None of the requested protocols are supported, don't upgrade.
255             return false;
256         }
257 
258         // Make sure the CONNECTION header is present.
259         CharSequence connectionHeader = request.headers().get(HttpHeaderNames.CONNECTION);
260         if (connectionHeader == null) {
261             return false;
262         }
263 
264         // Make sure the CONNECTION header contains UPGRADE as well as all protocol-specific headers.
265         Collection<String> requiredHeaders = upgradeCodec.requiredUpgradeHeaders();
266         Set<CharSequence> values = splitHeader(connectionHeader);
267         if (!values.contains(HttpHeaderNames.UPGRADE) || !values.containsAll(requiredHeaders)) {
268             return false;
269         }
270 
271         // Ensure that all required protocol-specific headers are found in the request.
272         for (String requiredHeader : requiredHeaders) {
273             if (!request.headers().contains(requiredHeader)) {
274                 return false;
275             }
276         }
277 
278         // Create the user event to be fired once the upgrade completes.
279         final UpgradeEvent event = new UpgradeEvent(upgradeCodec.protocol(), request);
280 
281         // Prepare and send the upgrade response. Wait for this write to complete before upgrading,
282         // since we need the old codec in-place to properly encode the response.
283         final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeCodec);
284         upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse);
285         ctx.writeAndFlush(upgradeResponse).addListener(new ChannelFutureListener() {
286             @Override
287             public void operationComplete(ChannelFuture future) throws Exception {
288                 try {
289                     if (future.isSuccess()) {
290                         // Perform the upgrade to the new protocol.
291                         sourceCodec.upgradeFrom(ctx);
292                         upgradeCodec.upgradeTo(ctx, request, upgradeResponse);
293 
294                         // Notify that the upgrade has occurred. Retain the event to offset
295                         // the release() in the finally block.
296                         ctx.fireUserEventTriggered(event.retain());
297 
298                         // Remove this handler from the pipeline.
299                         ctx.pipeline().remove(HttpServerUpgradeHandler.this);
300                     } else {
301                         future.channel().close();
302                     }
303                 } finally {
304                     // Release the event if the upgrade event wasn't fired.
305                     event.release();
306                 }
307             }
308         });
309         return true;
310     }
311 
312     /**
313      * Looks up the most desirable supported upgrade codec from the list of choices in the UPGRADE
314      * header. If no suitable codec was found, returns {@code null}.
315      */
316     private UpgradeCodec selectUpgradeCodec(CharSequence upgradeHeader) {
317         Set<CharSequence> requestedProtocols = splitHeader(upgradeHeader);
318 
319         // Retain only the protocols that are in the protocol map. Maintain the original insertion
320         // order into the protocolMap, so that the first one in the remaining set is the most
321         // desirable protocol for the server.
322         Set<String> supportedProtocols = new LinkedHashSet<String>(upgradeCodecMap.keySet());
323         supportedProtocols.retainAll(requestedProtocols);
324 
325         if (!supportedProtocols.isEmpty()) {
326             String protocol = supportedProtocols.iterator().next().toUpperCase(Locale.US);
327             return upgradeCodecMap.get(protocol);
328         }
329         return null;
330     }
331 
332     /**
333      * Creates the 101 Switching Protocols response message.
334      */
335     private static FullHttpResponse createUpgradeResponse(UpgradeCodec upgradeCodec) {
336         DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, SWITCHING_PROTOCOLS);
337         res.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE);
338         res.headers().add(HttpHeaderNames.UPGRADE, upgradeCodec.protocol());
339         res.headers().add(HttpHeaderNames.CONTENT_LENGTH, "0");
340         return res;
341     }
342 
343     /**
344      * Splits a comma-separated header value. The returned set is case-insensitive and contains each
345      * part with whitespace removed.
346      */
347     private static Set<CharSequence> splitHeader(CharSequence header) {
348         StringBuilder builder = new StringBuilder(header.length());
349         Set<CharSequence> protocols = new TreeSet<CharSequence>(AsciiString.CHARSEQUENCE_CASE_INSENSITIVE_ORDER);
350         for (int i = 0; i < header.length(); ++i) {
351             char c = header.charAt(i);
352             if (Character.isWhitespace(c)) {
353                 // Don't include any whitespace.
354                 continue;
355             }
356             if (c == ',') {
357                 // Add the string and reset the builder for the next protocol.
358                 protocols.add(builder.toString());
359                 builder.setLength(0);
360             } else {
361                 builder.append(c);
362             }
363         }
364 
365         // Add the last protocol
366         if (builder.length() > 0) {
367             protocols.add(builder.toString());
368         }
369 
370         return protocols;
371     }
372 }