1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  package io.netty5.handler.codec.http2;
16  
17  import io.netty5.buffer.api.Buffer;
18  import io.netty5.channel.ChannelHandlerContext;
19  import io.netty5.handler.codec.http.EmptyHttpHeaders;
20  import io.netty5.handler.codec.http.FullHttpMessage;
21  import io.netty5.handler.codec.http.HttpContent;
22  import io.netty5.handler.codec.http.HttpHeaders;
23  import io.netty5.handler.codec.http.HttpMessage;
24  import io.netty5.handler.codec.http.HttpScheme;
25  import io.netty5.handler.codec.http.LastHttpContent;
26  import io.netty5.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregator;
27  import io.netty5.util.Resource;
28  import io.netty5.util.concurrent.Future;
29  import io.netty5.util.concurrent.Promise;
30  import io.netty5.util.internal.UnstableApi;
31  
32  
33  
34  
35  
36  
37  @UnstableApi
38  public class HttpToHttp2ConnectionHandler extends Http2ConnectionHandler {
39  
40      private final boolean validateHeaders;
41      private int currentStreamId;
42      private HttpScheme httpScheme;
43  
44      protected HttpToHttp2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
45                                             Http2Settings initialSettings, boolean validateHeaders) {
46          super(decoder, encoder, initialSettings);
47          this.validateHeaders = validateHeaders;
48      }
49  
50      protected HttpToHttp2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
51                                             Http2Settings initialSettings, boolean validateHeaders,
52                                             boolean decoupleCloseAndGoAway) {
53          this(decoder, encoder, initialSettings, validateHeaders, decoupleCloseAndGoAway, null);
54      }
55  
56      protected HttpToHttp2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
57                                             Http2Settings initialSettings, boolean validateHeaders,
58                                             boolean decoupleCloseAndGoAway, HttpScheme httpScheme) {
59          super(decoder, encoder, initialSettings, decoupleCloseAndGoAway);
60          this.validateHeaders = validateHeaders;
61          this.httpScheme = httpScheme;
62      }
63  
64      protected HttpToHttp2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
65                                             Http2Settings initialSettings, boolean validateHeaders,
66                                             boolean decoupleCloseAndGoAway, boolean flushPreface,
67                                             HttpScheme httpScheme) {
68          super(decoder, encoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
69          this.validateHeaders = validateHeaders;
70          this.httpScheme = httpScheme;
71      }
72  
73      
74  
75  
76  
77  
78  
79  
80      private int getStreamId(HttpHeaders httpHeaders) throws Exception {
81          return httpHeaders.getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(),
82                                    connection().local().incrementAndGetNextStreamId());
83      }
84  
85      
86  
87  
88      @Override
89      public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
90          if (!(msg instanceof HttpMessage || msg instanceof HttpContent)) {
91              return ctx.write(msg);
92          }
93  
94          boolean release = true;
95          Promise<Void> promise = ctx.newPromise();
96          SimpleChannelPromiseAggregator promiseAggregator =
97                  new SimpleChannelPromiseAggregator(promise, ctx.executor());
98          try {
99              Http2ConnectionEncoder encoder = encoder();
100             boolean endStream = false;
101             if (msg instanceof HttpMessage) {
102                 final HttpMessage httpMsg = (HttpMessage) msg;
103 
104                 
105                 currentStreamId = getStreamId(httpMsg.headers());
106 
107                 
108                 if (httpScheme != null &&
109                         !httpMsg.headers().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text())) {
110                     httpMsg.headers().set(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), httpScheme.name());
111                 }
112 
113                 
114                 Http2Headers http2Headers = HttpConversionUtil.toHttp2Headers(httpMsg, validateHeaders);
115                 endStream = msg instanceof FullHttpMessage && ((FullHttpMessage<?>) msg).payload().readableBytes() == 0;
116                 writeHeaders(ctx, encoder, currentStreamId, httpMsg.headers(), http2Headers,
117                         endStream).cascadeTo(promiseAggregator.newPromise());
118             }
119 
120             if (!endStream && msg instanceof HttpContent) {
121                 boolean isLastContent = false;
122                 HttpHeaders trailers = EmptyHttpHeaders.INSTANCE;
123                 Http2Headers http2Trailers = EmptyHttp2Headers.INSTANCE;
124                 if (msg instanceof LastHttpContent) {
125                     isLastContent = true;
126 
127                     
128                     final LastHttpContent<?> lastContent = (LastHttpContent<?>) msg;
129                     trailers = lastContent.trailingHeaders();
130                     http2Trailers = HttpConversionUtil.toHttp2Headers(trailers, validateHeaders);
131                 }
132 
133                 
134                 final Buffer payload = ((HttpContent<?>) msg).payload();
135                 endStream = isLastContent && trailers.isEmpty();
136                 encoder.writeData(ctx, currentStreamId, payload, 0, endStream)
137                         .cascadeTo(promiseAggregator.newPromise());
138                 release = false;
139 
140                 if (!trailers.isEmpty()) {
141                     
142                     writeHeaders(ctx, encoder, currentStreamId, trailers, http2Trailers, true)
143                             .cascadeTo(promiseAggregator.newPromise());
144                 }
145             }
146         } catch (Throwable t) {
147             onError(ctx, true, t);
148             promiseAggregator.setFailure(t);
149         } finally {
150             if (release) {
151                 Resource.dispose(msg);
152             }
153             promiseAggregator.doneAllocatingPromises();
154         }
155         return promise.asFuture();
156     }
157 
158     private static Future<Void> writeHeaders(ChannelHandlerContext ctx, Http2ConnectionEncoder encoder, int streamId,
159                                      HttpHeaders headers, Http2Headers http2Headers, boolean endStream) {
160         int dependencyId = headers.getInt(
161                 HttpConversionUtil.ExtensionHeaderNames.STREAM_DEPENDENCY_ID.text(), 0);
162         short weight = headers.getShort(
163                 HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(), Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT);
164         return encoder.writeHeaders(ctx, streamId, http2Headers, dependencyId, weight, false,
165                 0, endStream);
166     }
167 }