View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.example.http2.helloworld.client;
16  
17  import io.netty5.buffer.api.Buffer;
18  import io.netty5.channel.ChannelHandlerContext;
19  import io.netty5.channel.SimpleChannelInboundHandler;
20  import io.netty5.handler.codec.http.FullHttpResponse;
21  import io.netty5.handler.codec.http2.HttpConversionUtil;
22  import io.netty5.util.CharsetUtil;
23  import io.netty5.util.concurrent.Future;
24  import io.netty5.util.concurrent.Promise;
25  
26  import java.util.AbstractMap.SimpleEntry;
27  import java.util.Iterator;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.TimeUnit;
32  
33  /**
34   * Process {@link FullHttpResponse} translated from HTTP/2 frames
35   */
36  public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
37  
38      private final Map<Integer, Entry<Future<Void>, Promise<Void>>> streamidPromiseMap;
39  
40      public HttpResponseHandler() {
41          // Use a concurrent map because we add and iterate from the main thread (just for the purposes of the example),
42          // but Netty also does a get on the map when messages are received in a EventLoop thread.
43          streamidPromiseMap = new ConcurrentHashMap<>();
44      }
45  
46      /**
47       * Create an association between an anticipated response stream id and a {@link Promise}
48       *
49       * @param streamId The stream for which a response is expected
50       * @param writeFuture A future that represent the request write operation
51       * @param promise The promise object that will be used to wait/notify events
52       * @return The previous object associated with {@code streamId}
53       * @see HttpResponseHandler#awaitResponses(long, TimeUnit)
54       */
55      public Entry<Future<Void>, Promise<Void>> put(int streamId, Future<Void> writeFuture, Promise<Void> promise) {
56          return streamidPromiseMap.put(streamId, new SimpleEntry<>(writeFuture, promise));
57      }
58  
59      /**
60       * Wait (sequentially) for a time duration for each anticipated response
61       *
62       * @param timeout Value of time to wait for each response
63       * @param unit Units associated with {@code timeout}
64       * @see HttpResponseHandler#put(int, Future, Promise)
65       */
66      public void awaitResponses(long timeout, TimeUnit unit) throws Exception {
67          Iterator<Entry<Integer, Entry<Future<Void>, Promise<Void>>>> itr = streamidPromiseMap.entrySet().iterator();
68          while (itr.hasNext()) {
69              Entry<Integer, Entry<Future<Void>, Promise<Void>>> entry = itr.next();
70              Future<Void> writeFuture = entry.getValue().getKey();
71              if (!writeFuture.asStage().await(timeout, unit)) {
72                  throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey());
73              }
74              if (writeFuture.isFailed()) {
75                  throw new RuntimeException(writeFuture.cause());
76              }
77              Promise<Void> promise = entry.getValue().getValue();
78              if (!promise.asFuture().asStage().await(timeout, unit)) {
79                  throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey());
80              }
81              if (promise.isFailed()) {
82                  throw new RuntimeException(promise.cause());
83              }
84              System.out.println("---Stream id: " + entry.getKey() + " received---");
85              itr.remove();
86          }
87      }
88  
89      @Override
90      protected void messageReceived(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
91          Integer streamId = msg.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
92          if (streamId == null) {
93              System.err.println("HttpResponseHandler unexpected message received: " + msg);
94              return;
95          }
96  
97          Entry<Future<Void>, Promise<Void>> entry = streamidPromiseMap.get(streamId);
98          if (entry == null) {
99              System.err.println("Message received for unknown stream id " + streamId);
100         } else {
101             // Do stuff with the message (for now just print it)
102             Buffer content = msg.payload();
103             if (content.readableBytes() > 0) {
104                 int contentLength = content.readableBytes();
105                 byte[] arr = new byte[contentLength];
106                 content.readBytes(arr, 0, contentLength);
107                 System.out.println(new String(arr, 0, contentLength, CharsetUtil.UTF_8));
108             }
109 
110             entry.getValue().setSuccess(null);
111         }
112     }
113 }