View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.example.http2.helloworld.client;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.channel.ChannelPromise;
21  import io.netty.channel.SimpleChannelInboundHandler;
22  import io.netty.handler.codec.http.FullHttpResponse;
23  import io.netty.handler.codec.http2.HttpConversionUtil;
24  import io.netty.util.CharsetUtil;
25  import io.netty.util.internal.PlatformDependent;
26  
27  import java.util.AbstractMap.SimpleEntry;
28  import java.util.Iterator;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.concurrent.TimeUnit;
32  
33  /**
34   * Process {@link io.netty.handler.codec.http.FullHttpResponse} translated from HTTP/2 frames
35   */
36  public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
37  
38      private final Map<Integer, Entry<ChannelFuture, ChannelPromise>> streamidPromiseMap;
39  
40      public HttpResponseHandler() {
41          // Use a concurrent map because we add and iterate from the main thread (just for the purposes of the example),
42          // but Netty also does a get on the map when messages are received in a EventLoop thread.
43          streamidPromiseMap = PlatformDependent.newConcurrentHashMap();
44      }
45  
46      /**
47       * Create an association between an anticipated response stream id and a {@link io.netty.channel.ChannelPromise}
48       *
49       * @param streamId The stream for which a response is expected
50       * @param writeFuture A future that represent the request write operation
51       * @param promise The promise object that will be used to wait/notify events
52       * @return The previous object associated with {@code streamId}
53       * @see HttpResponseHandler#awaitResponses(long, java.util.concurrent.TimeUnit)
54       */
55      public Entry<ChannelFuture, ChannelPromise> put(int streamId, ChannelFuture writeFuture, ChannelPromise promise) {
56          return streamidPromiseMap.put(streamId, new SimpleEntry<ChannelFuture, ChannelPromise>(writeFuture, promise));
57      }
58  
59      /**
60       * Wait (sequentially) for a time duration for each anticipated response
61       *
62       * @param timeout Value of time to wait for each response
63       * @param unit Units associated with {@code timeout}
64       * @see HttpResponseHandler#put(int, io.netty.channel.ChannelFuture, io.netty.channel.ChannelPromise)
65       */
66      public void awaitResponses(long timeout, TimeUnit unit) {
67          Iterator<Entry<Integer, Entry<ChannelFuture, ChannelPromise>>> itr = streamidPromiseMap.entrySet().iterator();
68          while (itr.hasNext()) {
69              Entry<Integer, Entry<ChannelFuture, ChannelPromise>> entry = itr.next();
70              ChannelFuture writeFuture = entry.getValue().getKey();
71              if (!writeFuture.awaitUninterruptibly(timeout, unit)) {
72                  throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey());
73              }
74              if (!writeFuture.isSuccess()) {
75                  throw new RuntimeException(writeFuture.cause());
76              }
77              ChannelPromise promise = entry.getValue().getValue();
78              if (!promise.awaitUninterruptibly(timeout, unit)) {
79                  throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey());
80              }
81              if (!promise.isSuccess()) {
82                  throw new RuntimeException(promise.cause());
83              }
84              System.out.println("---Stream id: " + entry.getKey() + " received---");
85              itr.remove();
86          }
87      }
88  
89      @Override
90      protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
91          Integer streamId = msg.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
92          if (streamId == null) {
93              System.err.println("HttpResponseHandler unexpected message received: " + msg);
94              return;
95          }
96  
97          Entry<ChannelFuture, ChannelPromise> entry = streamidPromiseMap.get(streamId);
98          if (entry == null) {
99              System.err.println("Message received for unknown stream id " + streamId);
100         } else {
101             // Do stuff with the message (for now just print it)
102             ByteBuf content = msg.content();
103             if (content.isReadable()) {
104                 int contentLength = content.readableBytes();
105                 byte[] arr = new byte[contentLength];
106                 content.readBytes(arr);
107                 System.out.println(new String(arr, 0, contentLength, CharsetUtil.UTF_8));
108             }
109 
110             entry.getValue().setSuccess();
111         }
112     }
113 }