View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.ssl;
17  
18  import io.netty5.buffer.BufferUtil;
19  import io.netty5.buffer.api.Buffer;
20  import io.netty5.buffer.api.BufferAllocator;
21  import io.netty5.buffer.api.CompositeBuffer;
22  import io.netty5.buffer.api.DefaultBufferAllocators;
23  import io.netty5.util.Resource;
24  import io.netty5.buffer.api.StandardAllocationTypes;
25  import io.netty5.channel.AbstractCoalescingBufferQueue;
26  import io.netty5.channel.Channel;
27  import io.netty5.channel.ChannelException;
28  import io.netty5.channel.ChannelHandler;
29  import io.netty5.channel.ChannelHandlerContext;
30  import io.netty5.channel.ChannelOption;
31  import io.netty5.channel.ChannelPipeline;
32  import io.netty5.channel.unix.UnixChannel;
33  import io.netty5.handler.codec.ByteToMessageDecoder;
34  import io.netty5.handler.codec.DecoderException;
35  import io.netty5.handler.codec.UnsupportedMessageTypeException;
36  import io.netty5.util.concurrent.DefaultPromise;
37  import io.netty5.util.concurrent.EventExecutor;
38  import io.netty5.util.concurrent.Future;
39  import io.netty5.util.concurrent.ImmediateEventExecutor;
40  import io.netty5.util.concurrent.ImmediateExecutor;
41  import io.netty5.util.concurrent.Promise;
42  import io.netty5.util.internal.PlatformDependent;
43  import io.netty5.util.internal.SilentDispose;
44  import io.netty5.util.internal.UnstableApi;
45  import io.netty5.util.internal.logging.InternalLogger;
46  import io.netty5.util.internal.logging.InternalLoggerFactory;
47  
48  import javax.net.ssl.SSLEngine;
49  import javax.net.ssl.SSLEngineResult;
50  import javax.net.ssl.SSLEngineResult.HandshakeStatus;
51  import javax.net.ssl.SSLEngineResult.Status;
52  import javax.net.ssl.SSLException;
53  import javax.net.ssl.SSLHandshakeException;
54  import javax.net.ssl.SSLSession;
55  import java.io.IOException;
56  import java.nio.ByteBuffer;
57  import java.nio.channels.ClosedChannelException;
58  import java.nio.channels.DatagramChannel;
59  import java.nio.channels.SocketChannel;
60  import java.util.concurrent.Executor;
61  import java.util.concurrent.RejectedExecutionException;
62  import java.util.concurrent.TimeUnit;
63  import java.util.regex.Pattern;
64  
65  import static io.netty5.handler.ssl.SslUtils.getEncryptedPacketLength;
66  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
67  import static io.netty5.util.internal.SilentDispose.autoClosing;
68  import static java.util.Objects.requireNonNull;
69  
70  /**
71   * Adds <a href="https://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
72   * &middot; TLS</a> and StartTLS support to a {@link Channel}.  Please refer
73   * to the <strong>"SecureChat"</strong> example in the distribution or the web
74   * site for the detailed usage.
75   *
76   * <h3>Beginning the handshake</h3>
77   * <p>
78   * Besides using the handshake {@link Future} to get notified about the completion of the handshake, it's
79   * also possible to detect it by implementing the
80   * {@link ChannelHandler#channelInboundEvent(ChannelHandlerContext, Object)}
81   * method and checking for a {@link SslHandshakeCompletionEvent}.
82   *
83   * <h3>Handshake</h3>
84   * <p>
85   * The handshake will be automatically issued for you once the {@link Channel} is active and
86   * {@link SSLEngine#getUseClientMode()} returns {@code true}.
87   * So there is no need to start it yourself.
88   *
89   * <h3>Closing the session</h3>
90   * <p>
91   * To close the SSL session, the {@link #closeOutbound()} method should be
92   * called to send the {@code close_notify} message to the remote peer. One
93   * exception is when you close the {@link Channel} - {@link SslHandler}
94   * intercepts the close request and sends the {@code close_notify} message
95   * before the channel closure automatically.  Once the SSL session is closed,
96   * it is not reusable, and consequently you should create a new
97   * {@link SslHandler} with a new {@link SSLEngine} as explained in the
98   * following section.
99   *
100  * <h3>Restarting the session</h3>
101  * <p>
102  * To restart the SSL session, you must remove the existing closed
103  * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
104  * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
105  * and start the handshake process as described in the first section.
106  *
107  * <h3>Implementing StartTLS</h3>
108  * <p>
109  * <a href="https://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
110  * communication pattern that secures the wire in the middle of the plaintext
111  * connection.  Please note that it is different from SSL &middot; TLS, that
112  * secures the wire from the beginning of the connection.  Typically, StartTLS
113  * is composed of three steps:
114  * <ol>
115  * <li>Client sends a StartTLS request to server.</li>
116  * <li>Server sends a StartTLS response to client.</li>
117  * <li>Client begins SSL handshake.</li>
118  * </ol>
119  * If you implement a server, you need to:
120  * <ol>
121  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
122  *     to {@code true},</li>
123  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
124  * <li>write a StartTLS response.</li>
125  * </ol>
126  * Please note that you must insert {@link SslHandler} <em>before</em> sending
127  * the StartTLS response.  Otherwise the client can begin the SSL handshake
128  * before {@link SslHandler} is inserted to the {@link ChannelPipeline}, causing
129  * data corruption.
130  * <p>
131  * The client-side implementation is much simpler.
132  * <ol>
133  * <li>Write a StartTLS request,</li>
134  * <li>wait for the StartTLS response,</li>
135  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
136  *     to {@code false},</li>
137  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
138  * <li>Initiate SSL handshake.</li>
139  * </ol>
140  *
141  * <h3>Known issues</h3>
142  * <p>
143  * Because of a known issue with the current implementation of the SslEngine that comes
144  * with Java it may be possible that you see blocked IO-Threads while a full GC is done.
145  * <p>
146  * So if you are affected you can work around this problem by adjusting the cache settings
147  * as shown below:
148  *
149  * <pre>
150  *     SslContext context = ...;
151  *     context.getServerSessionContext().setSessionCacheSize(someSaneSize);
152  *     context.getServerSessionContext().setSessionTimeout(someSaneTimeout);
153  * </pre>
154  * <p>
155  * What values to use here depends on the nature of your application and should be set
156  * based on monitoring and debugging of it.
157  * For more details see
158  * <a href="https://github.com/netty/netty/issues/832">#832</a> in our issue tracker.
159  */
160 public class SslHandler extends ByteToMessageDecoder {
161     private static final InternalLogger logger =
162             InternalLoggerFactory.getInstance(SslHandler.class);
163     private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
164             "^.*(?:Socket|Datagram)Channel.*$");
165     private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
166             "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
167     private static final int STATE_SENT_FIRST_MESSAGE = 1;
168     private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
169     private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
170     private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
171     /**
172      * Set by wrap*() methods when something is produced.
173      * {@link #channelReadComplete(ChannelHandlerContext)} will check this flag, clear it, and call ctx.flush().
174      */
175     private static final int STATE_NEEDS_FLUSH = 1 << 4;
176     private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
177     private static final int STATE_CLOSE_NOTIFY = 1 << 6;
178     private static final int STATE_PROCESS_TASK = 1 << 7;
179     /**
180      * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
181      * when {@link ChannelOption#AUTO_READ} is {@code false}.
182      */
183     private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
184     private static final int STATE_UNWRAP_REENTRY = 1 << 9;
185 
186     /**
187      * <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a> which is the maximum sized plaintext chunk
188      * allowed by the TLS RFC.
189      */
190     private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
191 
192     private enum SslEngineType {
193         TCNATIVE(true, COMPOSITE_CUMULATOR) {
194             @Override
195             Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
196                                       int pendingBytes, int numComponents) {
197                 ReferenceCountedOpenSslEngine engine = (ReferenceCountedOpenSslEngine) handler.engine;
198                 int maxWrapLength = engine.calculateMaxLengthForWrap(pendingBytes, numComponents);
199                 if (allocator.getAllocationType() != StandardAllocationTypes.OFF_HEAP) {
200                     allocator = DefaultBufferAllocators.offHeapAllocator();
201                 }
202                 return allocator.allocate(maxWrapLength);
203             }
204 
205             @Override
206             int calculatePendingData(SslHandler handler, int guess) {
207                 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
208                 return sslPending > 0 ? sslPending : guess;
209             }
210 
211             @Override
212             boolean jdkCompatibilityMode(SSLEngine engine) {
213                 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
214             }
215         },
216         CONSCRYPT(true, COMPOSITE_CUMULATOR) {
217             @Override
218             Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
219                                        int pendingBytes, int numComponents) {
220                 ConscryptAlpnSslEngine engine = (ConscryptAlpnSslEngine) handler.engine;
221                 int size = engine.calculateOutNetBufSize(pendingBytes, numComponents);
222                 if (allocator.getAllocationType() != StandardAllocationTypes.OFF_HEAP) {
223                     allocator = DefaultBufferAllocators.offHeapAllocator();
224                 }
225                 return allocator.allocate(size);
226             }
227 
228             @Override
229             int calculatePendingData(SslHandler handler, int guess) {
230                 return guess;
231             }
232 
233             @Override
234             boolean jdkCompatibilityMode(SSLEngine engine) {
235                 return true;
236             }
237         },
238         JDK(false, MERGE_CUMULATOR) {
239             @Override
240             Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
241                                        int pendingBytes, int numComponents) {
242                 // As for the JDK SSLEngine we always need to allocate buffers of the size required by the SSLEngine
243                 // (normally ~16KB). This is required even if the amount of data to encrypt is very small. Use heap
244                 // buffers to reduce the native memory usage.
245                 //
246                 // Beside this the JDK SSLEngine also (as of today) will do an extra heap to direct buffer copy
247                 // if a direct buffer is used as its internals operate on byte[].
248                 if (allocator.getAllocationType() == StandardAllocationTypes.OFF_HEAP) {
249                     allocator = DefaultBufferAllocators.onHeapAllocator();
250                 }
251                 return allocator.allocate(handler.engine.getSession().getPacketBufferSize());
252             }
253 
254             @Override
255             int calculatePendingData(SslHandler handler, int guess) {
256                 return guess;
257             }
258 
259             @Override
260             boolean jdkCompatibilityMode(SSLEngine engine) {
261                 return true;
262             }
263         };
264 
265         static SslEngineType forEngine(SSLEngine engine) {
266             return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
267                    engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
268         }
269 
270         SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
271             this.wantsDirectBuffer = wantsDirectBuffer;
272             this.cumulator = cumulator;
273         }
274 
275         abstract int calculatePendingData(SslHandler handler, int guess);
276 
277         abstract boolean jdkCompatibilityMode(SSLEngine engine);
278 
279         abstract Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
280                                             int pendingBytes, int numComponents);
281 
282         // BEGIN Platform-dependent flags
283 
284         /**
285          * {@code true} if and only if {@link SSLEngine} expects a direct buffer and so if a heap buffer
286          * is given will make an extra memory copy.
287          */
288         final boolean wantsDirectBuffer;
289 
290         // END Platform-dependent flags
291 
292         /**
293          * When using JDK {@link SSLEngine}, we use {@link #MERGE_CUMULATOR} because it works only with
294          * one {@link ByteBuffer}.
295          *
296          * When using {@link OpenSslEngine}, we can use {@link #COMPOSITE_CUMULATOR} because it has
297          * {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
298          * and which does not need to do extra memory copies.
299          */
300         final Cumulator cumulator;
301     }
302 
303     private volatile ChannelHandlerContext ctx;
304     private final SSLEngine engine;
305     private final SslEngineType engineType;
306     private final Executor delegatedTaskExecutor;
307     private final boolean jdkCompatibilityMode;
308 
309     /**
310      * Used for converting {@link Buffer}s into {@link ByteBuffer}s, in a way that meet the expectations of the
311      * configured {@link SSLEngine}, and then calling the most optimal wrap and unwrap methods.
312      */
313     private final EngineWrapper engineWrapper;
314 
315     private final boolean startTls;
316 
317     private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
318     private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
319 
320     private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
321     private Promise<Channel> handshakePromise = new LazyPromise();
322     private final Promise<Channel> sslClosePromise = new LazyPromise();
323 
324     private int packetLength;
325     private short state;
326 
327     private volatile long handshakeTimeoutMillis = 10000;
328     private volatile long closeNotifyFlushTimeoutMillis = 3000;
329     private volatile long closeNotifyReadTimeoutMillis;
330     volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
331 
332     /**
333      * Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}.
334      *
335      * @param engine  the {@link SSLEngine} this handler will use
336      */
337     public SslHandler(SSLEngine engine) {
338         this(engine, false);
339     }
340 
341     /**
342      * Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}.
343      *
344      * @param engine    the {@link SSLEngine} this handler will use
345      * @param startTls  {@code true} if the first write request shouldn't be
346      *                  encrypted by the {@link SSLEngine}
347      */
348     public SslHandler(SSLEngine engine, boolean startTls) {
349         this(engine, startTls, ImmediateExecutor.INSTANCE);
350     }
351 
352     /**
353      * Creates a new instance.
354      *
355      * @param engine  the {@link SSLEngine} this handler will use
356      * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
357      *                              {@link SSLEngine#getDelegatedTask()}.
358      */
359     public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
360         this(engine, false, delegatedTaskExecutor);
361     }
362 
363     /**
364      * Creates a new instance.
365      *
366      * @param engine  the {@link SSLEngine} this handler will use
367      * @param startTls  {@code true} if the first write request shouldn't be
368      *                  encrypted by the {@link SSLEngine}
369      * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
370      *                              {@link SSLEngine#getDelegatedTask()}.
371      */
372     public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
373         super(SslEngineType.forEngine(engine).cumulator);
374         requireNonNull(engine, "engine");
375         requireNonNull(delegatedTaskExecutor, "delegatedTaskExecutor");
376         this.engine = engine;
377         engineType = SslEngineType.forEngine(engine);
378         this.delegatedTaskExecutor = delegatedTaskExecutor;
379         this.startTls = startTls;
380         engineWrapper = new EngineWrapper(engine, engineType.wantsDirectBuffer);
381         jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
382     }
383 
384     public long getHandshakeTimeoutMillis() {
385         return handshakeTimeoutMillis;
386     }
387 
388     public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
389         requireNonNull(unit, "unit");
390         setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
391     }
392 
393     public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
394         this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
395     }
396 
397     /**
398      * Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
399      * <p>
400      * This value will partition data which is passed to write
401      * {@link #write(ChannelHandlerContext, Object)}. The partitioning will work as follows:
402      * <ul>
403      * <li>If {@code wrapDataSize <= 0} then we will write each data chunk as is.</li>
404      * <li>If {@code wrapDataSize > data size} then we will attempt to aggregate multiple data chunks together.</li>
405      * <li>If {@code wrapDataSize > data size}  Else if {@code wrapDataSize <= data size} then we will divide the data
406      * into chunks of {@code wrapDataSize} when writing.</li>
407      * </ul>
408      * <p>
409      * If the {@link SSLEngine} doesn't support a gather wrap operation (e.g. {@link SslProvider#OPENSSL}) then
410      * aggregating data before wrapping can help reduce the ratio between TLS overhead vs data payload which will lead
411      * to better goodput. Writing fixed chunks of data can also help target the underlying transport's (e.g. TCP)
412      * frame size. Under lossy/congested network conditions this may help the peer get full TLS packets earlier and
413      * be able to do work sooner, as opposed to waiting for the all the pieces of the TLS packet to arrive.
414      * @param wrapDataSize the number of bytes which will be passed to each
415      *      {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
416      */
417     @UnstableApi
418     public final void setWrapDataSize(int wrapDataSize) {
419         this.wrapDataSize = wrapDataSize;
420     }
421 
422     /**
423      * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}
424      */
425     @Deprecated
426     public long getCloseNotifyTimeoutMillis() {
427         return getCloseNotifyFlushTimeoutMillis();
428     }
429 
430     /**
431      * @deprecated use {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}
432      */
433     @Deprecated
434     public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
435         setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
436     }
437 
438     /**
439      * @deprecated use {@link #setCloseNotifyFlushTimeoutMillis(long)}
440      */
441     @Deprecated
442     public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
443         setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
444     }
445 
446     /**
447      * Gets the timeout for flushing the close_notify that was triggered by closing the
448      * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
449      * forcibly.
450      */
451     public final long getCloseNotifyFlushTimeoutMillis() {
452         return closeNotifyFlushTimeoutMillis;
453     }
454 
455     /**
456      * Sets the timeout for flushing the close_notify that was triggered by closing the
457      * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
458      * forcibly.
459      */
460     public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
461         setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
462     }
463 
464     /**
465      * See {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}.
466      */
467     public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
468         this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
469                 "closeNotifyFlushTimeoutMillis");
470     }
471 
472     /**
473      * Gets the timeout (in ms) for receiving the response for the close_notify that was triggered by closing the
474      * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
475      * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
476      */
477     public final long getCloseNotifyReadTimeoutMillis() {
478         return closeNotifyReadTimeoutMillis;
479     }
480 
481     /**
482      * Sets the timeout  for receiving the response for the close_notify that was triggered by closing the
483      * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
484      * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
485      */
486     public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
487         setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
488     }
489 
490     /**
491      * See {@link #setCloseNotifyReadTimeout(long, TimeUnit)}.
492      */
493     public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
494         this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
495                 "closeNotifyReadTimeoutMillis");
496     }
497 
498     /**
499      * Returns the {@link SSLEngine} which is used by this handler.
500      */
501     public SSLEngine engine() {
502         return engine;
503     }
504 
505     /**
506      * Returns the name of the current application-level protocol.
507      *
508      * @return the protocol name or {@code null} if application-level protocol has not been negotiated
509      */
510     public String applicationProtocol() {
511         SSLEngine engine = engine();
512         if (!(engine instanceof ApplicationProtocolAccessor)) {
513             return null;
514         }
515 
516         return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
517     }
518 
519     /**
520      * Returns a {@link Future} that will get notified once the current TLS handshake completes.
521      *
522      * @return the {@link Future} for the initial TLS handshake if {@link #renegotiate()} was not invoked.
523      *         The {@link Future} for the most recent {@linkplain #renegotiate() TLS renegotiation} otherwise.
524      */
525     public Future<Channel> handshakeFuture() {
526         return handshakePromise.asFuture();
527     }
528 
529     /**
530      * Sends an SSL {@code close_notify} message to the specified channel and
531      * destroys the underlying {@link SSLEngine}. This will <strong>not</strong> close the underlying
532      * {@link Channel}. If you want to also close the {@link Channel} use {@link Channel#close()} or
533      * {@link ChannelHandlerContext#close()}
534      */
535     public Future<Void> closeOutbound() {
536         final ChannelHandlerContext ctx = this.ctx;
537         Promise<Void> promise = ctx.newPromise();
538         if (ctx.executor().inEventLoop()) {
539             closeOutbound0(promise);
540         } else {
541             ctx.executor().execute(() -> closeOutbound0(promise));
542         }
543         return promise.asFuture();
544     }
545 
546     private void closeOutbound0(Promise<Void> promise) {
547         setState(STATE_OUTBOUND_CLOSED);
548         engine.closeOutbound();
549         try {
550             flush(ctx, promise);
551         } catch (Exception e) {
552             if (!promise.tryFailure(e)) {
553                 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
554             }
555         }
556     }
557 
558     /**
559      * Return the {@link Future} that will get notified if the inbound of the {@link SSLEngine} is closed.
560      *
561      * This method will return the same {@link Future} all the time.
562      *
563      * @see SSLEngine
564      */
565     public Future<Channel> sslCloseFuture() {
566         return sslClosePromise.asFuture();
567     }
568 
569     @Override
570     public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
571         try (AutoCloseable ignore = autoClosing(engine)) {
572             if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
573                 // Check if queue is not empty first because create a new ChannelException is expensive
574                 pendingUnencryptedWrites.releaseAndFailAll(ctx,
575                   new ChannelException("Pending write on removal of SslHandler"));
576             }
577             pendingUnencryptedWrites = null;
578 
579             SSLException cause = null;
580 
581             // If the handshake or SSLEngine closure is not done yet we should fail corresponding promise and
582             // notify the rest of the
583             // pipeline.
584             if (!handshakePromise.isDone()) {
585                 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
586                 if (handshakePromise.tryFailure(cause)) {
587                     ctx.fireChannelInboundEvent(new SslHandshakeCompletionEvent(
588                             engine().getSession(), applicationProtocol(), cause));
589                 }
590             }
591             if (!sslClosePromise.isDone()) {
592                 if (cause == null) {
593                     cause = new SSLException("SslHandler removed before SSLEngine was closed");
594                 }
595                 notifyClosePromise(cause);
596             }
597         }
598     }
599 
600     @Override
601     public Future<Void> disconnect(final ChannelHandlerContext ctx) {
602         return closeOutboundAndChannel(ctx, true);
603     }
604 
605     @Override
606     public Future<Void> close(final ChannelHandlerContext ctx) {
607         return closeOutboundAndChannel(ctx, false);
608     }
609 
610     @Override
611     public void read(ChannelHandlerContext ctx) {
612         if (!handshakePromise.isDone()) {
613             setState(STATE_READ_DURING_HANDSHAKE);
614         }
615 
616         ctx.read();
617     }
618 
619     private static IllegalStateException newPendingWritesNullException() {
620         return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
621     }
622 
623     @Override
624     public Future<Void> write(final ChannelHandlerContext ctx, Object msg) {
625         if (!(msg instanceof Buffer)) {
626             UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, Buffer.class);
627             logger.warn(exception);
628             SilentDispose.dispose(msg, logger);
629             return ctx.newFailedFuture(exception);
630         }
631         if (pendingUnencryptedWrites == null) {
632             IllegalStateException exception = newPendingWritesNullException();
633             try {
634                 Resource.dispose(msg);
635             } catch (Exception e) {
636                 exception.addSuppressed(e);
637             }
638             return ctx.newFailedFuture(exception);
639         } else {
640             Promise<Void> promise = ctx.newPromise();
641             pendingUnencryptedWrites.add((Buffer) msg, promise);
642             return promise.asFuture();
643         }
644     }
645 
646     @Override
647     public void flush(ChannelHandlerContext ctx) {
648         // Do not encrypt the first write request if this handler is
649         // created with startTLS flag turned on.
650         if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
651             setState(STATE_SENT_FIRST_MESSAGE);
652             pendingUnencryptedWrites.writeAndRemoveAll(ctx);
653             forceFlush(ctx);
654             // Explicit start handshake processing once we send the first message. This will also ensure
655             // we will schedule the timeout if needed.
656             startHandshakeProcessing(true);
657             return;
658         }
659 
660         if (isStateSet(STATE_PROCESS_TASK)) {
661             return;
662         }
663 
664         try {
665             wrapAndFlush(ctx);
666         } catch (Throwable cause) {
667             setHandshakeFailure(ctx, cause);
668         }
669     }
670 
671     private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
672         if (pendingUnencryptedWrites.isEmpty()) {
673             pendingUnencryptedWrites.add(ctx.bufferAllocator().allocate(0), ctx.newPromise());
674         }
675         if (!handshakePromise.isDone()) {
676             setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
677         }
678         try {
679             wrap(ctx, false);
680         } finally {
681             // We may have written some parts of data before an exception was thrown so ensure we always flush.
682             // See https://github.com/netty/netty/issues/3900#issuecomment-172481830
683             forceFlush(ctx);
684         }
685     }
686 
687     // This method will not call setHandshakeFailure(...) !
688     private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
689         Buffer out = null;
690         BufferAllocator alloc = ctx.bufferAllocator();
691         try {
692             final int wrapDataSize = this.wrapDataSize;
693             // Only continue to loop if the handler was not removed in the meantime.
694             // See https://github.com/netty/netty/issues/5860
695             outer: while (!ctx.isRemoved()) {
696                 Promise<Void> promise = ctx.newPromise();
697                 Buffer buf = wrapDataSize > 0 ?
698                         pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
699                         pendingUnencryptedWrites.removeFirst(promise);
700                 if (buf == null) {
701                     break;
702                 }
703 
704                 if (out == null) {
705                     out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.countReadableComponents());
706                 }
707 
708                 SSLEngineResult result = wrap(engine, buf, out);
709                 if (buf.readableBytes() > 0) {
710                     pendingUnencryptedWrites.addFirst(buf, promise);
711                     // When we add the buffer/promise pair back we need to be sure we don't complete the promise
712                     // later. We only complete the promise if the buffer is completely consumed.
713                     promise = null;
714                 } else {
715                     buf.close();
716                 }
717 
718                 // We need to write any data before we invoke any methods which may trigger re-entry, otherwise
719                 // writes may occur out of order and TLS sequencing may be off (e.g. SSLV3_ALERT_BAD_RECORD_MAC).
720                 if (out.readableBytes() > 0) {
721                     final Buffer b = out;
722                     out = null;
723                     if (promise != null) {
724                         ctx.write(b).cascadeTo(promise);
725                     } else {
726                         ctx.write(b);
727                     }
728                 } else if (promise != null) {
729                     ctx.write(alloc.allocate(0)).cascadeTo(promise);
730                 }
731                 // else out is not readable we can re-use it and so save an extra allocation
732 
733                 if (result.getStatus() == Status.CLOSED) {
734                     // Make a best effort to preserve any exception that way previously encountered from the handshake
735                     // or the transport, else fallback to a general error.
736                     Throwable exception = handshakePromise.isDone() ? handshakePromise.cause() : null;
737                     if (exception == null) {
738                         exception = sslClosePromise.isDone() ? sslClosePromise.cause() : null;
739                         if (exception == null) {
740                             exception = new SslClosedEngineException("SSLEngine closed already");
741                         }
742                     }
743                     pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
744                     return;
745                 } else {
746                     switch (result.getHandshakeStatus()) {
747                         case NEED_TASK:
748                             if (!runDelegatedTasks(inUnwrap)) {
749                                 // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
750                                 // resume once the task completes.
751                                 break outer;
752                             }
753                             break;
754                         case FINISHED:
755                         case NOT_HANDSHAKING: // work around for android bug that skips the FINISHED state.
756                             setHandshakeSuccess();
757                             break;
758                         case NEED_WRAP:
759                             // If we are expected to wrap again and we produced some data we need to ensure there
760                             // is something in the queue to process as otherwise we will not try again before there
761                             // was more added. Failing to do so may fail to produce an alert that can be
762                             // consumed by the remote peer.
763                             if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
764                                 pendingUnencryptedWrites.add(alloc.allocate(0));
765                             }
766                             break;
767                         case NEED_UNWRAP:
768                             // The underlying engine is starving so we need to feed it with more data.
769                             // See https://github.com/netty/netty/pull/5039
770                             readIfNeeded(ctx);
771                             return;
772                         default:
773                             throw new IllegalStateException(
774                                     "Unknown handshake status: " + result.getHandshakeStatus());
775                     }
776                 }
777             }
778         } finally {
779             if (out != null) {
780                 out.close();
781             }
782             if (inUnwrap) {
783                 setState(STATE_NEEDS_FLUSH);
784             }
785         }
786     }
787 
788     /**
         * Repeatedly calls {@code wrap(engine, null, out)}, i.e. wraps without any application data as source,
         * writes whatever the engine produced to the pipeline and reacts to the resulting handshake status.
         * The loop ends when the handler is removed, a delegated task was offloaded, no progress is made, or
         * one of the handshake states below returns.
         * <p>
789      * This method will not call
790      * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable, boolean, boolean, boolean)} or
791      * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable)}.
         *
         * @param ctx the context used to allocate the output buffer, to write, and to check for handler removal.
         * @param inUnwrap {@code true} if called while unwrapping; in that case {@code STATE_NEEDS_FLUSH} is set
         *                 after a write, no nested {@code unwrapNonAppData(...)} is attempted, and pending
         *                 application data is wrapped once the handshake succeeded.
792      * @return {@code true} if this method ends on {@link SSLEngineResult.HandshakeStatus#NOT_HANDSHAKING}.
         * @throws SSLException if wrapping, or a nested wrap / unwrap triggered from here, fails.
793      */
794     private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
795         Buffer out = null;
796         try {
797             // Only continue to loop if the handler was not removed in the meantime.
798             // See https://github.com/netty/netty/issues/5860
799             outer: while (!ctx.isRemoved()) {
800                 if (out == null) {
801                     // As this is called for the handshake we have no real idea how big the buffer needs to be.
802                     // That said 2048 should give us enough room to include everything like ALPN / NPN data.
803                     // If this is not enough we will increase the buffer in wrap(...).
804                     out = allocateOutNetBuf(ctx, 2048, 1);
805                 }
                    // A null source buffer is passed: no application data is fed to the engine here.
806                 SSLEngineResult result = wrap(engine, null, out);
807                 if (result.bytesProduced() > 0) {
                        // A failed write of handshake data is reported as a transport failure of the handshake.
808                     ctx.write(out).addListener(future -> {
809                         Throwable cause = future.cause();
810                         if (cause != null) {
811                             setHandshakeFailureTransportFailure(ctx, cause);
812                         }
813                     });
814                     if (inUnwrap) {
815                         setState(STATE_NEEDS_FLUSH);
816                     }
                        // The buffer was passed to ctx.write(...); drop the reference so it is not closed in the
                        // finally block and a new one is allocated on the next iteration.
817                     out = null;
818                 }
819 
820                 HandshakeStatus status = result.getHandshakeStatus();
821                 switch (status) {
822                     case FINISHED:
823                         // We may be here because we read data and discovered the remote peer initiated a renegotiation
824                         // and this write is to complete the new handshake. The user may have previously done a
825                         // writeAndFlush which wasn't able to wrap data due to needing the pending handshake, so we
826                         // attempt to wrap application data here if any is pending.
827                         if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
828                             wrap(ctx, true);
829                         }
830                         return false;
831                     case NEED_TASK:
832                         if (!runDelegatedTasks(inUnwrap)) {
833                             // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
834                             // resume once the task completes.
835                             break outer;
836                         }
837                         break;
838                     case NEED_UNWRAP:
839                         if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
840                             // If we asked for a wrap, the engine requested an unwrap, and we are in unwrap there is
841                             // no use in trying to call wrap again because we have already attempted (or will after we
842                             // return) to feed more data to the engine.
843                             return false;
844                         }
845                         break;
846                     case NEED_WRAP:
                            // Nothing special to do; whether we loop again is decided by the progress checks below.
847                         break;
848                     case NOT_HANDSHAKING:
849                         if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
850                             wrap(ctx, true);
851                         }
852                         // Workaround for TLS False Start problem reported at:
853                         // https://github.com/netty/netty/issues/1108#issuecomment-14266970
854                         if (!inUnwrap) {
855                             unwrapNonAppData(ctx);
856                         }
                            // This is the only path on which this method returns true.
857                         return true;
858                     default:
859                         throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
860                 }
861 
862                 // Check if did not produce any bytes and if so break out of the loop, but only if we did not process
863                 // a task as last action. It's fine to not produce any data as part of executing a task.
864                 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
865                     break;
866                 }
867 
868                 // It should not consume empty buffers when it is not handshaking
869                 // Fix for Android, where it was encrypting empty buffers even when not handshaking
                    // NOTE(review): NOT_HANDSHAKING already returns inside the switch above, so this check looks
                    // unreachable with that status here -- confirm before relying on it.
870                 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
871                     break;
872                 }
873             }
874         }  finally {
                // Close an output buffer that was allocated but never handed to ctx.write(...).
875             if (out != null) {
876                 out.close();
877             }
878         }
879         return false;
880     }
881 
882     private SSLEngineResult wrap(SSLEngine engine, Buffer in, Buffer out)
883             throws SSLException {
884         for (;;) {
885             SSLEngineResult result = engineWrapper.wrap(in, out);
886 
887             if (result.getStatus() == Status.BUFFER_OVERFLOW) {
888                 out.ensureWritable(engine.getSession().getPacketBufferSize());
889             } else {
890                 return result;
891             }
892         }
893     }
894 
895     @Override
896     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
897         boolean handshakeFailed = handshakePromise.isFailed();
898 
899         ClosedChannelException exception = new ClosedChannelException();
900         // Make sure to release SSLEngine,
901         // and notify the handshake future if the connection has been closed during handshake.
902         setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
903                 false);
904 
905         // Ensure we always notify the sslClosePromise as well
906         notifyClosePromise(exception);
907 
908         try {
909             super.channelInactive(ctx);
910         } catch (DecoderException e) {
911             if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
912                 // We only rethrow the exception if the handshake did not fail before channelInactive(...) was called
913                 // as otherwise this may produce duplicated failures as super.channelInactive(...) will also call
914                 // channelRead(...).
915                 //
916                 // See https://github.com/netty/netty/issues/10119
917                 throw e;
918             }
919         }
920     }
921 
922     @Override
923     public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
924         if (ignoreException(cause)) {
925             // It is safe to ignore the 'connection reset by peer' or
926             // 'broken pipe' error after sending close_notify.
927             if (logger.isDebugEnabled()) {
928                 logger.debug(
929                         "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
930                                 "while writing close_notify in response to the peer's close_notify",
931                         ctx.channel(), cause);
932             }
933 
934             // Close the connection explicitly just in case the transport
935             // did not close the connection automatically.
936             if (ctx.channel().isActive()) {
937                 ctx.close();
938             }
939         } else {
940             ctx.fireChannelExceptionCaught(cause);
941 
942             if (cause instanceof SSLException ||
943                 cause instanceof DecoderException && cause.getCause() instanceof SSLException) {
944                 ctx.close();
945             }
946         }
947     }
948 
949     /**
950      * Checks if the given {@link Throwable} can be ignore and just "swallowed"
951      *
952      * When an ssl connection is closed a close_notify message is sent.
953      * After that the peer also sends close_notify however, it's not mandatory to receive
954      * the close_notify. The party who sent the initial close_notify can close the connection immediately
955      * then the peer will get connection reset error.
956      *
957      */
958     private boolean ignoreException(Throwable t) {
959         if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
960             String message = t.getMessage();
961 
962             // first try to match connection reset / broke peer based on the regex. This is the fastest way
963             // but may fail on different jdk impls or OS's
964             if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
965                 return true;
966             }
967 
968             // Inspect the StackTraceElements to see if it was a connection reset / broken pipe or not
969             StackTraceElement[] elements = t.getStackTrace();
970             for (StackTraceElement element: elements) {
971                 String classname = element.getClassName();
972                 String methodname = element.getMethodName();
973 
974                 // skip all classes that belong to the io.netty5 package
975                 if (classname.startsWith("io.netty5.")) {
976                     continue;
977                 }
978 
979                 // check if the method name is read if not skip it
980                 if (!"read".equals(methodname)) {
981                     continue;
982                 }
983 
984                 // This will also match against SocketInputStream which is used by openjdk 7 and maybe
985                 // also others
986                 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
987                     return true;
988                 }
989 
990                 try {
991                     // No match by now.. Try to load the class via classloader and inspect it.
992                     // This is mainly done as other JDK implementations may differ in name of
993                     // the impl.
994                     Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
995 
996                     if (SocketChannel.class.isAssignableFrom(clazz)
997                             || DatagramChannel.class.isAssignableFrom(clazz)) {
998                         return true;
999                     }
1000                 } catch (Throwable cause) {
1001                     if (logger.isDebugEnabled()) {
1002                         logger.debug("Unexpected exception while loading class {} classname {}",
1003                                 getClass(), classname, cause);
1004                     }
1005                 }
1006             }
1007         }
1008 
1009         return false;
1010     }
1011 
1012     /**
1013      * Returns {@code true} if the given {@link Buffer} is encrypted. Be aware that this method
1014      * will not increase the readerIndex of the given {@link Buffer}.
1015      *
1016      * @param   buffer
1017      *                  The {@link Buffer} to read from. Be aware that it must have at least 5 bytes to read,
1018      *                  otherwise it will throw an {@link IllegalArgumentException}.
1019      * @return encrypted
1020      *                  {@code true} if the {@link Buffer} is encrypted, {@code false} otherwise.
1021      * @throws IllegalArgumentException
1022      *                  Is thrown if the given {@link Buffer} has not at least 5 bytes to read.
1023      */
1024     public static boolean isEncrypted(Buffer buffer) {
1025         if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1026             throw new IllegalArgumentException(
1027                     "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1028         }
1029         return getEncryptedPacketLength(buffer, buffer.readerOffset()) != SslUtils.NOT_ENCRYPTED;
1030     }
1031 
1032     private void decodeJdkCompatible(ChannelHandlerContext ctx, Buffer in) throws NotSslRecordException {
1033         int packetLength = this.packetLength;
1034         // If we calculated the length of the current SSL record before, use that information.
1035         if (packetLength > 0) {
1036             if (in.readableBytes() < packetLength) {
1037                 return;
1038             }
1039         } else {
1040             // Get the packet length and wait until we get a packets worth of data to unwrap.
1041             final int readableBytes = in.readableBytes();
1042             if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1043                 return;
1044             }
1045             packetLength = getEncryptedPacketLength(in, in.readerOffset());
1046             if (packetLength == SslUtils.NOT_ENCRYPTED) {
1047                 // Not an SSL/TLS packet
1048                 NotSslRecordException e = new NotSslRecordException(
1049                         "not an SSL/TLS record: " + BufferUtil.hexDump(in));
1050                 in.skipReadableBytes(in.readableBytes());
1051 
1052                 // First fail the handshake promise as we may need to have access to the SSLEngine which may
1053                 // be released because the user will remove the SslHandler in an exceptionCaught(...) implementation.
1054                 setHandshakeFailure(ctx, e);
1055 
1056                 throw e;
1057             }
1058             assert packetLength > 0;
1059             if (packetLength > readableBytes) {
1060                 // wait until the whole packet can be read
1061                 this.packetLength = packetLength;
1062                 return;
1063             }
1064         }
1065 
1066         // Reset the state of this class so we can get the length of the next packet. We assume the entire packet will
1067         // be consumed by the SSLEngine.
1068         this.packetLength = 0;
1069         try {
1070             final int bytesConsumed = unwrap(ctx, in, packetLength);
1071             assert bytesConsumed == packetLength || engine.isInboundDone() :
1072                     "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1073                             bytesConsumed;
1074         } catch (Throwable cause) {
1075             handleUnwrapThrowable(ctx, cause);
1076         }
1077     }
1078 
1079     private void decodeNonJdkCompatible(ChannelHandlerContext ctx, Buffer in) {
1080         try {
1081             unwrap(ctx, in, in.readableBytes());
1082         } catch (Throwable cause) {
1083             handleUnwrapThrowable(ctx, cause);
1084         }
1085     }
1086 
1087     private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1088         try {
1089             // We should attempt to notify the handshake failure before writing any pending data. If we are in unwrap
1090             // and failed during the handshake process, and we attempt to wrap, then promises will fail, and if
1091             // listeners immediately close the Channel then we may end up firing the handshake event after the Channel
1092             // has been closed.
1093             if (handshakePromise.tryFailure(cause)) {
1094                 ctx.fireChannelInboundEvent(new SslHandshakeCompletionEvent(
1095                         engine().getSession(), applicationProtocol(), cause));
1096             }
1097 
1098             // We need to flush one time as there may be an alert that we should send to the remote peer because
1099             // of the SSLException reported here.
1100             wrapAndFlush(ctx);
1101         } catch (SSLException ex) {
1102             logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1103                     " because of an previous SSLException, ignoring...", ex);
1104         } finally {
1105             // ensure we always flush and close the channel.
1106             setHandshakeFailure(ctx, cause, true, false, true);
1107         }
1108         PlatformDependent.throwException(cause);
1109     }
1110 
1111     @Override
1112     protected void decode(ChannelHandlerContext ctx, Buffer in) throws SSLException {
1113         if (isStateSet(STATE_PROCESS_TASK)) {
1114             return;
1115         }
1116         if (jdkCompatibilityMode) {
1117             decodeJdkCompatible(ctx, in);
1118         } else {
1119             decodeNonJdkCompatible(ctx, in);
1120         }
1121     }
1122 
1123     @Override
1124     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1125         channelReadComplete0(ctx);
1126     }
1127 
1128     private void channelReadComplete0(ChannelHandlerContext ctx) {
1129         // Discard bytes of the cumulation buffer if needed.
1130         discardSomeReadBytes();
1131 
1132         flushIfNeeded(ctx);
1133         readIfNeeded(ctx);
1134 
1135         clearState(STATE_FIRE_CHANNEL_READ);
1136         ctx.fireChannelReadComplete();
1137     }
1138 
1139     private void readIfNeeded(ChannelHandlerContext ctx) {
1140         // If handshake is not finished yet, we need more data.
1141         if (!ctx.channel().getOption(ChannelOption.AUTO_READ) &&
1142                 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1143             // No auto-read used and no message passed through the ChannelPipeline or the handshake was not complete
1144             // yet, which means we need to trigger the read to ensure we not encounter any stalls.
1145             ctx.read();
1146         }
1147     }
1148 
1149     private void flushIfNeeded(ChannelHandlerContext ctx) {
1150         if (isStateSet(STATE_NEEDS_FLUSH)) {
1151             forceFlush(ctx);
1152         }
1153     }
1154 
1155     /**
1156      * Calls {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} with a null buffer to handle handshakes, etc.
1157      */
1158     private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1159         return unwrap(ctx, null, 0);
1160     }
1161 
1162     /**
1163      * Unwraps inbound SSL records.
          * <p>
          * Feeds up to {@code length} bytes of {@code packet} to the engine in a loop, fires the decrypted data
          * into the pipeline (after a handshake success notification, if one is due), runs delegated tasks,
          * wraps handshake data when the engine asks for it, and notifies the close promise once the engine
          * reports {@code CLOSED}.
          *
          * @param ctx the context this handler is bound to.
          * @param packet the buffer holding the encrypted data; {@code unwrapNonAppData(...)} passes {@code null}.
          * @param length the number of bytes of {@code packet} that may be consumed.
          * @return the number of bytes consumed by the engine ({@code originalLength - length}).
          * @throws SSLException if unwrapping, or a wrap triggered from here, fails.
1164      */
1165     private int unwrap(ChannelHandlerContext ctx, Buffer packet, int length) throws SSLException {
1166         final int originalLength = length;
              // Set when wrap(ctx, true) has to be called after the loop.
1167         boolean wrapLater = false;
              // Set when the engine reported CLOSED; the close promise is notified in the finally block.
1168         boolean notifyClosure = false;
              // Set when a channel read was handed to the executor instead of being fired directly.
1169         boolean executedRead = false;
1170         Buffer decodeOut = allocate(ctx, length);
1171         try {
1172             // Only continue to loop if the handler was not removed in the meantime.
1173             // See https://github.com/netty/netty/issues/5860
1174             do {
1175                 // The engineWrapper.unwrap skips the consumed bytes in the packet buffer immediately,
1176                 // which is useful in case unwrap is called in a re-entry scenario. For example LocalChannel.read()
1177                 // may entry this method in a re-entry fashion and if the peer is writing into a shared buffer we may
1178                 // unwrap the same data multiple times.
1179                 final SSLEngineResult result = engineWrapper.unwrap(packet, length, decodeOut);
1180                 final Status status = result.getStatus();
1181                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1182                 final int produced = result.bytesProduced();
1183                 final int consumed = result.bytesConsumed();
1184                 length -= consumed;
1185 
1186                 // The expected sequence of events is:
1187                 // 1. Notify of handshake success
1188                 // 2. fireChannelRead for unwrapped data
1189                 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
                          // With decoded data pending, the re-entry marking variant is used so the data fired
                          // below keeps its order relative to data produced by re-entrant calls.
1190                     wrapLater |= (decodeOut.readableBytes() > 0 ?
1191                             setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1192                             handshakeStatus == HandshakeStatus.FINISHED;
1193                 }
1194 
1195                 // Dispatch decoded data after we have notified of handshake success. If this method has been invoked
1196                 // in a re-entry fashion we execute a task on the executor queue to process after the stack unwinds
1197                 // to preserve order of events.
1198                 if (decodeOut.readableBytes() > 0) {
1199                     setState(STATE_FIRE_CHANNEL_READ);
1200                     if (isStateSet(STATE_UNWRAP_REENTRY)) {
1201                         executedRead = true;
1202                         executeChannelRead(ctx, decodeOut);
1203                     } else {
1204                         ctx.fireChannelRead(decodeOut);
1205                     }
                          // The buffer was passed on; drop the reference so it is not closed in the finally block.
1206                     decodeOut = null;
1207                 }
1208 
1209                 if (status == Status.CLOSED) {
1210                     notifyClosure = true; // Notify about the CLOSED state of the SSLEngine. See #137
1211                 } else if (status == Status.BUFFER_OVERFLOW) {
1212                     if (decodeOut != null) {
1213                         decodeOut.close();
1214                     }
1215                     final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1216                     // Allocate a new buffer which can hold all the rest data and loop again.
1217                     // It may happen that applicationBufferSize < produced while there is still more to unwrap, in this
1218                     // case we will just allocate a new buffer with the capacity of applicationBufferSize and call
1219                     // unwrap again.
1220                     decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1221                             applicationBufferSize : applicationBufferSize - produced));
1222                     continue;
1223                 }
1224 
1225                 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
                          // NOTE: despite its name, "pending" is true when the tasks were run directly and false
                          // when they were offloaded (see the contract documented on runDelegatedTasks).
1226                     boolean pending = runDelegatedTasks(true);
1227                     if (!pending) {
1228                         // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
1229                         // resume once the task completes.
1230                         //
1231                         // We break out of the loop only and do NOT return here as we still may need to notify
1232                         // about the closure of the SSLEngine.
1233                         wrapLater = false;
1234                         break;
1235                     }
1236                 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1237                     // If the wrap operation transitions the status to NOT_HANDSHAKING and there is no more data to
1238                     // unwrap then the next call to unwrap will not produce any data. We can avoid the potentially
1239                     // costly unwrap operation and break out of the loop.
1240                     if (wrapNonAppData(ctx, true) && length == 0) {
1241                         break;
1242                     }
1243                 }
1244 
                      // Stop looping when the engine needs more input, or (unless a task was just processed) when
                      // no progress was made or everything was consumed while not handshaking.
1245                 if (status == Status.BUFFER_UNDERFLOW ||
1246                         // If we processed NEED_TASK we should try again even we did not consume or produce anything.
1247                         handshakeStatus != HandshakeStatus.NEED_TASK &&
1248                         (consumed == 0 && produced == 0 ||
1249                          length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING)) {
1250                     if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1251                         // The underlying engine is starving so we need to feed it with more data.
1252                         // See https://github.com/netty/netty/pull/5039
1253                         readIfNeeded(ctx);
1254                     }
1255 
1256                     break;
1257                 } else if (decodeOut == null) {
                          // The previous output buffer was passed on; allocate a new one for the next iteration.
1258                     decodeOut = allocate(ctx, length);
1259                 }
1260             } while (!ctx.isRemoved());
1261 
1262             if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1263                 // We need to call wrap(...) in case there was a flush done before the handshake completed to ensure
1264                 // we do not stall.
1265                 //
1266                 // See https://github.com/netty/netty/pull/2437
1267                 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1268                 wrapLater = true;
1269             }
1270 
1271             if (wrapLater) {
1272                 wrap(ctx, true);
1273             }
1274         } finally {
                  // Close an output buffer that was never handed to the pipeline.
1275             if (decodeOut != null) {
1276                 decodeOut.close();
1277             }
1278 
1279             if (notifyClosure) {
                      // If a read was handed to the executor, the notification goes through the executor too
                      // (presumably so it cannot overtake that read -- confirm).
1280                 if (executedRead) {
1281                     executeNotifyClosePromise(ctx);
1282                 } else {
1283                     notifyClosePromise(null);
1284                 }
1285             }
1286         }
1287         return originalLength - length;
1288     }
1289 
1290     private boolean setHandshakeSuccessUnwrapMarkReentry() {
1291         // setHandshakeSuccess calls out to external methods which may trigger re-entry. We need to preserve ordering of
1292         // fireChannelRead for decodeOut relative to re-entry data.
1293         final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1294         if (setReentryState) {
1295             setState(STATE_UNWRAP_REENTRY);
1296         }
1297         try {
1298             return setHandshakeSuccess();
1299         } finally {
1300             // It is unlikely this specific method will be re-entry because handshake completion is infrequent, but just
1301             // in case we only clear the state if we set it in the first place.
1302             if (setReentryState) {
1303                 clearState(STATE_UNWRAP_REENTRY);
1304             }
1305         }
1306     }
1307 
1308     private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1309         try {
1310             ctx.executor().execute(() -> notifyClosePromise(null));
1311         } catch (RejectedExecutionException e) {
1312             notifyClosePromise(e);
1313         }
1314     }
1315 
1316     private static void executeChannelRead(final ChannelHandlerContext ctx, final Buffer decodedOut) {
1317         try {
1318             ctx.executor().execute(() -> ctx.fireChannelRead(decodedOut));
1319         } catch (RejectedExecutionException e) {
1320             decodedOut.close();
1321             throw e;
1322         }
1323     }
1324 
1325     private static boolean inEventLoop(Executor executor) {
1326         return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1327     }
1328 
1329     /**
1330      * Will either run the delegated task directly calling {@link Runnable#run()} and return {@code true} or will
1331      * offload the delegated task using {@link Executor#execute(Runnable)} and return {@code false}.
1332      *
1333      * If the task is offloaded it will take care to resume its work on the {@link EventExecutor} once there are no
1334      * more tasks to process.
1335      */
1336     private boolean runDelegatedTasks(boolean inUnwrap) {
1337         if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1338             // We should run the task directly in the EventExecutor thread and not offload at all. As we are on the
1339             // EventLoop we can just run all tasks at once.
1340             for (;;) {
1341                 Runnable task = engine.getDelegatedTask();
1342                 if (task == null) {
1343                     return true;
1344                 }
1345                 setState(STATE_PROCESS_TASK);
1346                 if (task instanceof AsyncRunnable) {
1347                     // Let's set the task to processing task before we try to execute it.
1348                     boolean pending = false;
1349                     try {
1350                         AsyncRunnable asyncTask = (AsyncRunnable) task;
1351                         AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1352                         asyncTask.run(completionHandler);
1353                         pending = completionHandler.resumeLater();
1354                         if (pending) {
1355                             return false;
1356                         }
1357                     } finally {
1358                         if (!pending) {
1359                             // The task has completed, lets clear the state. If it is not completed we will clear the
1360                             // state once it is.
1361                             clearState(STATE_PROCESS_TASK);
1362                         }
1363                     }
1364                 } else {
1365                     try {
1366                         task.run();
1367                     } finally {
1368                         clearState(STATE_PROCESS_TASK);
1369                     }
1370                 }
1371             }
1372         } else {
1373             executeDelegatedTask(inUnwrap);
1374             return false;
1375         }
1376     }
1377 
1378     private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1379         return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1380     }
1381 
1382     private void executeDelegatedTask(boolean inUnwrap) {
1383         executeDelegatedTask(getTaskRunner(inUnwrap));
1384     }
1385 
1386     private void executeDelegatedTask(SslTasksRunner task) {
1387         setState(STATE_PROCESS_TASK);
1388         try {
1389             delegatedTaskExecutor.execute(task);
1390         } catch (RejectedExecutionException e) {
1391             clearState(STATE_PROCESS_TASK);
1392             throw e;
1393         }
1394     }
1395 
1396     private final class AsyncTaskCompletionHandler implements Runnable {
1397         private final boolean inUnwrap;
1398         boolean didRun;
1399         boolean resumeLater;
1400 
1401         AsyncTaskCompletionHandler(boolean inUnwrap) {
1402             this.inUnwrap = inUnwrap;
1403         }
1404 
1405         @Override
1406         public void run() {
1407             didRun = true;
1408             if (resumeLater) {
1409                 getTaskRunner(inUnwrap).runComplete();
1410             }
1411         }
1412 
1413         boolean resumeLater() {
1414             if (!didRun) {
1415                 resumeLater = true;
1416                 return true;
1417             }
1418             return false;
1419         }
1420     }
1421 
1422     /**
1423      * {@link Runnable} that will be scheduled on the {@code delegatedTaskExecutor} and will take care
1424      * of resume work on the {@link EventExecutor} once the task was executed.
1425      */
1426     private final class SslTasksRunner implements Runnable {
1427         private final boolean inUnwrap;
1428         private final Runnable runCompleteTask = this::runComplete;
1429 
1430         SslTasksRunner(boolean inUnwrap) {
1431             this.inUnwrap = inUnwrap;
1432         }
1433 
1434         // Handle errors which happened during task processing.
1435         private void taskError(Throwable e) {
1436             if (inUnwrap) {
1437                 // As the error happened while the task was scheduled as part of unwrap(...) we also need to ensure
1438                 // we fire it through the pipeline as inbound error to be consistent with what we do in decode(...).
1439                 //
1440                 // This will also ensure we fail the handshake future and flush all produced data.
1441                 try {
1442                     handleUnwrapThrowable(ctx, e);
1443                 } catch (Throwable cause) {
1444                     safeExceptionCaught(cause);
1445                 }
1446             } else {
1447                 setHandshakeFailure(ctx, e);
1448                 forceFlush(ctx);
1449             }
1450         }
1451 
1452         // Try to call exceptionCaught(...)
1453         private void safeExceptionCaught(Throwable cause) {
1454             try {
1455                 channelExceptionCaught(ctx, wrapIfNeeded(cause));
1456             } catch (Throwable error) {
1457                 ctx.fireChannelExceptionCaught(error);
1458             }
1459         }
1460 
1461         private Throwable wrapIfNeeded(Throwable cause) {
1462             if (!inUnwrap) {
1463                 // If we are not in unwrap(...) we can just rethrow without wrapping at all.
1464                 return cause;
1465             }
1466             // As the exception would have been triggered by an inbound operation we will need to wrap it in a
1467             // DecoderException to mimic what a decoder would do when decode(...) throws.
1468             return cause instanceof DecoderException ? cause : new DecoderException(cause);
1469         }
1470 
1471         private void tryDecodeAgain() {
1472             try {
1473                 channelRead(ctx, ctx.bufferAllocator().allocate(0));
1474             } catch (Throwable cause) {
1475                 safeExceptionCaught(cause);
1476             } finally {
1477                 // As we called channelRead(...) we also need to call channelReadComplete(...) which
1478                 // will ensure we either call ctx.fireChannelReadComplete() or will trigger a ctx.read() if
1479                 // more data is needed.
1480                 channelReadComplete0(ctx);
1481             }
1482         }
1483 
1484         /**
1485          * Executed after the wrapped {@code task} was executed via {@code delegatedTaskExecutor} to resume work
1486          * on the {@link EventExecutor}.
1487          */
1488         private void resumeOnEventExecutor() {
1489             assert ctx.executor().inEventLoop();
1490             clearState(STATE_PROCESS_TASK);
1491             try {
1492                 HandshakeStatus status = engine.getHandshakeStatus();
1493                 switch (status) {
1494                     // There is another task that needs to be executed and offloaded to the delegatingTaskExecutor as
1495                     // a result of this. Let's reschedule....
1496                     case NEED_TASK:
1497                         executeDelegatedTask(this);
1498 
1499                         break;
1500 
1501                     // The handshake finished, lets notify about the completion of it and resume processing.
1502                     case FINISHED:
1503                     // Not handshaking anymore, lets notify about the completion if not done yet and resume processing.
1504                     case NOT_HANDSHAKING:
1505                         setHandshakeSuccess(); // NOT_HANDSHAKING -> workaround for android skipping FINISHED state.
1506                         try {
1507                             // Lets call wrap to ensure we produce the alert if there is any pending and also to
1508                             // ensure we flush any queued data..
1509                             wrap(ctx, inUnwrap);
1510                         } catch (Throwable e) {
1511                             taskError(e);
1512                             return;
1513                         }
1514                         if (inUnwrap) {
1515                             // If we were in the unwrap call when the task was processed we should also try to unwrap
1516                             // non app data first as there may not anything left in the inbound buffer to process.
1517                             unwrapNonAppData(ctx);
1518                         }
1519 
1520                         // Flush now as we may have written some data as part of the wrap call.
1521                         forceFlush(ctx);
1522 
1523                         tryDecodeAgain();
1524                         break;
1525 
1526                     // We need more data so lets try to unwrap first and then call decode again which will feed us
1527                     // with buffered data (if there is any).
1528                     case NEED_UNWRAP:
1529                         try {
1530                             unwrapNonAppData(ctx);
1531                         } catch (SSLException e) {
1532                             handleUnwrapThrowable(ctx, e);
1533                             return;
1534                         }
1535                         tryDecodeAgain();
1536                         break;
1537 
1538                     // To make progress we need to call SSLEngine.wrap(...) which may produce more output data
1539                     // that will be written to the Channel.
1540                     case NEED_WRAP:
1541                         try {
1542                             if (!wrapNonAppData(ctx, false) && inUnwrap) {
1543                                 // The handshake finished in wrapNonAppData(...), we need to try call
1544                                 // unwrapNonAppData(...) as we may have some alert that we should read.
1545                                 //
1546                                 // This mimics what we would do when we are calling this method while in unwrap(...).
1547                                 unwrapNonAppData(ctx);
1548                             }
1549 
1550                             // Flush now as we may have written some data as part of the wrap call.
1551                             forceFlush(ctx);
1552                         } catch (Throwable e) {
1553                             taskError(e);
1554                             return;
1555                         }
1556 
1557                         // Now try to feed in more data that we have buffered.
1558                         tryDecodeAgain();
1559                         break;
1560 
1561                     default:
1562                         // Should never reach here as we handle all cases.
1563                         throw new AssertionError();
1564                 }
1565             } catch (Throwable cause) {
1566                 safeExceptionCaught(cause);
1567             }
1568         }
1569 
1570         void runComplete() {
1571             EventExecutor executor = ctx.executor();
1572             // Jump back on the EventExecutor. We do this even if we are already on the EventLoop to guard against
1573             // reentrancy issues. Failing to do so could lead to the situation of tryDecode(...) be called and so
1574             // channelRead(...) while still in the decode loop. In this case channelRead(...) might release the input
1575             // buffer if its empty which would then result in an IllegalReferenceCountException when we try to continue
1576             // decoding.
1577             //
1578             // See https://github.com/netty/netty-tcnative/issues/680
1579             executor.execute(this::resumeOnEventExecutor);
1580         }
1581 
1582         @Override
1583         public void run() {
1584             try {
1585                 Runnable task = engine.getDelegatedTask();
1586                 if (task == null) {
1587                     // The task was processed in the meantime. Let's just return.
1588                     return;
1589                 }
1590                 if (task instanceof AsyncRunnable) {
1591                     AsyncRunnable asyncTask = (AsyncRunnable) task;
1592                     asyncTask.run(runCompleteTask);
1593                 } else {
1594                     task.run();
1595                     runComplete();
1596                 }
1597             } catch (Throwable cause) {
1598                 handleException(cause);
1599             }
1600         }
1601 
1602         private void handleException(final Throwable cause) {
1603             EventExecutor executor = ctx.executor();
1604             if (executor.inEventLoop()) {
1605                 clearState(STATE_PROCESS_TASK);
1606                 safeExceptionCaught(cause);
1607             } else {
1608                 try {
1609                     executor.execute(() -> {
1610                         clearState(STATE_PROCESS_TASK);
1611                         safeExceptionCaught(cause);
1612                     });
1613                 } catch (RejectedExecutionException ignore) {
1614                     clearState(STATE_PROCESS_TASK);
1615                     // the context itself will handle the rejected exception when try to schedule the operation so
1616                     // ignore the RejectedExecutionException
1617                     ctx.fireChannelExceptionCaught(cause);
1618                 }
1619             }
1620         }
1621     }
1622 
1623     /**
1624      * Notify all the handshake futures about the successfully handshake
1625      * @return {@code true} if {@link #handshakePromise} was set successfully and a {@link SslHandshakeCompletionEvent}
1626      * was fired. {@code false} otherwise.
1627      */
1628     private boolean setHandshakeSuccess() {
1629         // Our control flow may invoke this method multiple times for a single FINISHED event. For example
1630         // wrapNonAppData may drain pendingUnencryptedWrites in wrap which transitions to handshake from FINISHED to
1631         // NOT_HANDSHAKING which invokes setHandshakeSuccess, and then wrapNonAppData also directly invokes this method.
1632         final boolean notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel());
1633         if (notified) {
1634             if (logger.isDebugEnabled()) {
1635                 SSLSession session = engine.getSession();
1636                 logger.debug(
1637                         "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1638                         ctx.channel(),
1639                         session.getProtocol(),
1640                         session.getCipherSuite());
1641             }
1642             ctx.fireChannelInboundEvent(
1643                     new SslHandshakeCompletionEvent(engine().getSession(), applicationProtocol()));
1644         }
1645         if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1646             clearState(STATE_READ_DURING_HANDSHAKE);
1647             if (!ctx.channel().getOption(ChannelOption.AUTO_READ)) {
1648                 ctx.read();
1649             }
1650         }
1651         return notified;
1652     }
1653 
1654     /**
1655      * Notify all the handshake futures about the failure during the handshake.
1656      */
1657     private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1658         setHandshakeFailure(ctx, cause, true, true, false);
1659     }
1660 
1661     /**
1662      * Notify all the handshake futures about the failure during the handshake.
1663      */
1664     private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1665                                      boolean notify, boolean alwaysFlushAndClose) {
1666         try {
1667             // Release all resources such as internal buffers that SSLEngine is managing.
1668             setState(STATE_OUTBOUND_CLOSED);
1669             engine.closeOutbound();
1670 
1671             if (closeInbound) {
1672                 try {
1673                     engine.closeInbound();
1674                 } catch (SSLException e) {
1675                     if (logger.isDebugEnabled()) {
1676                         // only log in debug mode as it most likely harmless and latest chrome still trigger
1677                         // this all the time.
1678                         //
1679                         // See https://github.com/netty/netty/issues/1340
1680                         String msg = e.getMessage();
1681                         if (msg == null || !(msg.contains("possible truncation attack") ||
1682                                 msg.contains("closing inbound before receiving peer's close_notify"))) {
1683                             logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1684                         }
1685                     }
1686                 }
1687             }
1688             if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1689                 SslUtils.handleHandshakeFailure(
1690                         ctx, engine().getSession(), applicationProtocol(), cause, notify);
1691             }
1692         } finally {
1693             // Ensure we remove and fail all pending writes in all cases and so release memory quickly.
1694             releaseAndFailAll(ctx, cause);
1695         }
1696     }
1697 
1698     private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
1699         // If TLS control frames fail to write we are in an unknown state and may become out of
1700         // sync with our peer. We give up and close the channel. This will also take care of
1701         // cleaning up any outstanding state (e.g. handshake promise, queued unencrypted data).
1702         try {
1703             SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
1704             releaseAndFailAll(ctx, transportFailure);
1705             if (handshakePromise.tryFailure(transportFailure)) {
1706                 ctx.fireChannelInboundEvent(new SslHandshakeCompletionEvent(
1707                         engine().getSession(), applicationProtocol(), transportFailure));
1708             }
1709         } finally {
1710             ctx.close();
1711         }
1712     }
1713 
1714     private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
1715         if (pendingUnencryptedWrites != null) {
1716             pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
1717         }
1718     }
1719 
1720     private void notifyClosePromise(Throwable cause) {
1721         if (cause == null) {
1722             if (sslClosePromise.trySuccess(ctx.channel())) {
1723                 ctx.fireChannelInboundEvent(new SslCloseCompletionEvent(engine().getSession()));
1724             }
1725         } else {
1726             if (sslClosePromise.tryFailure(cause)) {
1727                 ctx.fireChannelInboundEvent(new SslCloseCompletionEvent(engine().getSession(), cause));
1728             }
1729         }
1730     }
1731 
1732     private Future<Void> closeOutboundAndChannel(
1733             final ChannelHandlerContext ctx, boolean disconnect) {
1734         setState(STATE_OUTBOUND_CLOSED);
1735         engine.closeOutbound();
1736 
1737         if (!ctx.channel().isActive()) {
1738             if (disconnect) {
1739                 return ctx.disconnect();
1740             }
1741             return ctx.close();
1742         }
1743         Promise<Void> promise = ctx.newPromise();
1744         Promise<Void> closeNotifyPromise = ctx.newPromise();
1745         try {
1746             flush(ctx, closeNotifyPromise);
1747         } finally {
1748             if (!isStateSet(STATE_CLOSE_NOTIFY)) {
1749                 setState(STATE_CLOSE_NOTIFY);
1750                 // It's important that we do not pass the original Promise to safeClose(...) as when flush(....)
1751                 // throws an Exception it will be propagated to the AbstractChannelHandlerContext which will try
1752                 // to fail the promise because of this. This will then fail as it was already completed by
1753                 // safeClose(...). We create a new Promise and try to notify the original Promise
1754                 // once it is complete. If we fail to do so we just ignore it as in this case it was failed already
1755                 // because of a propagated Exception.
1756                 //
1757                 // See https://github.com/netty/netty/issues/5931
1758                 Promise<Void> cascade = ctx.newPromise();
1759                 cascade.asFuture().cascadeTo(promise);
1760                 safeClose(ctx, closeNotifyPromise.asFuture(), cascade);
1761             } else {
1762                 /// We already handling the close_notify so just attach the promise to the sslClosePromise.
1763                 sslCloseFuture().addListener(future -> promise.setSuccess(null));
1764             }
1765         }
1766         return promise.asFuture();
1767     }
1768 
1769     private void flush(ChannelHandlerContext ctx, Promise<Void> promise) {
1770         if (pendingUnencryptedWrites != null) {
1771             pendingUnencryptedWrites.add(ctx.bufferAllocator().allocate(0), promise);
1772         } else {
1773             promise.setFailure(newPendingWritesNullException());
1774         }
1775         flush(ctx);
1776     }
1777 
1778     @Override
1779     public void handlerAdded0(final ChannelHandlerContext ctx) throws Exception {
1780         this.ctx = ctx;
1781 
1782         Channel channel = ctx.channel();
1783         pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(16);
1784 
1785         setOpensslEngineSocketFd(channel);
1786         boolean fastOpen = channel.isOptionSupported(ChannelOption.TCP_FASTOPEN_CONNECT) &&
1787                 Boolean.TRUE.equals(channel.getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
1788         boolean active = channel.isActive();
1789         if (active || fastOpen) {
1790             // Explicitly flush the handshake only if the channel is already active.
1791             // With TCP Fast Open, we write to the outbound buffer before the TCP connect is established.
1792             // The buffer will then be flushed as part of establishing the connection, saving us a round-trip.
1793             startHandshakeProcessing(active);
1794             // If we weren't able to include client_hello in the TCP SYN (e.g. no token, disabled at the OS) we have to
1795             // flush pending data in the outbound buffer later in channelActive().
1796             if (fastOpen) {
1797                 setState(STATE_NEEDS_FLUSH);
1798             }
1799         }
1800     }
1801 
1802     private void startHandshakeProcessing(boolean flushAtEnd) {
1803         if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
1804             setState(STATE_HANDSHAKE_STARTED);
1805             if (engine.getUseClientMode()) {
1806                 // Begin the initial handshake.
1807                 // channelActive() event has been fired already, which means this.channelActive() will
1808                 // not be invoked. We have to initialize here instead.
1809                 handshake(flushAtEnd);
1810             }
1811             applyHandshakeTimeout();
1812         } else if (isStateSet(STATE_NEEDS_FLUSH)) {
1813             forceFlush(ctx);
1814         }
1815     }
1816 
1817     /**
1818      * Performs TLS renegotiation.
1819      */
1820     public Future<Channel> renegotiate() {
1821         ChannelHandlerContext ctx = this.ctx;
1822         if (ctx == null) {
1823             throw new IllegalStateException();
1824         }
1825 
1826         return renegotiate(ctx.executor().newPromise());
1827     }
1828 
1829     /**
1830      * Performs TLS renegotiation.
1831      */
1832     public Future<Channel> renegotiate(final Promise<Channel> promise) {
1833         requireNonNull(promise, "promise");
1834 
1835         ChannelHandlerContext ctx = this.ctx;
1836         if (ctx == null) {
1837             throw new IllegalStateException();
1838         }
1839 
1840         EventExecutor executor = ctx.executor();
1841         if (!executor.inEventLoop()) {
1842             executor.execute(() -> renegotiateOnEventLoop(promise));
1843             return promise.asFuture();
1844         }
1845 
1846         renegotiateOnEventLoop(promise);
1847         return promise.asFuture();
1848     }
1849 
1850     private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
1851         Future<Channel> oldHandshakePromise = handshakeFuture();
1852         if (!oldHandshakePromise.isDone()) {
1853             // There's no need to handshake because handshake is in progress already.
1854             // Merge the new promise into the old one.
1855             oldHandshakePromise.cascadeTo(newHandshakePromise);
1856         } else {
1857             handshakePromise = newHandshakePromise;
1858             handshake(true);
1859             applyHandshakeTimeout();
1860         }
1861     }
1862 
1863     /**
1864      * Performs TLS (re)negotiation.
1865      * @param flushAtEnd Set to {@code true} if the outbound buffer should be flushed (written to the network) at the
1866      *                  end. Set to {@code false} if the handshake will be flushed later, e.g. as part of TCP Fast Open
1867      *                  connect.
1868      */
1869     private void handshake(boolean flushAtEnd) {
1870         if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
1871             // Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake
1872             // is in progress. See https://github.com/netty/netty/issues/4718.
1873             return;
1874         }
1875         if (handshakePromise.isDone()) {
1876             // If the handshake is done already lets just return directly as there is no need to trigger it again.
1877             // This can happen if the handshake(...) was triggered before we called channelActive(...) by a
1878             // flush() that was triggered by a FutureListener that was added to the Future returned
1879             // from the connect(...) method. In this case we will see the flush() happen before we had a chance to
1880             // call fireChannelActive() on the pipeline.
1881             return;
1882         }
1883 
1884         // Begin handshake.
1885         final ChannelHandlerContext ctx = this.ctx;
1886         try {
1887             engine.beginHandshake();
1888             wrapNonAppData(ctx, false);
1889         } catch (Throwable e) {
1890             setHandshakeFailure(ctx, e);
1891         } finally {
1892             if (flushAtEnd) {
1893                 forceFlush(ctx);
1894             }
1895         }
1896     }
1897 
1898     private void applyHandshakeTimeout() {
1899         final Promise<Channel> localHandshakePromise = handshakePromise;
1900 
1901         // Set timeout if necessary.
1902         final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
1903         if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
1904             return;
1905         }
1906 
1907         Future<?> timeoutFuture = ctx.executor().schedule(() -> {
1908             if (localHandshakePromise.isDone()) {
1909                 return;
1910             }
1911 
1912             SSLException exception =
1913                     new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
1914             try {
1915                 if (localHandshakePromise.tryFailure(exception)) {
1916                     SslUtils.handleHandshakeFailure(
1917                             ctx, engine().getSession(), applicationProtocol(), exception, true);
1918                 }
1919             } finally {
1920                 releaseAndFailAll(ctx, exception);
1921             }
1922         }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
1923 
1924         // Cancel the handshake timeout when handshake is finished.
1925         localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel());
1926     }
1927 
1928     private void forceFlush(ChannelHandlerContext ctx) {
1929         clearState(STATE_NEEDS_FLUSH);
1930         ctx.flush();
1931     }
1932 
1933      private void setOpensslEngineSocketFd(Channel c) {
1934          if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
1935              ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
1936          }
1937      }
1938 
1939     /**
1940      * Issues an initial TLS handshake once connected when used in client-mode
1941      */
1942     @Override
1943     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
1944         setOpensslEngineSocketFd(ctx.channel());
1945         if (!startTls) {
1946             startHandshakeProcessing(true);
1947         }
1948         ctx.fireChannelActive();
1949     }
1950 
1951     private void safeClose(
1952             final ChannelHandlerContext ctx, final Future<Void> flushFuture,
1953             final Promise<Void> promise) {
1954         if (!ctx.channel().isActive()) {
1955             ctx.close().cascadeTo(promise);
1956             return;
1957         }
1958 
1959         Future<?> timeoutFuture;
1960         if (!flushFuture.isDone()) {
1961             long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
1962             if (closeNotifyTimeout > 0) {
1963                 // Force-close the connection if close_notify is not fully sent in time.
1964                 timeoutFuture = ctx.executor().schedule(() -> {
1965                     // May be done in the meantime as cancel(...) is only best effort.
1966                     if (!flushFuture.isDone()) {
1967                         logger.warn("{} Last write attempt timed out; force-closing the connection.",
1968                                 ctx.channel());
1969                         addCloseListener(ctx.close(), promise);
1970                     }
1971                 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
1972             } else {
1973                 timeoutFuture = null;
1974             }
1975         } else {
1976             timeoutFuture = null;
1977         }
1978 
1979         // Close the connection if close_notify is sent in time.
1980         flushFuture.addListener(f -> {
1981             if (timeoutFuture != null) {
1982                 timeoutFuture.cancel();
1983             }
1984             final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
1985             if (closeNotifyReadTimeout <= 0) {
1986                 if (ctx.channel().isActive()) {
1987                     // Trigger the close in all cases to make sure the promise is notified
1988                     // See https://github.com/netty/netty/issues/2358
1989                     addCloseListener(ctx.close(), promise);
1990                 } else {
1991                     promise.trySuccess(null);
1992                 }
1993             } else {
1994                 Future<?> closeNotifyReadTimeoutFuture;
1995 
1996                 Future<Channel> closeFuture = sslCloseFuture();
1997 
1998                 if (!closeFuture.isDone()) {
1999                     closeNotifyReadTimeoutFuture = ctx.executor().schedule(() -> {
2000                         if (!closeFuture.isDone()) {
2001                             logger.debug(
2002                                     "{} did not receive close_notify in {}ms; force-closing the connection.",
2003                                     ctx.channel(), closeNotifyReadTimeout);
2004 
2005                             // Do the close now...
2006                             addCloseListener(ctx.close(), promise);
2007                         }
2008                     }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2009                 } else {
2010                     closeNotifyReadTimeoutFuture = null;
2011                 }
2012 
2013                 // Do the close once we received the close_notify.
2014                 closeFuture.addListener(future -> {
2015                     if (closeNotifyReadTimeoutFuture != null) {
2016                         closeNotifyReadTimeoutFuture.cancel();
2017                     }
2018                     if (ctx.channel().isActive()) {
2019                         addCloseListener(ctx.close(), promise);
2020                     } else {
2021                         promise.trySuccess(null);
2022                     }
2023                 });
2024             }
2025         });
2026     }
2027 
2028     private static void addCloseListener(Future<Void> future, Promise<Void> promise) {
2029         // We notify the promise in the PromiseNotifier as there is a "race" where the close(...) call
2030         // by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
2031         // this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
2032         // IllegalStateException.
2033         // Also we not want to log if the notification happens as this is expected in some cases.
2034         // See https://github.com/netty/netty/issues/5598
2035         future.cascadeTo(promise);
2036     }
2037 
2038     /**
2039      * Allocate buffer of the right direct/not-direct type that the SSLEngine prefers.
2040      */
2041     private Buffer allocate(ChannelHandlerContext ctx, int capacity) {
2042         BufferAllocator alloc = ctx.bufferAllocator();
2043 
2044         boolean hasDirect = alloc.getAllocationType().isDirect();
2045         boolean wantsDirect = engineType.wantsDirectBuffer;
2046         if (hasDirect && !wantsDirect) {
2047             alloc = DefaultBufferAllocators.onHeapAllocator();
2048         } else if (!hasDirect && wantsDirect) {
2049             alloc = DefaultBufferAllocators.offHeapAllocator();
2050         }
2051 
2052         return alloc.allocate(capacity);
2053     }
2054 
2055     /**
2056      * Allocates an outbound network buffer for {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)} which can encrypt
2057      * the specified amount of pending bytes.
2058      */
2059     private Buffer allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2060         return engineType.allocateWrapBuffer(this, ctx.bufferAllocator(), pendingBytes, numComponents);
2061     }
2062 
2063     private boolean isStateSet(int bit) {
2064         return (state & bit) == bit;
2065     }
2066 
2067     private void setState(int bit) {
2068         state |= bit;
2069     }
2070 
2071     private void clearState(int bit) {
2072         state &= ~bit;
2073     }
2074 
2075     @Override
2076     public long pendingOutboundBytes(ChannelHandlerContext ctx) {
2077         return pendingUnencryptedWrites == null ? 0 : pendingUnencryptedWrites.readableBytes();
2078     }
2079 
2080     /**
2081      * Each call to SSL_write will introduce about ~100 bytes of overhead. This coalescing queue attempts to increase
2082      * goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written
2083      * this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations.
2084      */
2085     private final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
2086 
2087         SslHandlerCoalescingBufferQueue(int initSize) {
2088             super(initSize);
2089         }
2090 
2091         @Override
2092         protected Buffer compose(BufferAllocator alloc, Buffer cumulation, Buffer next) {
2093             if (cumulation.writableBytes() >= next.readableBytes()) {
2094                 cumulation.writeBytes(next);
2095                 next.close();
2096                 return cumulation;
2097             }
2098             if (cumulation instanceof CompositeBuffer) {
2099                 CompositeBuffer composite = (CompositeBuffer) cumulation;
2100                 if (next.readableBytes() < wrapDataSize / 2) {
2101                     composite.ensureWritable(wrapDataSize);
2102                     composite.writeBytes(next);
2103                     next.close();
2104                 } else {
2105                     composite.extendWith(next.send());
2106                 }
2107                 return composite;
2108             }
2109             int minIncrement = 2 * wrapDataSize; // Amortise cost of allocation.
2110             cumulation = copyAndCompose(alloc, cumulation, next, minIncrement);
2111             return cumulation;
2112         }
2113 
2114         @Override
2115         protected Buffer composeFirst(BufferAllocator allocator, Buffer first) {
2116             if (first instanceof CompositeBuffer) {
2117                 CompositeBuffer composite = (CompositeBuffer) first;
2118                 if (engineType.wantsDirectBuffer && !allocator.getAllocationType().isDirect()) {
2119                     first = DefaultBufferAllocators.offHeapAllocator().allocate(composite.readableBytes());
2120                 } else {
2121                     first = allocator.allocate(composite.readableBytes());
2122                 }
2123                 try {
2124                     first.writeBytes(composite);
2125                 } catch (Throwable cause) {
2126                     first.close();
2127                     throw cause;
2128                 }
2129                 composite.close();
2130             }
2131             return first;
2132         }
2133 
2134         @Override
2135         protected Buffer removeEmptyValue() {
2136             return null;
2137         }
2138     }
2139 
2140     private final class LazyPromise extends DefaultPromise<Channel> {
2141 
2142         LazyPromise() {
2143             super(ImmediateEventExecutor.INSTANCE);
2144         }
2145 
2146         @Override
2147         protected void checkDeadLock() {
2148             if (ctx == null) {
2149                 // If ctx is null the handlerAdded(...) callback was not called, in this case the checkDeadLock()
2150                 // method was called from another Thread then the one that is used by ctx.executor(). We need to
2151                 // guard against this as a user can see a race if handshakeFuture().sync() is called but the
2152                 // handlerAdded(..) method was not yet as it is called from the EventExecutor of the
2153                 // ChannelHandlerContext.
2154                 return;
2155             }
2156             checkDeadLock(ctx.executor());
2157         }
2158     }
2159 }