1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty5.example.http2.helloworld.client;
16
17 import io.netty5.buffer.api.Buffer;
18 import io.netty5.channel.ChannelHandlerContext;
19 import io.netty5.channel.SimpleChannelInboundHandler;
20 import io.netty5.handler.codec.http.FullHttpResponse;
21 import io.netty5.handler.codec.http2.HttpConversionUtil;
22 import io.netty5.util.CharsetUtil;
23 import io.netty5.util.concurrent.Future;
24 import io.netty5.util.concurrent.Promise;
25
26 import java.util.AbstractMap.SimpleEntry;
27 import java.util.Iterator;
28 import java.util.Map;
29 import java.util.Map.Entry;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.TimeUnit;
32
33
34
35
36 public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
37
38 private final Map<Integer, Entry<Future<Void>, Promise<Void>>> streamidPromiseMap;
39
40 public HttpResponseHandler() {
41
42
43 streamidPromiseMap = new ConcurrentHashMap<>();
44 }
45
46
47
48
49
50
51
52
53
54
55 public Entry<Future<Void>, Promise<Void>> put(int streamId, Future<Void> writeFuture, Promise<Void> promise) {
56 return streamidPromiseMap.put(streamId, new SimpleEntry<>(writeFuture, promise));
57 }
58
59
60
61
62
63
64
65
66 public void awaitResponses(long timeout, TimeUnit unit) throws Exception {
67 Iterator<Entry<Integer, Entry<Future<Void>, Promise<Void>>>> itr = streamidPromiseMap.entrySet().iterator();
68 while (itr.hasNext()) {
69 Entry<Integer, Entry<Future<Void>, Promise<Void>>> entry = itr.next();
70 Future<Void> writeFuture = entry.getValue().getKey();
71 if (!writeFuture.asStage().await(timeout, unit)) {
72 throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey());
73 }
74 if (writeFuture.isFailed()) {
75 throw new RuntimeException(writeFuture.cause());
76 }
77 Promise<Void> promise = entry.getValue().getValue();
78 if (!promise.asFuture().asStage().await(timeout, unit)) {
79 throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey());
80 }
81 if (promise.isFailed()) {
82 throw new RuntimeException(promise.cause());
83 }
84 System.out.println("---Stream id: " + entry.getKey() + " received---");
85 itr.remove();
86 }
87 }
88
89 @Override
90 protected void messageReceived(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
91 Integer streamId = msg.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
92 if (streamId == null) {
93 System.err.println("HttpResponseHandler unexpected message received: " + msg);
94 return;
95 }
96
97 Entry<Future<Void>, Promise<Void>> entry = streamidPromiseMap.get(streamId);
98 if (entry == null) {
99 System.err.println("Message received for unknown stream id " + streamId);
100 } else {
101
102 Buffer content = msg.payload();
103 if (content.readableBytes() > 0) {
104 int contentLength = content.readableBytes();
105 byte[] arr = new byte[contentLength];
106 content.readBytes(arr, 0, contentLength);
107 System.out.println(new String(arr, 0, contentLength, CharsetUtil.UTF_8));
108 }
109
110 entry.getValue().setSuccess(null);
111 }
112 }
113 }