1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.handler.ssl;
17
18 import io.netty5.buffer.BufferUtil;
19 import io.netty5.buffer.api.Buffer;
20 import io.netty5.buffer.api.BufferAllocator;
21 import io.netty5.buffer.api.CompositeBuffer;
22 import io.netty5.buffer.api.DefaultBufferAllocators;
23 import io.netty5.util.Resource;
24 import io.netty5.buffer.api.StandardAllocationTypes;
25 import io.netty5.channel.AbstractCoalescingBufferQueue;
26 import io.netty5.channel.Channel;
27 import io.netty5.channel.ChannelException;
28 import io.netty5.channel.ChannelHandler;
29 import io.netty5.channel.ChannelHandlerContext;
30 import io.netty5.channel.ChannelOption;
31 import io.netty5.channel.ChannelPipeline;
32 import io.netty5.channel.unix.UnixChannel;
33 import io.netty5.handler.codec.ByteToMessageDecoder;
34 import io.netty5.handler.codec.DecoderException;
35 import io.netty5.handler.codec.UnsupportedMessageTypeException;
36 import io.netty5.util.concurrent.DefaultPromise;
37 import io.netty5.util.concurrent.EventExecutor;
38 import io.netty5.util.concurrent.Future;
39 import io.netty5.util.concurrent.ImmediateEventExecutor;
40 import io.netty5.util.concurrent.ImmediateExecutor;
41 import io.netty5.util.concurrent.Promise;
42 import io.netty5.util.internal.PlatformDependent;
43 import io.netty5.util.internal.SilentDispose;
44 import io.netty5.util.internal.UnstableApi;
45 import io.netty5.util.internal.logging.InternalLogger;
46 import io.netty5.util.internal.logging.InternalLoggerFactory;
47
48 import javax.net.ssl.SSLEngine;
49 import javax.net.ssl.SSLEngineResult;
50 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
51 import javax.net.ssl.SSLEngineResult.Status;
52 import javax.net.ssl.SSLException;
53 import javax.net.ssl.SSLHandshakeException;
54 import javax.net.ssl.SSLSession;
55 import java.io.IOException;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.util.concurrent.Executor;
61 import java.util.concurrent.RejectedExecutionException;
62 import java.util.concurrent.TimeUnit;
63 import java.util.regex.Pattern;
64
65 import static io.netty5.handler.ssl.SslUtils.getEncryptedPacketLength;
66 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
67 import static io.netty5.util.internal.SilentDispose.autoClosing;
68 import static java.util.Objects.requireNonNull;
69
70 /**
71 * Adds <a href="https://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
72 * · TLS</a> and StartTLS support to a {@link Channel}. Please refer
73 * to the <strong>"SecureChat"</strong> example in the distribution or the web
74 * site for the detailed usage.
75 *
76 * <h3>Beginning the handshake</h3>
77 * <p>
 * Besides using the handshake {@link Future} to get notified about the completion of the handshake, it's
 * also possible to detect it by implementing the
 * {@link ChannelHandler#channelInboundEvent(ChannelHandlerContext, Object)}
 * method and checking for a {@link SslHandshakeCompletionEvent}.
82 *
83 * <h3>Handshake</h3>
84 * <p>
85 * The handshake will be automatically issued for you once the {@link Channel} is active and
86 * {@link SSLEngine#getUseClientMode()} returns {@code true}.
 * So there is no need to initiate it yourself.
88 *
89 * <h3>Closing the session</h3>
90 * <p>
91 * To close the SSL session, the {@link #closeOutbound()} method should be
92 * called to send the {@code close_notify} message to the remote peer. One
 * exception is when you close the {@link Channel} - {@link SslHandler}
 * intercepts the close request and automatically sends the {@code close_notify}
 * message before the channel is closed. Once the SSL session is closed,
96 * it is not reusable, and consequently you should create a new
97 * {@link SslHandler} with a new {@link SSLEngine} as explained in the
98 * following section.
99 *
100 * <h3>Restarting the session</h3>
101 * <p>
102 * To restart the SSL session, you must remove the existing closed
103 * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
104 * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
105 * and start the handshake process as described in the first section.
106 *
107 * <h3>Implementing StartTLS</h3>
108 * <p>
109 * <a href="https://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
110 * communication pattern that secures the wire in the middle of the plaintext
111 * connection. Please note that it is different from SSL · TLS, that
112 * secures the wire from the beginning of the connection. Typically, StartTLS
113 * is composed of three steps:
114 * <ol>
115 * <li>Client sends a StartTLS request to server.</li>
116 * <li>Server sends a StartTLS response to client.</li>
117 * <li>Client begins SSL handshake.</li>
118 * </ol>
119 * If you implement a server, you need to:
120 * <ol>
121 * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
122 * to {@code true},</li>
123 * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
124 * <li>write a StartTLS response.</li>
125 * </ol>
 * Please note that you must insert {@link SslHandler} <em>before</em> sending
 * the StartTLS response. Otherwise the client can begin the SSL handshake
 * before {@link SslHandler} is inserted into the {@link ChannelPipeline}, causing
 * data corruption.
130 * <p>
131 * The client-side implementation is much simpler.
132 * <ol>
133 * <li>Write a StartTLS request,</li>
134 * <li>wait for the StartTLS response,</li>
135 * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
136 * to {@code false},</li>
137 * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
138 * <li>Initiate SSL handshake.</li>
139 * </ol>
140 *
141 * <h3>Known issues</h3>
142 * <p>
143 * Because of a known issue with the current implementation of the SslEngine that comes
144 * with Java it may be possible that you see blocked IO-Threads while a full GC is done.
145 * <p>
 * So if you are affected you can work around this problem by adjusting the cache settings
 * as shown below:
148 *
149 * <pre>
150 * SslContext context = ...;
151 * context.getServerSessionContext().setSessionCacheSize(someSaneSize);
 * context.getServerSessionContext().setSessionTimeout(someSaneTimeout);
153 * </pre>
154 * <p>
155 * What values to use here depends on the nature of your application and should be set
156 * based on monitoring and debugging of it.
157 * For more details see
158 * <a href="https://github.com/netty/netty/issues/832">#832</a> in our issue tracker.
159 */
160 public class SslHandler extends ByteToMessageDecoder {
161 private static final InternalLogger logger =
162 InternalLoggerFactory.getInstance(SslHandler.class);
163 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
164 "^.*(?:Socket|Datagram)Channel.*$");
165 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
166 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
167 private static final int STATE_SENT_FIRST_MESSAGE = 1;
168 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
169 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
170 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
171 /**
172 * Set by wrap*() methods when something is produced.
173 * {@link #channelReadComplete(ChannelHandlerContext)} will check this flag, clear it, and call ctx.flush().
174 */
175 private static final int STATE_NEEDS_FLUSH = 1 << 4;
176 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
177 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
178 private static final int STATE_PROCESS_TASK = 1 << 7;
179 /**
180 * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
181 * when {@link ChannelOption#AUTO_READ} is {@code false}.
182 */
183 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
184 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
185
186 /**
187 * <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a> which is the maximum sized plaintext chunk
188 * allowed by the TLS RFC.
189 */
190 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
191
192 private enum SslEngineType {
193 TCNATIVE(true, COMPOSITE_CUMULATOR) {
194 @Override
195 Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
196 int pendingBytes, int numComponents) {
197 ReferenceCountedOpenSslEngine engine = (ReferenceCountedOpenSslEngine) handler.engine;
198 int maxWrapLength = engine.calculateMaxLengthForWrap(pendingBytes, numComponents);
199 if (allocator.getAllocationType() != StandardAllocationTypes.OFF_HEAP) {
200 allocator = DefaultBufferAllocators.offHeapAllocator();
201 }
202 return allocator.allocate(maxWrapLength);
203 }
204
205 @Override
206 int calculatePendingData(SslHandler handler, int guess) {
207 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
208 return sslPending > 0 ? sslPending : guess;
209 }
210
211 @Override
212 boolean jdkCompatibilityMode(SSLEngine engine) {
213 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
214 }
215 },
216 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
217 @Override
218 Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
219 int pendingBytes, int numComponents) {
220 ConscryptAlpnSslEngine engine = (ConscryptAlpnSslEngine) handler.engine;
221 int size = engine.calculateOutNetBufSize(pendingBytes, numComponents);
222 if (allocator.getAllocationType() != StandardAllocationTypes.OFF_HEAP) {
223 allocator = DefaultBufferAllocators.offHeapAllocator();
224 }
225 return allocator.allocate(size);
226 }
227
228 @Override
229 int calculatePendingData(SslHandler handler, int guess) {
230 return guess;
231 }
232
233 @Override
234 boolean jdkCompatibilityMode(SSLEngine engine) {
235 return true;
236 }
237 },
238 JDK(false, MERGE_CUMULATOR) {
239 @Override
240 Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
241 int pendingBytes, int numComponents) {
242 // As for the JDK SSLEngine we always need to allocate buffers of the size required by the SSLEngine
243 // (normally ~16KB). This is required even if the amount of data to encrypt is very small. Use heap
244 // buffers to reduce the native memory usage.
245 //
246 // Beside this the JDK SSLEngine also (as of today) will do an extra heap to direct buffer copy
247 // if a direct buffer is used as its internals operate on byte[].
248 if (allocator.getAllocationType() == StandardAllocationTypes.OFF_HEAP) {
249 allocator = DefaultBufferAllocators.onHeapAllocator();
250 }
251 return allocator.allocate(handler.engine.getSession().getPacketBufferSize());
252 }
253
254 @Override
255 int calculatePendingData(SslHandler handler, int guess) {
256 return guess;
257 }
258
259 @Override
260 boolean jdkCompatibilityMode(SSLEngine engine) {
261 return true;
262 }
263 };
264
265 static SslEngineType forEngine(SSLEngine engine) {
266 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
267 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
268 }
269
270 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
271 this.wantsDirectBuffer = wantsDirectBuffer;
272 this.cumulator = cumulator;
273 }
274
275 abstract int calculatePendingData(SslHandler handler, int guess);
276
277 abstract boolean jdkCompatibilityMode(SSLEngine engine);
278
279 abstract Buffer allocateWrapBuffer(SslHandler handler, BufferAllocator allocator,
280 int pendingBytes, int numComponents);
281
282 // BEGIN Platform-dependent flags
283
284 /**
285 * {@code true} if and only if {@link SSLEngine} expects a direct buffer and so if a heap buffer
286 * is given will make an extra memory copy.
287 */
288 final boolean wantsDirectBuffer;
289
290 // END Platform-dependent flags
291
292 /**
293 * When using JDK {@link SSLEngine}, we use {@link #MERGE_CUMULATOR} because it works only with
294 * one {@link ByteBuffer}.
295 *
296 * When using {@link OpenSslEngine}, we can use {@link #COMPOSITE_CUMULATOR} because it has
297 * {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
298 * and which does not need to do extra memory copies.
299 */
300 final Cumulator cumulator;
301 }
302
303 private volatile ChannelHandlerContext ctx;
304 private final SSLEngine engine;
305 private final SslEngineType engineType;
306 private final Executor delegatedTaskExecutor;
307 private final boolean jdkCompatibilityMode;
308
309 /**
310 * Used for converting {@link Buffer}s into {@link ByteBuffer}s, in a way that meet the expectations of the
311 * configured {@link SSLEngine}, and then calling the most optimal wrap and unwrap methods.
312 */
313 private final EngineWrapper engineWrapper;
314
315 private final boolean startTls;
316
317 private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
318 private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
319
320 private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
321 private Promise<Channel> handshakePromise = new LazyPromise();
322 private final Promise<Channel> sslClosePromise = new LazyPromise();
323
324 private int packetLength;
325 private short state;
326
327 private volatile long handshakeTimeoutMillis = 10000;
328 private volatile long closeNotifyFlushTimeoutMillis = 3000;
329 private volatile long closeNotifyReadTimeoutMillis;
330 volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
331
332 /**
333 * Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}.
334 *
335 * @param engine the {@link SSLEngine} this handler will use
336 */
337 public SslHandler(SSLEngine engine) {
338 this(engine, false);
339 }
340
341 /**
342 * Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}.
343 *
344 * @param engine the {@link SSLEngine} this handler will use
345 * @param startTls {@code true} if the first write request shouldn't be
346 * encrypted by the {@link SSLEngine}
347 */
348 public SslHandler(SSLEngine engine, boolean startTls) {
349 this(engine, startTls, ImmediateExecutor.INSTANCE);
350 }
351
352 /**
353 * Creates a new instance.
354 *
355 * @param engine the {@link SSLEngine} this handler will use
356 * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
357 * {@link SSLEngine#getDelegatedTask()}.
358 */
359 public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
360 this(engine, false, delegatedTaskExecutor);
361 }
362
363 /**
364 * Creates a new instance.
365 *
366 * @param engine the {@link SSLEngine} this handler will use
367 * @param startTls {@code true} if the first write request shouldn't be
368 * encrypted by the {@link SSLEngine}
369 * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
370 * {@link SSLEngine#getDelegatedTask()}.
371 */
372 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
373 super(SslEngineType.forEngine(engine).cumulator);
374 requireNonNull(engine, "engine");
375 requireNonNull(delegatedTaskExecutor, "delegatedTaskExecutor");
376 this.engine = engine;
377 engineType = SslEngineType.forEngine(engine);
378 this.delegatedTaskExecutor = delegatedTaskExecutor;
379 this.startTls = startTls;
380 engineWrapper = new EngineWrapper(engine, engineType.wantsDirectBuffer);
381 jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
382 }
383
384 public long getHandshakeTimeoutMillis() {
385 return handshakeTimeoutMillis;
386 }
387
388 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
389 requireNonNull(unit, "unit");
390 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
391 }
392
393 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
394 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
395 }
396
397 /**
398 * Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
399 * <p>
400 * This value will partition data which is passed to write
401 * {@link #write(ChannelHandlerContext, Object)}. The partitioning will work as follows:
402 * <ul>
403 * <li>If {@code wrapDataSize <= 0} then we will write each data chunk as is.</li>
404 * <li>If {@code wrapDataSize > data size} then we will attempt to aggregate multiple data chunks together.</li>
405 * <li>If {@code wrapDataSize > data size} Else if {@code wrapDataSize <= data size} then we will divide the data
406 * into chunks of {@code wrapDataSize} when writing.</li>
407 * </ul>
408 * <p>
409 * If the {@link SSLEngine} doesn't support a gather wrap operation (e.g. {@link SslProvider#OPENSSL}) then
410 * aggregating data before wrapping can help reduce the ratio between TLS overhead vs data payload which will lead
411 * to better goodput. Writing fixed chunks of data can also help target the underlying transport's (e.g. TCP)
412 * frame size. Under lossy/congested network conditions this may help the peer get full TLS packets earlier and
413 * be able to do work sooner, as opposed to waiting for the all the pieces of the TLS packet to arrive.
414 * @param wrapDataSize the number of bytes which will be passed to each
415 * {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
416 */
417 @UnstableApi
418 public final void setWrapDataSize(int wrapDataSize) {
419 this.wrapDataSize = wrapDataSize;
420 }
421
422 /**
423 * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}
424 */
425 @Deprecated
426 public long getCloseNotifyTimeoutMillis() {
427 return getCloseNotifyFlushTimeoutMillis();
428 }
429
430 /**
431 * @deprecated use {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}
432 */
433 @Deprecated
434 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
435 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
436 }
437
438 /**
439 * @deprecated use {@link #setCloseNotifyFlushTimeoutMillis(long)}
440 */
441 @Deprecated
442 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
443 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
444 }
445
446 /**
447 * Gets the timeout for flushing the close_notify that was triggered by closing the
448 * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
449 * forcibly.
450 */
451 public final long getCloseNotifyFlushTimeoutMillis() {
452 return closeNotifyFlushTimeoutMillis;
453 }
454
455 /**
456 * Sets the timeout for flushing the close_notify that was triggered by closing the
457 * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
458 * forcibly.
459 */
460 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
461 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
462 }
463
464 /**
465 * See {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}.
466 */
467 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
468 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
469 "closeNotifyFlushTimeoutMillis");
470 }
471
472 /**
473 * Gets the timeout (in ms) for receiving the response for the close_notify that was triggered by closing the
474 * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
475 * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
476 */
477 public final long getCloseNotifyReadTimeoutMillis() {
478 return closeNotifyReadTimeoutMillis;
479 }
480
481 /**
482 * Sets the timeout for receiving the response for the close_notify that was triggered by closing the
483 * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
484 * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
485 */
486 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
487 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
488 }
489
490 /**
491 * See {@link #setCloseNotifyReadTimeout(long, TimeUnit)}.
492 */
493 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
494 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
495 "closeNotifyReadTimeoutMillis");
496 }
497
498 /**
499 * Returns the {@link SSLEngine} which is used by this handler.
500 */
501 public SSLEngine engine() {
502 return engine;
503 }
504
505 /**
506 * Returns the name of the current application-level protocol.
507 *
508 * @return the protocol name or {@code null} if application-level protocol has not been negotiated
509 */
510 public String applicationProtocol() {
511 SSLEngine engine = engine();
512 if (!(engine instanceof ApplicationProtocolAccessor)) {
513 return null;
514 }
515
516 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
517 }
518
519 /**
520 * Returns a {@link Future} that will get notified once the current TLS handshake completes.
521 *
522 * @return the {@link Future} for the initial TLS handshake if {@link #renegotiate()} was not invoked.
523 * The {@link Future} for the most recent {@linkplain #renegotiate() TLS renegotiation} otherwise.
524 */
525 public Future<Channel> handshakeFuture() {
526 return handshakePromise.asFuture();
527 }
528
529 /**
530 * Sends an SSL {@code close_notify} message to the specified channel and
531 * destroys the underlying {@link SSLEngine}. This will <strong>not</strong> close the underlying
532 * {@link Channel}. If you want to also close the {@link Channel} use {@link Channel#close()} or
533 * {@link ChannelHandlerContext#close()}
534 */
535 public Future<Void> closeOutbound() {
536 final ChannelHandlerContext ctx = this.ctx;
537 Promise<Void> promise = ctx.newPromise();
538 if (ctx.executor().inEventLoop()) {
539 closeOutbound0(promise);
540 } else {
541 ctx.executor().execute(() -> closeOutbound0(promise));
542 }
543 return promise.asFuture();
544 }
545
546 private void closeOutbound0(Promise<Void> promise) {
547 setState(STATE_OUTBOUND_CLOSED);
548 engine.closeOutbound();
549 try {
550 flush(ctx, promise);
551 } catch (Exception e) {
552 if (!promise.tryFailure(e)) {
553 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
554 }
555 }
556 }
557
558 /**
559 * Return the {@link Future} that will get notified if the inbound of the {@link SSLEngine} is closed.
560 *
561 * This method will return the same {@link Future} all the time.
562 *
563 * @see SSLEngine
564 */
565 public Future<Channel> sslCloseFuture() {
566 return sslClosePromise.asFuture();
567 }
568
569 @Override
570 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
571 try (AutoCloseable ignore = autoClosing(engine)) {
572 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
573 // Check if queue is not empty first because create a new ChannelException is expensive
574 pendingUnencryptedWrites.releaseAndFailAll(ctx,
575 new ChannelException("Pending write on removal of SslHandler"));
576 }
577 pendingUnencryptedWrites = null;
578
579 SSLException cause = null;
580
581 // If the handshake or SSLEngine closure is not done yet we should fail corresponding promise and
582 // notify the rest of the
583 // pipeline.
584 if (!handshakePromise.isDone()) {
585 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
586 if (handshakePromise.tryFailure(cause)) {
587 ctx.fireChannelInboundEvent(new SslHandshakeCompletionEvent(
588 engine().getSession(), applicationProtocol(), cause));
589 }
590 }
591 if (!sslClosePromise.isDone()) {
592 if (cause == null) {
593 cause = new SSLException("SslHandler removed before SSLEngine was closed");
594 }
595 notifyClosePromise(cause);
596 }
597 }
598 }
599
600 @Override
601 public Future<Void> disconnect(final ChannelHandlerContext ctx) {
602 return closeOutboundAndChannel(ctx, true);
603 }
604
605 @Override
606 public Future<Void> close(final ChannelHandlerContext ctx) {
607 return closeOutboundAndChannel(ctx, false);
608 }
609
610 @Override
611 public void read(ChannelHandlerContext ctx) {
612 if (!handshakePromise.isDone()) {
613 setState(STATE_READ_DURING_HANDSHAKE);
614 }
615
616 ctx.read();
617 }
618
619 private static IllegalStateException newPendingWritesNullException() {
620 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
621 }
622
623 @Override
624 public Future<Void> write(final ChannelHandlerContext ctx, Object msg) {
625 if (!(msg instanceof Buffer)) {
626 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, Buffer.class);
627 logger.warn(exception);
628 SilentDispose.dispose(msg, logger);
629 return ctx.newFailedFuture(exception);
630 }
631 if (pendingUnencryptedWrites == null) {
632 IllegalStateException exception = newPendingWritesNullException();
633 try {
634 Resource.dispose(msg);
635 } catch (Exception e) {
636 exception.addSuppressed(e);
637 }
638 return ctx.newFailedFuture(exception);
639 } else {
640 Promise<Void> promise = ctx.newPromise();
641 pendingUnencryptedWrites.add((Buffer) msg, promise);
642 return promise.asFuture();
643 }
644 }
645
646 @Override
647 public void flush(ChannelHandlerContext ctx) {
648 // Do not encrypt the first write request if this handler is
649 // created with startTLS flag turned on.
650 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
651 setState(STATE_SENT_FIRST_MESSAGE);
652 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
653 forceFlush(ctx);
654 // Explicit start handshake processing once we send the first message. This will also ensure
655 // we will schedule the timeout if needed.
656 startHandshakeProcessing(true);
657 return;
658 }
659
660 if (isStateSet(STATE_PROCESS_TASK)) {
661 return;
662 }
663
664 try {
665 wrapAndFlush(ctx);
666 } catch (Throwable cause) {
667 setHandshakeFailure(ctx, cause);
668 }
669 }
670
671 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
672 if (pendingUnencryptedWrites.isEmpty()) {
673 pendingUnencryptedWrites.add(ctx.bufferAllocator().allocate(0), ctx.newPromise());
674 }
675 if (!handshakePromise.isDone()) {
676 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
677 }
678 try {
679 wrap(ctx, false);
680 } finally {
681 // We may have written some parts of data before an exception was thrown so ensure we always flush.
682 // See https://github.com/netty/netty/issues/3900#issuecomment-172481830
683 forceFlush(ctx);
684 }
685 }
686
687 // This method will not call setHandshakeFailure(...) !
688 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
689 Buffer out = null;
690 BufferAllocator alloc = ctx.bufferAllocator();
691 try {
692 final int wrapDataSize = this.wrapDataSize;
693 // Only continue to loop if the handler was not removed in the meantime.
694 // See https://github.com/netty/netty/issues/5860
695 outer: while (!ctx.isRemoved()) {
696 Promise<Void> promise = ctx.newPromise();
697 Buffer buf = wrapDataSize > 0 ?
698 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
699 pendingUnencryptedWrites.removeFirst(promise);
700 if (buf == null) {
701 break;
702 }
703
704 if (out == null) {
705 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.countReadableComponents());
706 }
707
708 SSLEngineResult result = wrap(engine, buf, out);
709 if (buf.readableBytes() > 0) {
710 pendingUnencryptedWrites.addFirst(buf, promise);
711 // When we add the buffer/promise pair back we need to be sure we don't complete the promise
712 // later. We only complete the promise if the buffer is completely consumed.
713 promise = null;
714 } else {
715 buf.close();
716 }
717
718 // We need to write any data before we invoke any methods which may trigger re-entry, otherwise
719 // writes may occur out of order and TLS sequencing may be off (e.g. SSLV3_ALERT_BAD_RECORD_MAC).
720 if (out.readableBytes() > 0) {
721 final Buffer b = out;
722 out = null;
723 if (promise != null) {
724 ctx.write(b).cascadeTo(promise);
725 } else {
726 ctx.write(b);
727 }
728 } else if (promise != null) {
729 ctx.write(alloc.allocate(0)).cascadeTo(promise);
730 }
731 // else out is not readable we can re-use it and so save an extra allocation
732
733 if (result.getStatus() == Status.CLOSED) {
734 // Make a best effort to preserve any exception that way previously encountered from the handshake
735 // or the transport, else fallback to a general error.
736 Throwable exception = handshakePromise.isDone() ? handshakePromise.cause() : null;
737 if (exception == null) {
738 exception = sslClosePromise.isDone() ? sslClosePromise.cause() : null;
739 if (exception == null) {
740 exception = new SslClosedEngineException("SSLEngine closed already");
741 }
742 }
743 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
744 return;
745 } else {
746 switch (result.getHandshakeStatus()) {
747 case NEED_TASK:
748 if (!runDelegatedTasks(inUnwrap)) {
749 // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
750 // resume once the task completes.
751 break outer;
752 }
753 break;
754 case FINISHED:
755 case NOT_HANDSHAKING: // work around for android bug that skips the FINISHED state.
756 setHandshakeSuccess();
757 break;
758 case NEED_WRAP:
759 // If we are expected to wrap again and we produced some data we need to ensure there
760 // is something in the queue to process as otherwise we will not try again before there
761 // was more added. Failing to do so may fail to produce an alert that can be
762 // consumed by the remote peer.
763 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
764 pendingUnencryptedWrites.add(alloc.allocate(0));
765 }
766 break;
767 case NEED_UNWRAP:
768 // The underlying engine is starving so we need to feed it with more data.
769 // See https://github.com/netty/netty/pull/5039
770 readIfNeeded(ctx);
771 return;
772 default:
773 throw new IllegalStateException(
774 "Unknown handshake status: " + result.getHandshakeStatus());
775 }
776 }
777 }
778 } finally {
779 if (out != null) {
780 out.close();
781 }
782 if (inUnwrap) {
783 setState(STATE_NEEDS_FLUSH);
784 }
785 }
786 }
787
/**
 * Repeatedly calls {@link #wrap(SSLEngine, Buffer, Buffer)} with a {@code null} source buffer and writes any
 * bytes the engine produced to the {@link ChannelHandlerContext}, reacting to the handshake status reported
 * after each call.
 * <p>
 * This method will not call
 * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable, boolean, boolean, boolean)} or
 * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable)}.
 *
 * @param ctx the context used for writing produced data and for checking whether the handler was removed.
 * @param inUnwrap {@code true} if the caller is currently inside {@code unwrap(...)}. In that case no nested
 *                 unwrap is attempted and {@code STATE_NEEDS_FLUSH} is set instead of relying on the caller
 *                 to flush.
 * @return {@code true} if this method ends on {@link SSLEngineResult.HandshakeStatus#NOT_HANDSHAKING}.
 * @throws SSLException if wrapping (or a nested unwrap) fails.
 */
private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
    // Destination buffer for the current wrap call. It is handed over to ctx.write(...) once it contains data
    // and is set back to null so that the finally block only closes a buffer we still own.
    Buffer out = null;
    try {
        // Only continue to loop if the handler was not removed in the meantime.
        // See https://github.com/netty/netty/issues/5860
        outer: while (!ctx.isRemoved()) {
            if (out == null) {
                // As this is called for the handshake we have no real idea how big the buffer needs to be.
                // That said 2048 should give us enough room to include everything like ALPN / NPN data.
                // If this is not enough we will increase the buffer in wrap(...).
                out = allocateOutNetBuf(ctx, 2048, 1);
            }
            SSLEngineResult result = wrap(engine, null, out);
            if (result.bytesProduced() > 0) {
                // A failed write of handshake data is treated as a transport failure of the handshake.
                ctx.write(out).addListener(future -> {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        setHandshakeFailureTransportFailure(ctx, cause);
                    }
                });
                if (inUnwrap) {
                    // Remember that something was written so it gets flushed later on.
                    setState(STATE_NEEDS_FLUSH);
                }
                // Ownership of the buffer was passed to the write; allocate a new one on the next iteration.
                out = null;
            }

            HandshakeStatus status = result.getHandshakeStatus();
            switch (status) {
                case FINISHED:
                    // We may be here because we read data and discovered the remote peer initiated a renegotiation
                    // and this write is to complete the new handshake. The user may have previously done a
                    // writeAndFlush which wasn't able to wrap data due to needing the pending handshake, so we
                    // attempt to wrap application data here if any is pending.
                    if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
                        wrap(ctx, true);
                    }
                    return false;
                case NEED_TASK:
                    if (!runDelegatedTasks(inUnwrap)) {
                        // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
                        // resume once the task completes.
                        break outer;
                    }
                    break;
                case NEED_UNWRAP:
                    if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
                        // If we asked for a wrap, the engine requested an unwrap, and we are in unwrap there is
                        // no use in trying to call wrap again because we have already attempted (or will after we
                        // return) to feed more data to the engine.
                        return false;
                    }
                    break;
                case NEED_WRAP:
                    // Nothing special to do; the checks after the switch decide whether to loop again.
                    break;
                case NOT_HANDSHAKING:
                    if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
                        wrap(ctx, true);
                    }
                    // Workaround for TLS False Start problem reported at:
                    // https://github.com/netty/netty/issues/1108#issuecomment-14266970
                    if (!inUnwrap) {
                        unwrapNonAppData(ctx);
                    }
                    return true;
                default:
                    throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
            }

            // Check if we did not produce any bytes and if so break out of the loop, but only if we did not process
            // a task as the last action. It's fine to not produce any data as part of executing a task.
            if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
                break;
            }

            // It should not consume empty buffers when it is not handshaking
            // Fix for Android, where it was encrypting empty buffers even when not handshaking
            // NOTE(review): the NOT_HANDSHAKING case above already returns, so this check looks unreachable for
            // that status from here; confirm before relying on it.
            if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
                break;
            }
        }
    } finally {
        // Close the buffer if it was allocated but never handed over to ctx.write(...).
        if (out != null) {
            out.close();
        }
    }
    return false;
}
881
882 private SSLEngineResult wrap(SSLEngine engine, Buffer in, Buffer out)
883 throws SSLException {
884 for (;;) {
885 SSLEngineResult result = engineWrapper.wrap(in, out);
886
887 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
888 out.ensureWritable(engine.getSession().getPacketBufferSize());
889 } else {
890 return result;
891 }
892 }
893 }
894
895 @Override
896 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
897 boolean handshakeFailed = handshakePromise.isFailed();
898
899 ClosedChannelException exception = new ClosedChannelException();
900 // Make sure to release SSLEngine,
901 // and notify the handshake future if the connection has been closed during handshake.
902 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
903 false);
904
905 // Ensure we always notify the sslClosePromise as well
906 notifyClosePromise(exception);
907
908 try {
909 super.channelInactive(ctx);
910 } catch (DecoderException e) {
911 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
912 // We only rethrow the exception if the handshake did not fail before channelInactive(...) was called
913 // as otherwise this may produce duplicated failures as super.channelInactive(...) will also call
914 // channelRead(...).
915 //
916 // See https://github.com/netty/netty/issues/10119
917 throw e;
918 }
919 }
920 }
921
922 @Override
923 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
924 if (ignoreException(cause)) {
925 // It is safe to ignore the 'connection reset by peer' or
926 // 'broken pipe' error after sending close_notify.
927 if (logger.isDebugEnabled()) {
928 logger.debug(
929 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
930 "while writing close_notify in response to the peer's close_notify",
931 ctx.channel(), cause);
932 }
933
934 // Close the connection explicitly just in case the transport
935 // did not close the connection automatically.
936 if (ctx.channel().isActive()) {
937 ctx.close();
938 }
939 } else {
940 ctx.fireChannelExceptionCaught(cause);
941
942 if (cause instanceof SSLException ||
943 cause instanceof DecoderException && cause.getCause() instanceof SSLException) {
944 ctx.close();
945 }
946 }
947 }
948
949 /**
950 * Checks if the given {@link Throwable} can be ignore and just "swallowed"
951 *
952 * When an ssl connection is closed a close_notify message is sent.
953 * After that the peer also sends close_notify however, it's not mandatory to receive
954 * the close_notify. The party who sent the initial close_notify can close the connection immediately
955 * then the peer will get connection reset error.
956 *
957 */
958 private boolean ignoreException(Throwable t) {
959 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
960 String message = t.getMessage();
961
962 // first try to match connection reset / broke peer based on the regex. This is the fastest way
963 // but may fail on different jdk impls or OS's
964 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
965 return true;
966 }
967
968 // Inspect the StackTraceElements to see if it was a connection reset / broken pipe or not
969 StackTraceElement[] elements = t.getStackTrace();
970 for (StackTraceElement element: elements) {
971 String classname = element.getClassName();
972 String methodname = element.getMethodName();
973
974 // skip all classes that belong to the io.netty5 package
975 if (classname.startsWith("io.netty5.")) {
976 continue;
977 }
978
979 // check if the method name is read if not skip it
980 if (!"read".equals(methodname)) {
981 continue;
982 }
983
984 // This will also match against SocketInputStream which is used by openjdk 7 and maybe
985 // also others
986 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
987 return true;
988 }
989
990 try {
991 // No match by now.. Try to load the class via classloader and inspect it.
992 // This is mainly done as other JDK implementations may differ in name of
993 // the impl.
994 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
995
996 if (SocketChannel.class.isAssignableFrom(clazz)
997 || DatagramChannel.class.isAssignableFrom(clazz)) {
998 return true;
999 }
1000 } catch (Throwable cause) {
1001 if (logger.isDebugEnabled()) {
1002 logger.debug("Unexpected exception while loading class {} classname {}",
1003 getClass(), classname, cause);
1004 }
1005 }
1006 }
1007 }
1008
1009 return false;
1010 }
1011
1012 /**
1013 * Returns {@code true} if the given {@link Buffer} is encrypted. Be aware that this method
1014 * will not increase the readerIndex of the given {@link Buffer}.
1015 *
1016 * @param buffer
1017 * The {@link Buffer} to read from. Be aware that it must have at least 5 bytes to read,
1018 * otherwise it will throw an {@link IllegalArgumentException}.
1019 * @return encrypted
1020 * {@code true} if the {@link Buffer} is encrypted, {@code false} otherwise.
1021 * @throws IllegalArgumentException
1022 * Is thrown if the given {@link Buffer} has not at least 5 bytes to read.
1023 */
1024 public static boolean isEncrypted(Buffer buffer) {
1025 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1026 throw new IllegalArgumentException(
1027 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1028 }
1029 return getEncryptedPacketLength(buffer, buffer.readerOffset()) != SslUtils.NOT_ENCRYPTED;
1030 }
1031
1032 private void decodeJdkCompatible(ChannelHandlerContext ctx, Buffer in) throws NotSslRecordException {
1033 int packetLength = this.packetLength;
1034 // If we calculated the length of the current SSL record before, use that information.
1035 if (packetLength > 0) {
1036 if (in.readableBytes() < packetLength) {
1037 return;
1038 }
1039 } else {
1040 // Get the packet length and wait until we get a packets worth of data to unwrap.
1041 final int readableBytes = in.readableBytes();
1042 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1043 return;
1044 }
1045 packetLength = getEncryptedPacketLength(in, in.readerOffset());
1046 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1047 // Not an SSL/TLS packet
1048 NotSslRecordException e = new NotSslRecordException(
1049 "not an SSL/TLS record: " + BufferUtil.hexDump(in));
1050 in.skipReadableBytes(in.readableBytes());
1051
1052 // First fail the handshake promise as we may need to have access to the SSLEngine which may
1053 // be released because the user will remove the SslHandler in an exceptionCaught(...) implementation.
1054 setHandshakeFailure(ctx, e);
1055
1056 throw e;
1057 }
1058 assert packetLength > 0;
1059 if (packetLength > readableBytes) {
1060 // wait until the whole packet can be read
1061 this.packetLength = packetLength;
1062 return;
1063 }
1064 }
1065
1066 // Reset the state of this class so we can get the length of the next packet. We assume the entire packet will
1067 // be consumed by the SSLEngine.
1068 this.packetLength = 0;
1069 try {
1070 final int bytesConsumed = unwrap(ctx, in, packetLength);
1071 assert bytesConsumed == packetLength || engine.isInboundDone() :
1072 "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1073 bytesConsumed;
1074 } catch (Throwable cause) {
1075 handleUnwrapThrowable(ctx, cause);
1076 }
1077 }
1078
1079 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, Buffer in) {
1080 try {
1081 unwrap(ctx, in, in.readableBytes());
1082 } catch (Throwable cause) {
1083 handleUnwrapThrowable(ctx, cause);
1084 }
1085 }
1086
1087 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1088 try {
1089 // We should attempt to notify the handshake failure before writing any pending data. If we are in unwrap
1090 // and failed during the handshake process, and we attempt to wrap, then promises will fail, and if
1091 // listeners immediately close the Channel then we may end up firing the handshake event after the Channel
1092 // has been closed.
1093 if (handshakePromise.tryFailure(cause)) {
1094 ctx.fireChannelInboundEvent(new SslHandshakeCompletionEvent(
1095 engine().getSession(), applicationProtocol(), cause));
1096 }
1097
1098 // We need to flush one time as there may be an alert that we should send to the remote peer because
1099 // of the SSLException reported here.
1100 wrapAndFlush(ctx);
1101 } catch (SSLException ex) {
1102 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1103 " because of an previous SSLException, ignoring...", ex);
1104 } finally {
1105 // ensure we always flush and close the channel.
1106 setHandshakeFailure(ctx, cause, true, false, true);
1107 }
1108 PlatformDependent.throwException(cause);
1109 }
1110
1111 @Override
1112 protected void decode(ChannelHandlerContext ctx, Buffer in) throws SSLException {
1113 if (isStateSet(STATE_PROCESS_TASK)) {
1114 return;
1115 }
1116 if (jdkCompatibilityMode) {
1117 decodeJdkCompatible(ctx, in);
1118 } else {
1119 decodeNonJdkCompatible(ctx, in);
1120 }
1121 }
1122
/**
 * Delegates to {@link #channelReadComplete0(ChannelHandlerContext)}, which is also invoked directly from
 * within this class.
 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    channelReadComplete0(ctx);
}
1127
/**
 * Performs the end-of-read-batch work: discards read bytes, flushes if a flush is pending, requests more data
 * if needed, clears {@code STATE_FIRE_CHANNEL_READ} and propagates the read-complete event.
 */
private void channelReadComplete0(ChannelHandlerContext ctx) {
    // Discard bytes of the cumulation buffer if needed.
    discardSomeReadBytes();

    flushIfNeeded(ctx);
    // Must run before STATE_FIRE_CHANNEL_READ is cleared below, as readIfNeeded(...) inspects that state.
    readIfNeeded(ctx);

    clearState(STATE_FIRE_CHANNEL_READ);
    ctx.fireChannelReadComplete();
}
1138
1139 private void readIfNeeded(ChannelHandlerContext ctx) {
1140 // If handshake is not finished yet, we need more data.
1141 if (!ctx.channel().getOption(ChannelOption.AUTO_READ) &&
1142 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1143 // No auto-read used and no message passed through the ChannelPipeline or the handshake was not complete
1144 // yet, which means we need to trigger the read to ensure we not encounter any stalls.
1145 ctx.read();
1146 }
1147 }
1148
1149 private void flushIfNeeded(ChannelHandlerContext ctx) {
1150 if (isStateSet(STATE_NEEDS_FLUSH)) {
1151 forceFlush(ctx);
1152 }
1153 }
1154
/**
 * Calls {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} with a null buffer to handle handshakes, etc.
 *
 * @param ctx the context of this handler.
 * @return the value returned by {@link #unwrap(ChannelHandlerContext, Buffer, int)} when invoked with a
 *         {@code null} packet and a length of {@code 0}.
 * @throws SSLException if unwrapping fails.
 */
private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
    return unwrap(ctx, null, 0);
}
1161
/**
 * Unwraps inbound SSL records.
 * <p>
 * Loops over {@code engineWrapper.unwrap(...)} until the handler is removed or one of the break conditions
 * below is hit. Decoded data is passed on through the pipeline, handshake progress is driven
 * (delegated tasks, wrapping of non-application data) and, if the engine reported {@link Status#CLOSED},
 * the close promise is notified before returning.
 *
 * @param ctx the context of this handler.
 * @param packet the buffer holding the encrypted data; {@code null} when called from
 *               {@link #unwrapNonAppData(ChannelHandlerContext)}.
 * @param length the number of bytes of {@code packet} that may be unwrapped.
 * @return the number of bytes consumed, computed as the original {@code length} minus what is left after
 *         subtracting the consumed bytes of every engine call.
 * @throws SSLException if the engine fails to unwrap (or a nested wrap fails).
 */
private int unwrap(ChannelHandlerContext ctx, Buffer packet, int length) throws SSLException {
    final int originalLength = length;
    // Whether wrap(ctx, true) should be called once the loop is done.
    boolean wrapLater = false;
    // Whether the engine reported Status.CLOSED and the close promise must be notified.
    boolean notifyClosure = false;
    // Whether at least one channelRead was deferred to the executor (re-entry case); the close notification is
    // then deferred as well.
    boolean executedRead = false;
    Buffer decodeOut = allocate(ctx, length);
    try {
        // Only continue to loop if the handler was not removed in the meantime.
        // See https://github.com/netty/netty/issues/5860
        do {
            // The engineWrapper.unwrap skips the consumed bytes in the packet buffer immediately,
            // which is useful in case unwrap is called in a re-entry scenario. For example LocalChannel.read()
            // may enter this method in a re-entry fashion and if the peer is writing into a shared buffer we may
            // unwrap the same data multiple times.
            final SSLEngineResult result = engineWrapper.unwrap(packet, length, decodeOut);
            final Status status = result.getStatus();
            final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
            final int produced = result.bytesProduced();
            final int consumed = result.bytesConsumed();
            length -= consumed;

            // The expected sequence of events is:
            // 1. Notify of handshake success
            // 2. fireChannelRead for unwrapped data
            if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
                // When there is decoded data pending, mark re-entry while notifying so that the data is not
                // overtaken by reads triggered from within the notification.
                wrapLater |= (decodeOut.readableBytes() > 0 ?
                        setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
                        handshakeStatus == HandshakeStatus.FINISHED;
            }

            // Dispatch decoded data after we have notified of handshake success. If this method has been invoked
            // in a re-entry fashion we execute a task on the executor queue to process after the stack unwinds
            // to preserve order of events.
            if (decodeOut.readableBytes() > 0) {
                setState(STATE_FIRE_CHANNEL_READ);
                if (isStateSet(STATE_UNWRAP_REENTRY)) {
                    executedRead = true;
                    executeChannelRead(ctx, decodeOut);
                } else {
                    ctx.fireChannelRead(decodeOut);
                }
                // The buffer was handed over; a new one is allocated below if the loop continues.
                decodeOut = null;
            }

            if (status == Status.CLOSED) {
                notifyClosure = true; // Notify about the CLOSED state of the SSLEngine. See #137
            } else if (status == Status.BUFFER_OVERFLOW) {
                if (decodeOut != null) {
                    decodeOut.close();
                }
                final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
                // Allocate a new buffer which can hold all the rest data and loop again.
                // It may happen that applicationBufferSize < produced while there is still more to unwrap, in this
                // case we will just allocate a new buffer with the capacity of applicationBufferSize and call
                // unwrap again.
                decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
                        applicationBufferSize : applicationBufferSize - produced));
                continue;
            }

            if (handshakeStatus == HandshakeStatus.NEED_TASK) {
                // Despite its name, 'pending' is true when the tasks were run directly and false when they were
                // offloaded (see runDelegatedTasks(...)).
                boolean pending = runDelegatedTasks(true);
                if (!pending) {
                    // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
                    // resume once the task completes.
                    //
                    // We break out of the loop only and do NOT return here as we still may need to notify
                    // about the closure of the SSLEngine.
                    wrapLater = false;
                    break;
                }
            } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
                // If the wrap operation transitions the status to NOT_HANDSHAKING and there is no more data to
                // unwrap then the next call to unwrap will not produce any data. We can avoid the potentially
                // costly unwrap operation and break out of the loop.
                if (wrapNonAppData(ctx, true) && length == 0) {
                    break;
                }
            }

            if (status == Status.BUFFER_UNDERFLOW ||
                    // If we processed NEED_TASK we should try again even we did not consume or produce anything.
                    handshakeStatus != HandshakeStatus.NEED_TASK &&
                            (consumed == 0 && produced == 0 ||
                                    length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING)) {
                if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
                    // The underlying engine is starving so we need to feed it with more data.
                    // See https://github.com/netty/netty/pull/5039
                    readIfNeeded(ctx);
                }

                break;
            } else if (decodeOut == null) {
                // The previous output buffer was passed on; allocate a fresh one for the next iteration.
                decodeOut = allocate(ctx, length);
            }
        } while (!ctx.isRemoved());

        if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
            // We need to call wrap(...) in case there was a flush done before the handshake completed to ensure
            // we do not stall.
            //
            // See https://github.com/netty/netty/pull/2437
            clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
            wrapLater = true;
        }

        if (wrapLater) {
            wrap(ctx, true);
        }
    } finally {
        // Close the output buffer if it was never handed over to the pipeline.
        if (decodeOut != null) {
            decodeOut.close();
        }

        if (notifyClosure) {
            if (executedRead) {
                // A channelRead was deferred to the executor, so defer the close notification the same way.
                executeNotifyClosePromise(ctx);
            } else {
                notifyClosePromise(null);
            }
        }
    }
    return originalLength - length;
}
1289
1290 private boolean setHandshakeSuccessUnwrapMarkReentry() {
1291 // setHandshakeSuccess calls out to external methods which may trigger re-entry. We need to preserve ordering of
1292 // fireChannelRead for decodeOut relative to re-entry data.
1293 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1294 if (setReentryState) {
1295 setState(STATE_UNWRAP_REENTRY);
1296 }
1297 try {
1298 return setHandshakeSuccess();
1299 } finally {
1300 // It is unlikely this specific method will be re-entry because handshake completion is infrequent, but just
1301 // in case we only clear the state if we set it in the first place.
1302 if (setReentryState) {
1303 clearState(STATE_UNWRAP_REENTRY);
1304 }
1305 }
1306 }
1307
1308 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1309 try {
1310 ctx.executor().execute(() -> notifyClosePromise(null));
1311 } catch (RejectedExecutionException e) {
1312 notifyClosePromise(e);
1313 }
1314 }
1315
1316 private static void executeChannelRead(final ChannelHandlerContext ctx, final Buffer decodedOut) {
1317 try {
1318 ctx.executor().execute(() -> ctx.fireChannelRead(decodedOut));
1319 } catch (RejectedExecutionException e) {
1320 decodedOut.close();
1321 throw e;
1322 }
1323 }
1324
1325 private static boolean inEventLoop(Executor executor) {
1326 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1327 }
1328
1329 /**
1330 * Will either run the delegated task directly calling {@link Runnable#run()} and return {@code true} or will
1331 * offload the delegated task using {@link Executor#execute(Runnable)} and return {@code false}.
1332 *
1333 * If the task is offloaded it will take care to resume its work on the {@link EventExecutor} once there are no
1334 * more tasks to process.
1335 */
1336 private boolean runDelegatedTasks(boolean inUnwrap) {
1337 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1338 // We should run the task directly in the EventExecutor thread and not offload at all. As we are on the
1339 // EventLoop we can just run all tasks at once.
1340 for (;;) {
1341 Runnable task = engine.getDelegatedTask();
1342 if (task == null) {
1343 return true;
1344 }
1345 setState(STATE_PROCESS_TASK);
1346 if (task instanceof AsyncRunnable) {
1347 // Let's set the task to processing task before we try to execute it.
1348 boolean pending = false;
1349 try {
1350 AsyncRunnable asyncTask = (AsyncRunnable) task;
1351 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1352 asyncTask.run(completionHandler);
1353 pending = completionHandler.resumeLater();
1354 if (pending) {
1355 return false;
1356 }
1357 } finally {
1358 if (!pending) {
1359 // The task has completed, lets clear the state. If it is not completed we will clear the
1360 // state once it is.
1361 clearState(STATE_PROCESS_TASK);
1362 }
1363 }
1364 } else {
1365 try {
1366 task.run();
1367 } finally {
1368 clearState(STATE_PROCESS_TASK);
1369 }
1370 }
1371 }
1372 } else {
1373 executeDelegatedTask(inUnwrap);
1374 return false;
1375 }
1376 }
1377
1378 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1379 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1380 }
1381
1382 private void executeDelegatedTask(boolean inUnwrap) {
1383 executeDelegatedTask(getTaskRunner(inUnwrap));
1384 }
1385
1386 private void executeDelegatedTask(SslTasksRunner task) {
1387 setState(STATE_PROCESS_TASK);
1388 try {
1389 delegatedTaskExecutor.execute(task);
1390 } catch (RejectedExecutionException e) {
1391 clearState(STATE_PROCESS_TASK);
1392 throw e;
1393 }
1394 }
1395
1396 private final class AsyncTaskCompletionHandler implements Runnable {
1397 private final boolean inUnwrap;
1398 boolean didRun;
1399 boolean resumeLater;
1400
1401 AsyncTaskCompletionHandler(boolean inUnwrap) {
1402 this.inUnwrap = inUnwrap;
1403 }
1404
1405 @Override
1406 public void run() {
1407 didRun = true;
1408 if (resumeLater) {
1409 getTaskRunner(inUnwrap).runComplete();
1410 }
1411 }
1412
1413 boolean resumeLater() {
1414 if (!didRun) {
1415 resumeLater = true;
1416 return true;
1417 }
1418 return false;
1419 }
1420 }
1421
/**
 * {@link Runnable} that will be scheduled on the {@code delegatedTaskExecutor} and will take care
 * of resume work on the {@link EventExecutor} once the task was executed.
 */
private final class SslTasksRunner implements Runnable {
    // Whether this runner resumes work that was started from within unwrap(...).
    private final boolean inUnwrap;
    // Completion callback handed to AsyncRunnable tasks in run().
    private final Runnable runCompleteTask = this::runComplete;

    SslTasksRunner(boolean inUnwrap) {
        this.inUnwrap = inUnwrap;
    }

    // Handle errors which happened during task processing.
    private void taskError(Throwable e) {
        if (inUnwrap) {
            // As the error happened while the task was scheduled as part of unwrap(...) we also need to ensure
            // we fire it through the pipeline as inbound error to be consistent with what we do in decode(...).
            //
            // This will also ensure we fail the handshake future and flush all produced data.
            try {
                handleUnwrapThrowable(ctx, e);
            } catch (Throwable cause) {
                // handleUnwrapThrowable(...) rethrows the cause, which is reported here.
                safeExceptionCaught(cause);
            }
        } else {
            setHandshakeFailure(ctx, e);
            forceFlush(ctx);
        }
    }

    // Try to call exceptionCaught(...)
    private void safeExceptionCaught(Throwable cause) {
        try {
            channelExceptionCaught(ctx, wrapIfNeeded(cause));
        } catch (Throwable error) {
            // Handling the exception failed itself; pass the new error on through the pipeline.
            ctx.fireChannelExceptionCaught(error);
        }
    }

    // Wraps the cause in a DecoderException when it originated from an inbound (unwrap) operation.
    private Throwable wrapIfNeeded(Throwable cause) {
        if (!inUnwrap) {
            // If we are not in unwrap(...) we can just rethrow without wrapping at all.
            return cause;
        }
        // As the exception would have been triggered by an inbound operation we will need to wrap it in a
        // DecoderException to mimic what a decoder would do when decode(...) throws.
        return cause instanceof DecoderException ? cause : new DecoderException(cause);
    }

    // Feeds an empty buffer through channelRead(...) so that already buffered data is decoded again.
    private void tryDecodeAgain() {
        try {
            channelRead(ctx, ctx.bufferAllocator().allocate(0));
        } catch (Throwable cause) {
            safeExceptionCaught(cause);
        } finally {
            // As we called channelRead(...) we also need to call channelReadComplete(...) which
            // will ensure we either call ctx.fireChannelReadComplete() or will trigger a ctx.read() if
            // more data is needed.
            channelReadComplete0(ctx);
        }
    }

    /**
     * Executed after the wrapped {@code task} was executed via {@code delegatedTaskExecutor} to resume work
     * on the {@link EventExecutor}.
     */
    private void resumeOnEventExecutor() {
        assert ctx.executor().inEventLoop();
        clearState(STATE_PROCESS_TASK);
        try {
            HandshakeStatus status = engine.getHandshakeStatus();
            switch (status) {
                // There is another task that needs to be executed and offloaded to the delegatingTaskExecutor as
                // a result of this. Let's reschedule....
                case NEED_TASK:
                    executeDelegatedTask(this);

                    break;

                // The handshake finished, lets notify about the completion of it and resume processing.
                case FINISHED:
                // Not handshaking anymore, lets notify about the completion if not done yet and resume processing.
                case NOT_HANDSHAKING:
                    setHandshakeSuccess(); // NOT_HANDSHAKING -> workaround for android skipping FINISHED state.
                    try {
                        // Lets call wrap to ensure we produce the alert if there is any pending and also to
                        // ensure we flush any queued data..
                        wrap(ctx, inUnwrap);
                    } catch (Throwable e) {
                        taskError(e);
                        return;
                    }
                    if (inUnwrap) {
                        // If we were in the unwrap call when the task was processed we should also try to unwrap
                        // non app data first as there may not anything left in the inbound buffer to process.
                        unwrapNonAppData(ctx);
                    }

                    // Flush now as we may have written some data as part of the wrap call.
                    forceFlush(ctx);

                    tryDecodeAgain();
                    break;

                // We need more data so lets try to unwrap first and then call decode again which will feed us
                // with buffered data (if there is any).
                case NEED_UNWRAP:
                    try {
                        unwrapNonAppData(ctx);
                    } catch (SSLException e) {
                        // handleUnwrapThrowable(...) rethrows, so the outer catch below reports the failure.
                        handleUnwrapThrowable(ctx, e);
                        return;
                    }
                    tryDecodeAgain();
                    break;

                // To make progress we need to call SSLEngine.wrap(...) which may produce more output data
                // that will be written to the Channel.
                case NEED_WRAP:
                    try {
                        if (!wrapNonAppData(ctx, false) && inUnwrap) {
                            // The handshake finished in wrapNonAppData(...), we need to try call
                            // unwrapNonAppData(...) as we may have some alert that we should read.
                            //
                            // This mimics what we would do when we are calling this method while in unwrap(...).
                            unwrapNonAppData(ctx);
                        }

                        // Flush now as we may have written some data as part of the wrap call.
                        forceFlush(ctx);
                    } catch (Throwable e) {
                        taskError(e);
                        return;
                    }

                    // Now try to feed in more data that we have buffered.
                    tryDecodeAgain();
                    break;

                default:
                    // Should never reach here as we handle all cases.
                    throw new AssertionError();
            }
        } catch (Throwable cause) {
            safeExceptionCaught(cause);
        }
    }

    // Called once a delegated task completed; schedules resumeOnEventExecutor() on the EventExecutor.
    void runComplete() {
        EventExecutor executor = ctx.executor();
        // Jump back on the EventExecutor. We do this even if we are already on the EventLoop to guard against
        // reentrancy issues. Failing to do so could lead to the situation of tryDecode(...) be called and so
        // channelRead(...) while still in the decode loop. In this case channelRead(...) might release the input
        // buffer if its empty which would then result in an IllegalReferenceCountException when we try to continue
        // decoding.
        //
        // See https://github.com/netty/netty-tcnative/issues/680
        executor.execute(this::resumeOnEventExecutor);
    }

    // Runs the next delegated task of the engine. Synchronous tasks trigger runComplete() directly afterwards,
    // AsyncRunnable tasks receive runCompleteTask as their completion callback.
    @Override
    public void run() {
        try {
            Runnable task = engine.getDelegatedTask();
            if (task == null) {
                // The task was processed in the meantime. Let's just return.
                return;
            }
            if (task instanceof AsyncRunnable) {
                AsyncRunnable asyncTask = (AsyncRunnable) task;
                asyncTask.run(runCompleteTask);
            } else {
                task.run();
                runComplete();
            }
        } catch (Throwable cause) {
            handleException(cause);
        }
    }

    // Clears STATE_PROCESS_TASK and reports the failure, hopping onto the EventExecutor first if necessary.
    private void handleException(final Throwable cause) {
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
            clearState(STATE_PROCESS_TASK);
            safeExceptionCaught(cause);
        } else {
            try {
                executor.execute(() -> {
                    clearState(STATE_PROCESS_TASK);
                    safeExceptionCaught(cause);
                });
            } catch (RejectedExecutionException ignore) {
                clearState(STATE_PROCESS_TASK);
                // the context itself will handle the rejected exception when try to schedule the operation so
                // ignore the RejectedExecutionException
                ctx.fireChannelExceptionCaught(cause);
            }
        }
    }
}
1622
1623 /**
1624 * Notify all the handshake futures about the successfully handshake
1625 * @return {@code true} if {@link #handshakePromise} was set successfully and a {@link SslHandshakeCompletionEvent}
1626 * was fired. {@code false} otherwise.
1627 */
1628 private boolean setHandshakeSuccess() {
1629 // Our control flow may invoke this method multiple times for a single FINISHED event. For example
1630 // wrapNonAppData may drain pendingUnencryptedWrites in wrap which transitions to handshake from FINISHED to
1631 // NOT_HANDSHAKING which invokes setHandshakeSuccess, and then wrapNonAppData also directly invokes this method.
1632 final boolean notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel());
1633 if (notified) {
1634 if (logger.isDebugEnabled()) {
1635 SSLSession session = engine.getSession();
1636 logger.debug(
1637 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1638 ctx.channel(),
1639 session.getProtocol(),
1640 session.getCipherSuite());
1641 }
1642 ctx.fireChannelInboundEvent(
1643 new SslHandshakeCompletionEvent(engine().getSession(), applicationProtocol()));
1644 }
1645 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1646 clearState(STATE_READ_DURING_HANDSHAKE);
1647 if (!ctx.channel().getOption(ChannelOption.AUTO_READ)) {
1648 ctx.read();
1649 }
1650 }
1651 return notified;
1652 }
1653
/**
 * Notify all the handshake futures about the failure during the handshake.
 * <p>
 * Shorthand for
 * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable, boolean, boolean, boolean)} with
 * {@code closeInbound = true}, {@code notify = true} and {@code alwaysFlushAndClose = false}.
 *
 * @param ctx the context of this handler.
 * @param cause the reason the handshake failed.
 */
private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
    setHandshakeFailure(ctx, cause, true, true, false);
}
1660
1661 /**
1662 * Notify all the handshake futures about the failure during the handshake.
1663 */
1664 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1665 boolean notify, boolean alwaysFlushAndClose) {
1666 try {
1667 // Release all resources such as internal buffers that SSLEngine is managing.
1668 setState(STATE_OUTBOUND_CLOSED);
1669 engine.closeOutbound();
1670
1671 if (closeInbound) {
1672 try {
1673 engine.closeInbound();
1674 } catch (SSLException e) {
1675 if (logger.isDebugEnabled()) {
1676 // only log in debug mode as it most likely harmless and latest chrome still trigger
1677 // this all the time.
1678 //
1679 // See https://github.com/netty/netty/issues/1340
1680 String msg = e.getMessage();
1681 if (msg == null || !(msg.contains("possible truncation attack") ||
1682 msg.contains("closing inbound before receiving peer's close_notify"))) {
1683 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1684 }
1685 }
1686 }
1687 }
1688 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1689 SslUtils.handleHandshakeFailure(
1690 ctx, engine().getSession(), applicationProtocol(), cause, notify);
1691 }
1692 } finally {
1693 // Ensure we remove and fail all pending writes in all cases and so release memory quickly.
1694 releaseAndFailAll(ctx, cause);
1695 }
1696 }
1697
1698 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
1699 // If TLS control frames fail to write we are in an unknown state and may become out of
1700 // sync with our peer. We give up and close the channel. This will also take care of
1701 // cleaning up any outstanding state (e.g. handshake promise, queued unencrypted data).
1702 try {
1703 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
1704 releaseAndFailAll(ctx, transportFailure);
1705 if (handshakePromise.tryFailure(transportFailure)) {
1706 ctx.fireChannelInboundEvent(new SslHandshakeCompletionEvent(
1707 engine().getSession(), applicationProtocol(), transportFailure));
1708 }
1709 } finally {
1710 ctx.close();
1711 }
1712 }
1713
1714 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
1715 if (pendingUnencryptedWrites != null) {
1716 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
1717 }
1718 }
1719
1720 private void notifyClosePromise(Throwable cause) {
1721 if (cause == null) {
1722 if (sslClosePromise.trySuccess(ctx.channel())) {
1723 ctx.fireChannelInboundEvent(new SslCloseCompletionEvent(engine().getSession()));
1724 }
1725 } else {
1726 if (sslClosePromise.tryFailure(cause)) {
1727 ctx.fireChannelInboundEvent(new SslCloseCompletionEvent(engine().getSession(), cause));
1728 }
1729 }
1730 }
1731
1732 private Future<Void> closeOutboundAndChannel(
1733 final ChannelHandlerContext ctx, boolean disconnect) {
1734 setState(STATE_OUTBOUND_CLOSED);
1735 engine.closeOutbound();
1736
1737 if (!ctx.channel().isActive()) {
1738 if (disconnect) {
1739 return ctx.disconnect();
1740 }
1741 return ctx.close();
1742 }
1743 Promise<Void> promise = ctx.newPromise();
1744 Promise<Void> closeNotifyPromise = ctx.newPromise();
1745 try {
1746 flush(ctx, closeNotifyPromise);
1747 } finally {
1748 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
1749 setState(STATE_CLOSE_NOTIFY);
1750 // It's important that we do not pass the original Promise to safeClose(...) as when flush(....)
1751 // throws an Exception it will be propagated to the AbstractChannelHandlerContext which will try
1752 // to fail the promise because of this. This will then fail as it was already completed by
1753 // safeClose(...). We create a new Promise and try to notify the original Promise
1754 // once it is complete. If we fail to do so we just ignore it as in this case it was failed already
1755 // because of a propagated Exception.
1756 //
1757 // See https://github.com/netty/netty/issues/5931
1758 Promise<Void> cascade = ctx.newPromise();
1759 cascade.asFuture().cascadeTo(promise);
1760 safeClose(ctx, closeNotifyPromise.asFuture(), cascade);
1761 } else {
1762 /// We already handling the close_notify so just attach the promise to the sslClosePromise.
1763 sslCloseFuture().addListener(future -> promise.setSuccess(null));
1764 }
1765 }
1766 return promise.asFuture();
1767 }
1768
1769 private void flush(ChannelHandlerContext ctx, Promise<Void> promise) {
1770 if (pendingUnencryptedWrites != null) {
1771 pendingUnencryptedWrites.add(ctx.bufferAllocator().allocate(0), promise);
1772 } else {
1773 promise.setFailure(newPendingWritesNullException());
1774 }
1775 flush(ctx);
1776 }
1777
1778 @Override
1779 public void handlerAdded0(final ChannelHandlerContext ctx) throws Exception {
1780 this.ctx = ctx;
1781
1782 Channel channel = ctx.channel();
1783 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(16);
1784
1785 setOpensslEngineSocketFd(channel);
1786 boolean fastOpen = channel.isOptionSupported(ChannelOption.TCP_FASTOPEN_CONNECT) &&
1787 Boolean.TRUE.equals(channel.getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
1788 boolean active = channel.isActive();
1789 if (active || fastOpen) {
1790 // Explicitly flush the handshake only if the channel is already active.
1791 // With TCP Fast Open, we write to the outbound buffer before the TCP connect is established.
1792 // The buffer will then be flushed as part of establishing the connection, saving us a round-trip.
1793 startHandshakeProcessing(active);
1794 // If we weren't able to include client_hello in the TCP SYN (e.g. no token, disabled at the OS) we have to
1795 // flush pending data in the outbound buffer later in channelActive().
1796 if (fastOpen) {
1797 setState(STATE_NEEDS_FLUSH);
1798 }
1799 }
1800 }
1801
1802 private void startHandshakeProcessing(boolean flushAtEnd) {
1803 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
1804 setState(STATE_HANDSHAKE_STARTED);
1805 if (engine.getUseClientMode()) {
1806 // Begin the initial handshake.
1807 // channelActive() event has been fired already, which means this.channelActive() will
1808 // not be invoked. We have to initialize here instead.
1809 handshake(flushAtEnd);
1810 }
1811 applyHandshakeTimeout();
1812 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
1813 forceFlush(ctx);
1814 }
1815 }
1816
1817 /**
1818 * Performs TLS renegotiation.
1819 */
1820 public Future<Channel> renegotiate() {
1821 ChannelHandlerContext ctx = this.ctx;
1822 if (ctx == null) {
1823 throw new IllegalStateException();
1824 }
1825
1826 return renegotiate(ctx.executor().newPromise());
1827 }
1828
1829 /**
1830 * Performs TLS renegotiation.
1831 */
1832 public Future<Channel> renegotiate(final Promise<Channel> promise) {
1833 requireNonNull(promise, "promise");
1834
1835 ChannelHandlerContext ctx = this.ctx;
1836 if (ctx == null) {
1837 throw new IllegalStateException();
1838 }
1839
1840 EventExecutor executor = ctx.executor();
1841 if (!executor.inEventLoop()) {
1842 executor.execute(() -> renegotiateOnEventLoop(promise));
1843 return promise.asFuture();
1844 }
1845
1846 renegotiateOnEventLoop(promise);
1847 return promise.asFuture();
1848 }
1849
1850 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
1851 Future<Channel> oldHandshakePromise = handshakeFuture();
1852 if (!oldHandshakePromise.isDone()) {
1853 // There's no need to handshake because handshake is in progress already.
1854 // Merge the new promise into the old one.
1855 oldHandshakePromise.cascadeTo(newHandshakePromise);
1856 } else {
1857 handshakePromise = newHandshakePromise;
1858 handshake(true);
1859 applyHandshakeTimeout();
1860 }
1861 }
1862
1863 /**
1864 * Performs TLS (re)negotiation.
1865 * @param flushAtEnd Set to {@code true} if the outbound buffer should be flushed (written to the network) at the
1866 * end. Set to {@code false} if the handshake will be flushed later, e.g. as part of TCP Fast Open
1867 * connect.
1868 */
1869 private void handshake(boolean flushAtEnd) {
1870 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
1871 // Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake
1872 // is in progress. See https://github.com/netty/netty/issues/4718.
1873 return;
1874 }
1875 if (handshakePromise.isDone()) {
1876 // If the handshake is done already lets just return directly as there is no need to trigger it again.
1877 // This can happen if the handshake(...) was triggered before we called channelActive(...) by a
1878 // flush() that was triggered by a FutureListener that was added to the Future returned
1879 // from the connect(...) method. In this case we will see the flush() happen before we had a chance to
1880 // call fireChannelActive() on the pipeline.
1881 return;
1882 }
1883
1884 // Begin handshake.
1885 final ChannelHandlerContext ctx = this.ctx;
1886 try {
1887 engine.beginHandshake();
1888 wrapNonAppData(ctx, false);
1889 } catch (Throwable e) {
1890 setHandshakeFailure(ctx, e);
1891 } finally {
1892 if (flushAtEnd) {
1893 forceFlush(ctx);
1894 }
1895 }
1896 }
1897
1898 private void applyHandshakeTimeout() {
1899 final Promise<Channel> localHandshakePromise = handshakePromise;
1900
1901 // Set timeout if necessary.
1902 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
1903 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
1904 return;
1905 }
1906
1907 Future<?> timeoutFuture = ctx.executor().schedule(() -> {
1908 if (localHandshakePromise.isDone()) {
1909 return;
1910 }
1911
1912 SSLException exception =
1913 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
1914 try {
1915 if (localHandshakePromise.tryFailure(exception)) {
1916 SslUtils.handleHandshakeFailure(
1917 ctx, engine().getSession(), applicationProtocol(), exception, true);
1918 }
1919 } finally {
1920 releaseAndFailAll(ctx, exception);
1921 }
1922 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
1923
1924 // Cancel the handshake timeout when handshake is finished.
1925 localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel());
1926 }
1927
1928 private void forceFlush(ChannelHandlerContext ctx) {
1929 clearState(STATE_NEEDS_FLUSH);
1930 ctx.flush();
1931 }
1932
1933 private void setOpensslEngineSocketFd(Channel c) {
1934 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
1935 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
1936 }
1937 }
1938
1939 /**
1940 * Issues an initial TLS handshake once connected when used in client-mode
1941 */
1942 @Override
1943 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
1944 setOpensslEngineSocketFd(ctx.channel());
1945 if (!startTls) {
1946 startHandshakeProcessing(true);
1947 }
1948 ctx.fireChannelActive();
1949 }
1950
1951 private void safeClose(
1952 final ChannelHandlerContext ctx, final Future<Void> flushFuture,
1953 final Promise<Void> promise) {
1954 if (!ctx.channel().isActive()) {
1955 ctx.close().cascadeTo(promise);
1956 return;
1957 }
1958
1959 Future<?> timeoutFuture;
1960 if (!flushFuture.isDone()) {
1961 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
1962 if (closeNotifyTimeout > 0) {
1963 // Force-close the connection if close_notify is not fully sent in time.
1964 timeoutFuture = ctx.executor().schedule(() -> {
1965 // May be done in the meantime as cancel(...) is only best effort.
1966 if (!flushFuture.isDone()) {
1967 logger.warn("{} Last write attempt timed out; force-closing the connection.",
1968 ctx.channel());
1969 addCloseListener(ctx.close(), promise);
1970 }
1971 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
1972 } else {
1973 timeoutFuture = null;
1974 }
1975 } else {
1976 timeoutFuture = null;
1977 }
1978
1979 // Close the connection if close_notify is sent in time.
1980 flushFuture.addListener(f -> {
1981 if (timeoutFuture != null) {
1982 timeoutFuture.cancel();
1983 }
1984 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
1985 if (closeNotifyReadTimeout <= 0) {
1986 if (ctx.channel().isActive()) {
1987 // Trigger the close in all cases to make sure the promise is notified
1988 // See https://github.com/netty/netty/issues/2358
1989 addCloseListener(ctx.close(), promise);
1990 } else {
1991 promise.trySuccess(null);
1992 }
1993 } else {
1994 Future<?> closeNotifyReadTimeoutFuture;
1995
1996 Future<Channel> closeFuture = sslCloseFuture();
1997
1998 if (!closeFuture.isDone()) {
1999 closeNotifyReadTimeoutFuture = ctx.executor().schedule(() -> {
2000 if (!closeFuture.isDone()) {
2001 logger.debug(
2002 "{} did not receive close_notify in {}ms; force-closing the connection.",
2003 ctx.channel(), closeNotifyReadTimeout);
2004
2005 // Do the close now...
2006 addCloseListener(ctx.close(), promise);
2007 }
2008 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2009 } else {
2010 closeNotifyReadTimeoutFuture = null;
2011 }
2012
2013 // Do the close once we received the close_notify.
2014 closeFuture.addListener(future -> {
2015 if (closeNotifyReadTimeoutFuture != null) {
2016 closeNotifyReadTimeoutFuture.cancel();
2017 }
2018 if (ctx.channel().isActive()) {
2019 addCloseListener(ctx.close(), promise);
2020 } else {
2021 promise.trySuccess(null);
2022 }
2023 });
2024 }
2025 });
2026 }
2027
2028 private static void addCloseListener(Future<Void> future, Promise<Void> promise) {
2029 // We notify the promise in the PromiseNotifier as there is a "race" where the close(...) call
2030 // by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
2031 // this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
2032 // IllegalStateException.
2033 // Also we not want to log if the notification happens as this is expected in some cases.
2034 // See https://github.com/netty/netty/issues/5598
2035 future.cascadeTo(promise);
2036 }
2037
2038 /**
2039 * Allocate buffer of the right direct/not-direct type that the SSLEngine prefers.
2040 */
2041 private Buffer allocate(ChannelHandlerContext ctx, int capacity) {
2042 BufferAllocator alloc = ctx.bufferAllocator();
2043
2044 boolean hasDirect = alloc.getAllocationType().isDirect();
2045 boolean wantsDirect = engineType.wantsDirectBuffer;
2046 if (hasDirect && !wantsDirect) {
2047 alloc = DefaultBufferAllocators.onHeapAllocator();
2048 } else if (!hasDirect && wantsDirect) {
2049 alloc = DefaultBufferAllocators.offHeapAllocator();
2050 }
2051
2052 return alloc.allocate(capacity);
2053 }
2054
2055 /**
2056 * Allocates an outbound network buffer for {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)} which can encrypt
2057 * the specified amount of pending bytes.
2058 */
2059 private Buffer allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2060 return engineType.allocateWrapBuffer(this, ctx.bufferAllocator(), pendingBytes, numComponents);
2061 }
2062
2063 private boolean isStateSet(int bit) {
2064 return (state & bit) == bit;
2065 }
2066
2067 private void setState(int bit) {
2068 state |= bit;
2069 }
2070
2071 private void clearState(int bit) {
2072 state &= ~bit;
2073 }
2074
2075 @Override
2076 public long pendingOutboundBytes(ChannelHandlerContext ctx) {
2077 return pendingUnencryptedWrites == null ? 0 : pendingUnencryptedWrites.readableBytes();
2078 }
2079
2080 /**
2081 * Each call to SSL_write will introduce about ~100 bytes of overhead. This coalescing queue attempts to increase
2082 * goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written
2083 * this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations.
2084 */
2085 private final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
2086
2087 SslHandlerCoalescingBufferQueue(int initSize) {
2088 super(initSize);
2089 }
2090
2091 @Override
2092 protected Buffer compose(BufferAllocator alloc, Buffer cumulation, Buffer next) {
2093 if (cumulation.writableBytes() >= next.readableBytes()) {
2094 cumulation.writeBytes(next);
2095 next.close();
2096 return cumulation;
2097 }
2098 if (cumulation instanceof CompositeBuffer) {
2099 CompositeBuffer composite = (CompositeBuffer) cumulation;
2100 if (next.readableBytes() < wrapDataSize / 2) {
2101 composite.ensureWritable(wrapDataSize);
2102 composite.writeBytes(next);
2103 next.close();
2104 } else {
2105 composite.extendWith(next.send());
2106 }
2107 return composite;
2108 }
2109 int minIncrement = 2 * wrapDataSize; // Amortise cost of allocation.
2110 cumulation = copyAndCompose(alloc, cumulation, next, minIncrement);
2111 return cumulation;
2112 }
2113
2114 @Override
2115 protected Buffer composeFirst(BufferAllocator allocator, Buffer first) {
2116 if (first instanceof CompositeBuffer) {
2117 CompositeBuffer composite = (CompositeBuffer) first;
2118 if (engineType.wantsDirectBuffer && !allocator.getAllocationType().isDirect()) {
2119 first = DefaultBufferAllocators.offHeapAllocator().allocate(composite.readableBytes());
2120 } else {
2121 first = allocator.allocate(composite.readableBytes());
2122 }
2123 try {
2124 first.writeBytes(composite);
2125 } catch (Throwable cause) {
2126 first.close();
2127 throw cause;
2128 }
2129 composite.close();
2130 }
2131 return first;
2132 }
2133
2134 @Override
2135 protected Buffer removeEmptyValue() {
2136 return null;
2137 }
2138 }
2139
2140 private final class LazyPromise extends DefaultPromise<Channel> {
2141
2142 LazyPromise() {
2143 super(ImmediateEventExecutor.INSTANCE);
2144 }
2145
2146 @Override
2147 protected void checkDeadLock() {
2148 if (ctx == null) {
2149 // If ctx is null the handlerAdded(...) callback was not called, in this case the checkDeadLock()
2150 // method was called from another Thread then the one that is used by ctx.executor(). We need to
2151 // guard against this as a user can see a race if handshakeFuture().sync() is called but the
2152 // handlerAdded(..) method was not yet as it is called from the EventExecutor of the
2153 // ChannelHandlerContext.
2154 return;
2155 }
2156 checkDeadLock(ctx.executor());
2157 }
2158 }
2159 }