1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelInboundHandler;
30 import io.netty.channel.ChannelOption;
31 import io.netty.channel.ChannelOutboundBuffer;
32 import io.netty.channel.ChannelOutboundHandler;
33 import io.netty.channel.ChannelPipeline;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.unix.UnixChannel;
36 import io.netty.handler.codec.ByteToMessageDecoder;
37 import io.netty.handler.codec.DecoderException;
38 import io.netty.handler.codec.UnsupportedMessageTypeException;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.DefaultPromise;
41 import io.netty.util.concurrent.EventExecutor;
42 import io.netty.util.concurrent.Future;
43 import io.netty.util.concurrent.FutureListener;
44 import io.netty.util.concurrent.ImmediateExecutor;
45 import io.netty.util.concurrent.Promise;
46 import io.netty.util.concurrent.PromiseNotifier;
47 import io.netty.util.internal.ObjectUtil;
48 import io.netty.util.internal.PlatformDependent;
49 import io.netty.util.internal.ThrowableUtil;
50 import io.netty.util.internal.UnstableApi;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.security.cert.CertificateException;
61 import java.util.List;
62 import java.util.concurrent.Executor;
63 import java.util.concurrent.RejectedExecutionException;
64 import java.util.concurrent.TimeUnit;
65 import java.util.regex.Pattern;
66 import javax.net.ssl.SSLEngine;
67 import javax.net.ssl.SSLEngineResult;
68 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
69 import javax.net.ssl.SSLEngineResult.Status;
70 import javax.net.ssl.SSLException;
71 import javax.net.ssl.SSLHandshakeException;
72 import javax.net.ssl.SSLSession;
73
74 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
75 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
76 import static io.netty.util.internal.ObjectUtil.checkNotNull;
77 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
170 private static final InternalLogger logger =
171 InternalLoggerFactory.getInstance(SslHandler.class);
172 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
173 "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
174 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
175 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
176 private static final int STATE_SENT_FIRST_MESSAGE = 1;
177 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
178 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
179 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
180
181
182
183
184 private static final int STATE_NEEDS_FLUSH = 1 << 4;
185 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
186 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
187 private static final int STATE_PROCESS_TASK = 1 << 7;
188
189
190
191
192 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
193 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
194
195
196
197
198
199 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
200
201 private enum SslEngineType {
202 TCNATIVE(true, COMPOSITE_CUMULATOR) {
203 @Override
204 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
205 int nioBufferCount = in.nioBufferCount();
206 int writerIndex = out.writerIndex();
207 final SSLEngineResult result;
208 if (nioBufferCount > 1) {
209
210
211
212
213
214 ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
215 try {
216 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
217 result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
218 } finally {
219 handler.singleBuffer[0] = null;
220 }
221 } else {
222 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
223 toByteBuffer(out, writerIndex, out.writableBytes()));
224 }
225 out.writerIndex(writerIndex + result.bytesProduced());
226 return result;
227 }
228
229 @Override
230 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
231 int pendingBytes, int numComponents) {
232 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
233 .calculateOutNetBufSize(pendingBytes, numComponents));
234 }
235
236 @Override
237 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
238 return ((ReferenceCountedOpenSslEngine) handler.engine)
239 .calculateMaxLengthForWrap(pendingBytes, numComponents);
240 }
241
242 @Override
243 int calculatePendingData(SslHandler handler, int guess) {
244 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
245 return sslPending > 0 ? sslPending : guess;
246 }
247
248 @Override
249 boolean jdkCompatibilityMode(SSLEngine engine) {
250 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
251 }
252 },
253 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
254 @Override
255 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
256 int nioBufferCount = in.nioBufferCount();
257 int writerIndex = out.writerIndex();
258 final SSLEngineResult result;
259 if (nioBufferCount > 1) {
260
261
262
263 try {
264 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
265 result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
266 in.nioBuffers(in.readerIndex(), len),
267 handler.singleBuffer);
268 } finally {
269 handler.singleBuffer[0] = null;
270 }
271 } else {
272 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
273 toByteBuffer(out, writerIndex, out.writableBytes()));
274 }
275 out.writerIndex(writerIndex + result.bytesProduced());
276 return result;
277 }
278
279 @Override
280 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
281 int pendingBytes, int numComponents) {
282 return allocator.directBuffer(
283 ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
284 }
285
286 @Override
287 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
288 return ((ConscryptAlpnSslEngine) handler.engine)
289 .calculateRequiredOutBufSpace(pendingBytes, numComponents);
290 }
291
292 @Override
293 int calculatePendingData(SslHandler handler, int guess) {
294 return guess;
295 }
296
297 @Override
298 boolean jdkCompatibilityMode(SSLEngine engine) {
299 return true;
300 }
301 },
302 JDK(false, MERGE_CUMULATOR) {
303 @Override
304 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
305 int writerIndex = out.writerIndex();
306 ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
307 int position = inNioBuffer.position();
308 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
309 toByteBuffer(out, writerIndex, out.writableBytes()));
310 out.writerIndex(writerIndex + result.bytesProduced());
311
312
313
314
315
316
317
318 if (result.bytesConsumed() == 0) {
319 int consumed = inNioBuffer.position() - position;
320 if (consumed != result.bytesConsumed()) {
321
322 return new SSLEngineResult(
323 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
324 }
325 }
326 return result;
327 }
328
329 @Override
330 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
331 int pendingBytes, int numComponents) {
332
333
334 return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
335 }
336
337 @Override
338 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
339
340
341
342
343
344
345 return handler.engine.getSession().getPacketBufferSize();
346 }
347
348 @Override
349 int calculatePendingData(SslHandler handler, int guess) {
350 return guess;
351 }
352
353 @Override
354 boolean jdkCompatibilityMode(SSLEngine engine) {
355 return true;
356 }
357 };
358
359 static SslEngineType forEngine(SSLEngine engine) {
360 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
361 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
362 }
363
364 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
365 this.wantsDirectBuffer = wantsDirectBuffer;
366 this.cumulator = cumulator;
367 }
368
369 abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
370
371 abstract int calculatePendingData(SslHandler handler, int guess);
372
373 abstract boolean jdkCompatibilityMode(SSLEngine engine);
374
375 abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
376 int pendingBytes, int numComponents);
377
378 abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);
379
380
381
382
383
384
385
386 final boolean wantsDirectBuffer;
387
388
389
390
391
392
393
394
395
396
397
398 final Cumulator cumulator;
399 }
400
401 private volatile ChannelHandlerContext ctx;
402 private final SSLEngine engine;
403 private final SslEngineType engineType;
404 private final Executor delegatedTaskExecutor;
405 private final boolean jdkCompatibilityMode;
406
407
408
409
410
411
412 private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
413
414 private final boolean startTls;
415 private final ResumptionController resumptionController;
416
417 private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
418 private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
419
420 private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
421 private Promise<Channel> handshakePromise = new LazyChannelPromise();
422 private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
423
424 private int packetLength;
425 private short state;
426
427 private volatile long handshakeTimeoutMillis = 10000;
428 private volatile long closeNotifyFlushTimeoutMillis = 3000;
429 private volatile long closeNotifyReadTimeoutMillis;
430 volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
431
432
433
434
435
436
437 public SslHandler(SSLEngine engine) {
438 this(engine, false);
439 }
440
441
442
443
444
445
446
447
448 public SslHandler(SSLEngine engine, boolean startTls) {
449 this(engine, startTls, ImmediateExecutor.INSTANCE);
450 }
451
452
453
454
455
456
457
458
459 public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
460 this(engine, false, delegatedTaskExecutor);
461 }
462
463
464
465
466
467
468
469
470
471
472 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
473 this(engine, startTls, delegatedTaskExecutor, null);
474 }
475
476 SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor,
477 ResumptionController resumptionController) {
478 this.engine = ObjectUtil.checkNotNull(engine, "engine");
479 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
480 engineType = SslEngineType.forEngine(engine);
481 this.startTls = startTls;
482 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
483 setCumulator(engineType.cumulator);
484 this.resumptionController = resumptionController;
485 }
486
487 public long getHandshakeTimeoutMillis() {
488 return handshakeTimeoutMillis;
489 }
490
491 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
492 checkNotNull(unit, "unit");
493 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
494 }
495
496 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
497 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
498 }
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520 @UnstableApi
521 public final void setWrapDataSize(int wrapDataSize) {
522 this.wrapDataSize = wrapDataSize;
523 }
524
525
526
527
528 @Deprecated
529 public long getCloseNotifyTimeoutMillis() {
530 return getCloseNotifyFlushTimeoutMillis();
531 }
532
533
534
535
536 @Deprecated
537 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
538 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
539 }
540
541
542
543
544 @Deprecated
545 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
546 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
547 }
548
549
550
551
552
553
554 public final long getCloseNotifyFlushTimeoutMillis() {
555 return closeNotifyFlushTimeoutMillis;
556 }
557
558
559
560
561
562
563 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
564 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
565 }
566
567
568
569
570 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
571 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
572 "closeNotifyFlushTimeoutMillis");
573 }
574
575
576
577
578
579
580 public final long getCloseNotifyReadTimeoutMillis() {
581 return closeNotifyReadTimeoutMillis;
582 }
583
584
585
586
587
588
589 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
590 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
591 }
592
593
594
595
596 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
597 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
598 "closeNotifyReadTimeoutMillis");
599 }
600
601
602
603
604 public SSLEngine engine() {
605 return engine;
606 }
607
608
609
610
611
612
613 public String applicationProtocol() {
614 SSLEngine engine = engine();
615 if (!(engine instanceof ApplicationProtocolAccessor)) {
616 return null;
617 }
618
619 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
620 }
621
622
623
624
625
626
627
628 public Future<Channel> handshakeFuture() {
629 return handshakePromise;
630 }
631
632
633
634
635 @Deprecated
636 public ChannelFuture close() {
637 return closeOutbound();
638 }
639
640
641
642
643 @Deprecated
644 public ChannelFuture close(ChannelPromise promise) {
645 return closeOutbound(promise);
646 }
647
648
649
650
651
652
653
654 public ChannelFuture closeOutbound() {
655 return closeOutbound(ctx.newPromise());
656 }
657
658
659
660
661
662
663
664 public ChannelFuture closeOutbound(final ChannelPromise promise) {
665 final ChannelHandlerContext ctx = this.ctx;
666 if (ctx.executor().inEventLoop()) {
667 closeOutbound0(promise);
668 } else {
669 ctx.executor().execute(new Runnable() {
670 @Override
671 public void run() {
672 closeOutbound0(promise);
673 }
674 });
675 }
676 return promise;
677 }
678
679 private void closeOutbound0(ChannelPromise promise) {
680 setState(STATE_OUTBOUND_CLOSED);
681 engine.closeOutbound();
682 try {
683 flush(ctx, promise);
684 } catch (Exception e) {
685 if (!promise.tryFailure(e)) {
686 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
687 }
688 }
689 }
690
691
692
693
694
695
696
697
698 public Future<Channel> sslCloseFuture() {
699 return sslClosePromise;
700 }
701
702 @Override
703 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
704 try {
705 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
706
707 pendingUnencryptedWrites.releaseAndFailAll(ctx,
708 new ChannelException("Pending write on removal of SslHandler"));
709 }
710 pendingUnencryptedWrites = null;
711
712 SSLException cause = null;
713
714
715
716
717 if (!handshakePromise.isDone()) {
718 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
719 if (handshakePromise.tryFailure(cause)) {
720 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
721 }
722 }
723 if (!sslClosePromise.isDone()) {
724 if (cause == null) {
725 cause = new SSLException("SslHandler removed before SSLEngine was closed");
726 }
727 notifyClosePromise(cause);
728 }
729 } finally {
730 ReferenceCountUtil.release(engine);
731 }
732 }
733
734 @Override
735 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
736 ctx.bind(localAddress, promise);
737 }
738
739 @Override
740 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
741 ChannelPromise promise) throws Exception {
742 ctx.connect(remoteAddress, localAddress, promise);
743 }
744
745 @Override
746 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
747 ctx.deregister(promise);
748 }
749
750 @Override
751 public void disconnect(final ChannelHandlerContext ctx,
752 final ChannelPromise promise) throws Exception {
753 closeOutboundAndChannel(ctx, promise, true);
754 }
755
756 @Override
757 public void close(final ChannelHandlerContext ctx,
758 final ChannelPromise promise) throws Exception {
759 closeOutboundAndChannel(ctx, promise, false);
760 }
761
762 @Override
763 public void read(ChannelHandlerContext ctx) throws Exception {
764 if (!handshakePromise.isDone()) {
765 setState(STATE_READ_DURING_HANDSHAKE);
766 }
767
768 ctx.read();
769 }
770
771 private static IllegalStateException newPendingWritesNullException() {
772 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
773 }
774
775 @Override
776 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
777 if (!(msg instanceof ByteBuf)) {
778 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
779 ReferenceCountUtil.safeRelease(msg);
780 promise.setFailure(exception);
781 } else if (pendingUnencryptedWrites == null) {
782 ReferenceCountUtil.safeRelease(msg);
783 promise.setFailure(newPendingWritesNullException());
784 } else {
785 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
786 }
787 }
788
789 @Override
790 public void flush(ChannelHandlerContext ctx) throws Exception {
791
792
793 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
794 setState(STATE_SENT_FIRST_MESSAGE);
795 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
796 forceFlush(ctx);
797
798
799 startHandshakeProcessing(true);
800 return;
801 }
802
803 if (isStateSet(STATE_PROCESS_TASK)) {
804 return;
805 }
806
807 try {
808 wrapAndFlush(ctx);
809 } catch (Throwable cause) {
810 setHandshakeFailure(ctx, cause);
811 PlatformDependent.throwException(cause);
812 }
813 }
814
815 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
816 if (pendingUnencryptedWrites.isEmpty()) {
817
818
819
820
821 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
822 }
823 if (!handshakePromise.isDone()) {
824 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
825 }
826 try {
827 wrap(ctx, false);
828 } finally {
829
830
831 forceFlush(ctx);
832 }
833 }
834
835
836 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
837 ByteBuf out = null;
838 ByteBufAllocator alloc = ctx.alloc();
839 try {
840 final int wrapDataSize = this.wrapDataSize;
841
842
843 outer: while (!ctx.isRemoved()) {
844 ChannelPromise promise = ctx.newPromise();
845 ByteBuf buf = wrapDataSize > 0 ?
846 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
847 pendingUnencryptedWrites.removeFirst(promise);
848 if (buf == null) {
849 break;
850 }
851
852 SSLEngineResult result;
853
854 if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
855
856
857
858
859
860 int readableBytes = buf.readableBytes();
861 int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
862 if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
863 numPackets += 1;
864 }
865
866 if (out == null) {
867 out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
868 }
869 result = wrapMultiple(alloc, engine, buf, out);
870 } else {
871 if (out == null) {
872 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
873 }
874 result = wrap(alloc, engine, buf, out);
875 }
876
877 if (buf.isReadable()) {
878 pendingUnencryptedWrites.addFirst(buf, promise);
879
880
881 promise = null;
882 } else {
883 buf.release();
884 }
885
886
887
888 if (out.isReadable()) {
889 final ByteBuf b = out;
890 out = null;
891 if (promise != null) {
892 ctx.write(b, promise);
893 } else {
894 ctx.write(b);
895 }
896 } else if (promise != null) {
897 ctx.write(Unpooled.EMPTY_BUFFER, promise);
898 }
899
900
901 if (result.getStatus() == Status.CLOSED) {
902
903
904 if (!pendingUnencryptedWrites.isEmpty()) {
905
906
907 Throwable exception = handshakePromise.cause();
908 if (exception == null) {
909 exception = sslClosePromise.cause();
910 if (exception == null) {
911 exception = new SslClosedEngineException("SSLEngine closed already");
912 }
913 }
914 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
915 }
916
917 return;
918 } else {
919 switch (result.getHandshakeStatus()) {
920 case NEED_TASK:
921 if (!runDelegatedTasks(inUnwrap)) {
922
923
924 break outer;
925 }
926 break;
927 case FINISHED:
928 case NOT_HANDSHAKING:
929 setHandshakeSuccess();
930 break;
931 case NEED_WRAP:
932
933
934
935
936 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
937 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
938 }
939 break;
940 case NEED_UNWRAP:
941
942
943 readIfNeeded(ctx);
944 return;
945 default:
946 throw new IllegalStateException(
947 "Unknown handshake status: " + result.getHandshakeStatus());
948 }
949 }
950 }
951 } finally {
952 if (out != null) {
953 out.release();
954 }
955 if (inUnwrap) {
956 setState(STATE_NEEDS_FLUSH);
957 }
958 }
959 }
960
961
962
963
964
965
966
967 private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
968 ByteBuf out = null;
969 ByteBufAllocator alloc = ctx.alloc();
970 try {
971
972
973 outer: while (!ctx.isRemoved()) {
974 if (out == null) {
975
976
977
978 out = allocateOutNetBuf(ctx, 2048, 1);
979 }
980 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
981 if (result.bytesProduced() > 0) {
982 ctx.write(out).addListener(new ChannelFutureListener() {
983 @Override
984 public void operationComplete(ChannelFuture future) {
985 Throwable cause = future.cause();
986 if (cause != null) {
987 setHandshakeFailureTransportFailure(ctx, cause);
988 }
989 }
990 });
991 if (inUnwrap) {
992 setState(STATE_NEEDS_FLUSH);
993 }
994 out = null;
995 }
996
997 HandshakeStatus status = result.getHandshakeStatus();
998 switch (status) {
999 case FINISHED:
1000
1001
1002
1003
1004 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1005 wrap(ctx, true);
1006 }
1007 return false;
1008 case NEED_TASK:
1009 if (!runDelegatedTasks(inUnwrap)) {
1010
1011
1012 break outer;
1013 }
1014 break;
1015 case NEED_UNWRAP:
1016 if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
1017
1018
1019
1020 return false;
1021 }
1022 break;
1023 case NEED_WRAP:
1024 break;
1025 case NOT_HANDSHAKING:
1026 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1027 wrap(ctx, true);
1028 }
1029
1030
1031 if (!inUnwrap) {
1032 unwrapNonAppData(ctx);
1033 }
1034 return true;
1035 default:
1036 throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
1037 }
1038
1039
1040
1041 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
1042 break;
1043 }
1044
1045
1046
1047 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
1048 break;
1049 }
1050 }
1051 } finally {
1052 if (out != null) {
1053 out.release();
1054 }
1055 }
1056 return false;
1057 }
1058
1059 private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1060 throws SSLException {
1061 SSLEngineResult result = null;
1062
1063 do {
1064 int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
1065
1066
1067 int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());
1068
1069 if (!out.isWritable(nextOutSize)) {
1070 if (result != null) {
1071
1072
1073 break;
1074 }
1075
1076
1077 out.ensureWritable(nextOutSize);
1078 }
1079
1080 ByteBuf wrapBuf = in.readSlice(nextSliceSize);
1081 result = wrap(alloc, engine, wrapBuf, out);
1082
1083 if (result.getStatus() == Status.CLOSED) {
1084
1085
1086 break;
1087 }
1088
1089 if (wrapBuf.isReadable()) {
1090
1091
1092 in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
1093 }
1094 } while (in.readableBytes() > 0);
1095
1096 return result;
1097 }
1098
1099 private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1100 throws SSLException {
1101 ByteBuf newDirectIn = null;
1102 try {
1103 int readerIndex = in.readerIndex();
1104 int readableBytes = in.readableBytes();
1105
1106
1107
1108 final ByteBuffer[] in0;
1109 if (in.isDirect() || !engineType.wantsDirectBuffer) {
1110
1111
1112
1113
1114 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1115 in0 = singleBuffer;
1116
1117
1118 in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1119 } else {
1120 in0 = in.nioBuffers();
1121 }
1122 } else {
1123
1124
1125
1126 newDirectIn = alloc.directBuffer(readableBytes);
1127 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1128 in0 = singleBuffer;
1129 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1130 }
1131
1132 for (;;) {
1133
1134
1135 ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
1136 SSLEngineResult result = engine.wrap(in0, out0);
1137 in.skipBytes(result.bytesConsumed());
1138 out.writerIndex(out.writerIndex() + result.bytesProduced());
1139
1140 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1141 out.ensureWritable(engine.getSession().getPacketBufferSize());
1142 } else {
1143 return result;
1144 }
1145 }
1146 } finally {
1147
1148 singleBuffer[0] = null;
1149
1150 if (newDirectIn != null) {
1151 newDirectIn.release();
1152 }
1153 }
1154 }
1155
1156 @Override
1157 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1158 boolean handshakeFailed = handshakePromise.cause() != null;
1159
1160
1161 ClosedChannelException exception = new ClosedChannelException();
1162
1163
1164 if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
1165 ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
1166 "Connection closed while SSL/TLS handshake was in progress",
1167 SslHandler.class, "channelInactive"));
1168 }
1169
1170
1171
1172 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1173 false);
1174
1175
1176 notifyClosePromise(exception);
1177
1178 try {
1179 super.channelInactive(ctx);
1180 } catch (DecoderException e) {
1181 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1182
1183
1184
1185
1186
1187 throw e;
1188 }
1189 }
1190 }
1191
1192 @Override
1193 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1194 if (ignoreException(cause)) {
1195
1196
1197 if (logger.isDebugEnabled()) {
1198 logger.debug(
1199 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1200 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1201 }
1202
1203
1204
1205 if (ctx.channel().isActive()) {
1206 ctx.close();
1207 }
1208 } else {
1209 ctx.fireExceptionCaught(cause);
1210 }
1211 }
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222 private boolean ignoreException(Throwable t) {
1223 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1224 String message = t.getMessage();
1225
1226
1227
1228 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1229 return true;
1230 }
1231
1232
1233 StackTraceElement[] elements = t.getStackTrace();
1234 for (StackTraceElement element: elements) {
1235 String classname = element.getClassName();
1236 String methodname = element.getMethodName();
1237
1238
1239 if (classname.startsWith("io.netty.")) {
1240 continue;
1241 }
1242
1243
1244 if (!"read".equals(methodname)) {
1245 continue;
1246 }
1247
1248
1249
1250 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1251 return true;
1252 }
1253
1254 try {
1255
1256
1257
1258 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1259
1260 if (SocketChannel.class.isAssignableFrom(clazz)
1261 || DatagramChannel.class.isAssignableFrom(clazz)) {
1262 return true;
1263 }
1264
1265
1266 if ("com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1267 return true;
1268 }
1269 } catch (Throwable cause) {
1270 if (logger.isDebugEnabled()) {
1271 logger.debug("Unexpected exception while loading class {} classname {}",
1272 getClass(), classname, cause);
1273 }
1274 }
1275 }
1276 }
1277
1278 return false;
1279 }
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294 @Deprecated
1295 public static boolean isEncrypted(ByteBuf buffer) {
1296 return isEncrypted(buffer, false);
1297 }
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315 public static boolean isEncrypted(ByteBuf buffer, boolean probeSSLv2) {
1316 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1317 throw new IllegalArgumentException(
1318 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1319 }
1320 return getEncryptedPacketLength(buffer, buffer.readerIndex(), probeSSLv2) != SslUtils.NOT_ENCRYPTED;
1321 }
1322
1323 private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1324 int packetLength = this.packetLength;
1325
1326 if (packetLength > 0) {
1327 if (in.readableBytes() < packetLength) {
1328 return;
1329 }
1330 } else {
1331
1332 final int readableBytes = in.readableBytes();
1333 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1334 return;
1335 }
1336 packetLength = getEncryptedPacketLength(in, in.readerIndex(), true);
1337 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1338
1339 NotSslRecordException e = new NotSslRecordException(
1340 "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1341 in.skipBytes(in.readableBytes());
1342
1343
1344
1345 setHandshakeFailure(ctx, e);
1346
1347 throw e;
1348 }
1349 if (packetLength == NOT_ENOUGH_DATA) {
1350 return;
1351 }
1352 assert packetLength > 0;
1353 if (packetLength > readableBytes) {
1354
1355 this.packetLength = packetLength;
1356 return;
1357 }
1358 }
1359
1360
1361
1362 this.packetLength = 0;
1363 try {
1364 final int bytesConsumed = unwrap(ctx, in, packetLength);
1365 assert bytesConsumed == packetLength || engine.isInboundDone() :
1366 "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1367 bytesConsumed;
1368 } catch (Throwable cause) {
1369 handleUnwrapThrowable(ctx, cause);
1370 }
1371 }
1372
1373 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1374 try {
1375 unwrap(ctx, in, in.readableBytes());
1376 } catch (Throwable cause) {
1377 handleUnwrapThrowable(ctx, cause);
1378 }
1379 }
1380
1381 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1382 try {
1383
1384
1385
1386
1387 if (handshakePromise.tryFailure(cause)) {
1388 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1389 }
1390
1391
1392 if (pendingUnencryptedWrites != null) {
1393
1394
1395 wrapAndFlush(ctx);
1396 }
1397 } catch (SSLException ex) {
1398 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1399 " because of an previous SSLException, ignoring...", ex);
1400 } finally {
1401
1402 setHandshakeFailure(ctx, cause, true, false, true);
1403 }
1404 PlatformDependent.throwException(cause);
1405 }
1406
1407 @Override
1408 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1409 if (isStateSet(STATE_PROCESS_TASK)) {
1410 return;
1411 }
1412 if (jdkCompatibilityMode) {
1413 decodeJdkCompatible(ctx, in);
1414 } else {
1415 decodeNonJdkCompatible(ctx, in);
1416 }
1417 }
1418
1419 @Override
1420 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1421 channelReadComplete0(ctx);
1422 }
1423
1424 private void channelReadComplete0(ChannelHandlerContext ctx) {
1425
1426 discardSomeReadBytes();
1427
1428 flushIfNeeded(ctx);
1429 readIfNeeded(ctx);
1430
1431 clearState(STATE_FIRE_CHANNEL_READ);
1432 ctx.fireChannelReadComplete();
1433 }
1434
1435 private void readIfNeeded(ChannelHandlerContext ctx) {
1436
1437 if (!ctx.channel().config().isAutoRead() &&
1438 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1439
1440
1441 ctx.read();
1442 }
1443 }
1444
1445 private void flushIfNeeded(ChannelHandlerContext ctx) {
1446 if (isStateSet(STATE_NEEDS_FLUSH)) {
1447 forceFlush(ctx);
1448 }
1449 }
1450
1451
1452
1453
1454 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1455 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1456 }
1457
1458
1459
1460
1461 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1462 final int originalLength = length;
1463 boolean wrapLater = false;
1464 boolean notifyClosure = false;
1465 boolean executedRead = false;
1466 ByteBuf decodeOut = allocate(ctx, length);
1467 try {
1468
1469
1470 do {
1471 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1472 final Status status = result.getStatus();
1473 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1474 final int produced = result.bytesProduced();
1475 final int consumed = result.bytesConsumed();
1476
1477
1478
1479
1480 packet.skipBytes(consumed);
1481 length -= consumed;
1482
1483
1484
1485
1486 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1487 wrapLater |= (decodeOut.isReadable() ?
1488 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1489 handshakeStatus == HandshakeStatus.FINISHED || !pendingUnencryptedWrites.isEmpty();
1490 }
1491
1492
1493
1494
1495 if (decodeOut.isReadable()) {
1496 setState(STATE_FIRE_CHANNEL_READ);
1497 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1498 executedRead = true;
1499 executeChannelRead(ctx, decodeOut);
1500 } else {
1501 ctx.fireChannelRead(decodeOut);
1502 }
1503 decodeOut = null;
1504 }
1505
1506 if (status == Status.CLOSED) {
1507 notifyClosure = true;
1508 } else if (status == Status.BUFFER_OVERFLOW) {
1509 if (decodeOut != null) {
1510 decodeOut.release();
1511 }
1512 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1513
1514
1515
1516
1517 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1518 applicationBufferSize : applicationBufferSize - produced));
1519 continue;
1520 }
1521
1522 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1523 boolean pending = runDelegatedTasks(true);
1524 if (!pending) {
1525
1526
1527
1528
1529
1530 wrapLater = false;
1531 break;
1532 }
1533 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1534
1535
1536
1537 if (wrapNonAppData(ctx, true) && length == 0) {
1538 break;
1539 }
1540 }
1541
1542 if (status == Status.BUFFER_UNDERFLOW ||
1543
1544 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1545 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1546 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1547
1548
1549 readIfNeeded(ctx);
1550 }
1551
1552 break;
1553 } else if (decodeOut == null) {
1554 decodeOut = allocate(ctx, length);
1555 }
1556 } while (!ctx.isRemoved());
1557
1558 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1559
1560
1561
1562
1563 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1564 wrapLater = true;
1565 }
1566
1567 if (wrapLater) {
1568 wrap(ctx, true);
1569 }
1570 } finally {
1571 if (decodeOut != null) {
1572 decodeOut.release();
1573 }
1574
1575 if (notifyClosure) {
1576 if (executedRead) {
1577 executeNotifyClosePromise(ctx);
1578 } else {
1579 notifyClosePromise(null);
1580 }
1581 }
1582 }
1583 return originalLength - length;
1584 }
1585
1586 private boolean setHandshakeSuccessUnwrapMarkReentry() throws SSLException {
1587
1588
1589 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1590 if (setReentryState) {
1591 setState(STATE_UNWRAP_REENTRY);
1592 }
1593 try {
1594 return setHandshakeSuccess();
1595 } finally {
1596
1597
1598 if (setReentryState) {
1599 clearState(STATE_UNWRAP_REENTRY);
1600 }
1601 }
1602 }
1603
1604 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1605 try {
1606 ctx.executor().execute(new Runnable() {
1607 @Override
1608 public void run() {
1609 notifyClosePromise(null);
1610 }
1611 });
1612 } catch (RejectedExecutionException e) {
1613 notifyClosePromise(e);
1614 }
1615 }
1616
1617 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1618 try {
1619 ctx.executor().execute(new Runnable() {
1620 @Override
1621 public void run() {
1622 ctx.fireChannelRead(decodedOut);
1623 }
1624 });
1625 } catch (RejectedExecutionException e) {
1626 decodedOut.release();
1627 throw e;
1628 }
1629 }
1630
1631 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1632 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1633 out.nioBuffer(index, len);
1634 }
1635
1636 private static boolean inEventLoop(Executor executor) {
1637 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1638 }
1639
1640
1641
1642
1643
1644
1645
1646
1647 private boolean runDelegatedTasks(boolean inUnwrap) {
1648 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1649
1650
1651 for (;;) {
1652 Runnable task = engine.getDelegatedTask();
1653 if (task == null) {
1654 return true;
1655 }
1656 setState(STATE_PROCESS_TASK);
1657 if (task instanceof AsyncRunnable) {
1658
1659 boolean pending = false;
1660 try {
1661 AsyncRunnable asyncTask = (AsyncRunnable) task;
1662 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1663 asyncTask.run(completionHandler);
1664 pending = completionHandler.resumeLater();
1665 if (pending) {
1666 return false;
1667 }
1668 } finally {
1669 if (!pending) {
1670
1671
1672 clearState(STATE_PROCESS_TASK);
1673 }
1674 }
1675 } else {
1676 try {
1677 task.run();
1678 } finally {
1679 clearState(STATE_PROCESS_TASK);
1680 }
1681 }
1682 }
1683 } else {
1684 executeDelegatedTask(inUnwrap);
1685 return false;
1686 }
1687 }
1688
1689 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1690 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1691 }
1692
1693 private void executeDelegatedTask(boolean inUnwrap) {
1694 executeDelegatedTask(getTaskRunner(inUnwrap));
1695 }
1696
1697 private void executeDelegatedTask(SslTasksRunner task) {
1698 setState(STATE_PROCESS_TASK);
1699 try {
1700 delegatedTaskExecutor.execute(task);
1701 } catch (RejectedExecutionException e) {
1702 clearState(STATE_PROCESS_TASK);
1703 throw e;
1704 }
1705 }
1706
1707 private final class AsyncTaskCompletionHandler implements Runnable {
1708 private final boolean inUnwrap;
1709 boolean didRun;
1710 boolean resumeLater;
1711
1712 AsyncTaskCompletionHandler(boolean inUnwrap) {
1713 this.inUnwrap = inUnwrap;
1714 }
1715
1716 @Override
1717 public void run() {
1718 didRun = true;
1719 if (resumeLater) {
1720 getTaskRunner(inUnwrap).runComplete();
1721 }
1722 }
1723
1724 boolean resumeLater() {
1725 if (!didRun) {
1726 resumeLater = true;
1727 return true;
1728 }
1729 return false;
1730 }
1731 }
1732
1733
1734
1735
1736
1737 private final class SslTasksRunner implements Runnable {
1738 private final boolean inUnwrap;
1739 private final Runnable runCompleteTask = new Runnable() {
1740 @Override
1741 public void run() {
1742 runComplete();
1743 }
1744 };
1745
1746 SslTasksRunner(boolean inUnwrap) {
1747 this.inUnwrap = inUnwrap;
1748 }
1749
1750
1751 private void taskError(Throwable e) {
1752 if (inUnwrap) {
1753
1754
1755
1756
1757 try {
1758 handleUnwrapThrowable(ctx, e);
1759 } catch (Throwable cause) {
1760 safeExceptionCaught(cause);
1761 }
1762 } else {
1763 setHandshakeFailure(ctx, e);
1764 forceFlush(ctx);
1765 }
1766 }
1767
1768
1769 private void safeExceptionCaught(Throwable cause) {
1770 try {
1771 exceptionCaught(ctx, wrapIfNeeded(cause));
1772 } catch (Throwable error) {
1773 ctx.fireExceptionCaught(error);
1774 }
1775 }
1776
1777 private Throwable wrapIfNeeded(Throwable cause) {
1778 if (!inUnwrap) {
1779
1780 return cause;
1781 }
1782
1783
1784 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1785 }
1786
1787 private void tryDecodeAgain() {
1788 try {
1789 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1790 } catch (Throwable cause) {
1791 safeExceptionCaught(cause);
1792 } finally {
1793
1794
1795
1796 channelReadComplete0(ctx);
1797 }
1798 }
1799
1800
1801
1802
1803
1804 private void resumeOnEventExecutor() {
1805 assert ctx.executor().inEventLoop();
1806 clearState(STATE_PROCESS_TASK);
1807 try {
1808 HandshakeStatus status = engine.getHandshakeStatus();
1809 switch (status) {
1810
1811
1812 case NEED_TASK:
1813 executeDelegatedTask(this);
1814
1815 break;
1816
1817
1818 case FINISHED:
1819
1820 case NOT_HANDSHAKING:
1821 setHandshakeSuccess();
1822 try {
1823
1824
1825 wrap(ctx, inUnwrap);
1826 } catch (Throwable e) {
1827 taskError(e);
1828 return;
1829 }
1830 if (inUnwrap) {
1831
1832
1833 unwrapNonAppData(ctx);
1834 }
1835
1836
1837 forceFlush(ctx);
1838
1839 tryDecodeAgain();
1840 break;
1841
1842
1843
1844 case NEED_UNWRAP:
1845 try {
1846 unwrapNonAppData(ctx);
1847 } catch (SSLException e) {
1848 handleUnwrapThrowable(ctx, e);
1849 return;
1850 }
1851 tryDecodeAgain();
1852 break;
1853
1854
1855
1856 case NEED_WRAP:
1857 try {
1858 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1859
1860
1861
1862
1863 unwrapNonAppData(ctx);
1864 }
1865
1866
1867 forceFlush(ctx);
1868 } catch (Throwable e) {
1869 taskError(e);
1870 return;
1871 }
1872
1873
1874 tryDecodeAgain();
1875 break;
1876
1877 default:
1878
1879 throw new AssertionError();
1880 }
1881 } catch (Throwable cause) {
1882 safeExceptionCaught(cause);
1883 }
1884 }
1885
1886 void runComplete() {
1887 EventExecutor executor = ctx.executor();
1888
1889
1890
1891
1892
1893
1894
1895 executor.execute(new Runnable() {
1896 @Override
1897 public void run() {
1898 resumeOnEventExecutor();
1899 }
1900 });
1901 }
1902
1903 @Override
1904 public void run() {
1905 try {
1906 Runnable task = engine.getDelegatedTask();
1907 if (task == null) {
1908
1909 return;
1910 }
1911 if (task instanceof AsyncRunnable) {
1912 AsyncRunnable asyncTask = (AsyncRunnable) task;
1913 asyncTask.run(runCompleteTask);
1914 } else {
1915 task.run();
1916 runComplete();
1917 }
1918 } catch (final Throwable cause) {
1919 handleException(cause);
1920 }
1921 }
1922
1923 private void handleException(final Throwable cause) {
1924 EventExecutor executor = ctx.executor();
1925 if (executor.inEventLoop()) {
1926 clearState(STATE_PROCESS_TASK);
1927 safeExceptionCaught(cause);
1928 } else {
1929 try {
1930 executor.execute(new Runnable() {
1931 @Override
1932 public void run() {
1933 clearState(STATE_PROCESS_TASK);
1934 safeExceptionCaught(cause);
1935 }
1936 });
1937 } catch (RejectedExecutionException ignore) {
1938 clearState(STATE_PROCESS_TASK);
1939
1940
1941 ctx.fireExceptionCaught(cause);
1942 }
1943 }
1944 }
1945 }
1946
1947
1948
1949
1950
1951
1952 private boolean setHandshakeSuccess() throws SSLException {
1953
1954
1955
1956 final SSLSession session = engine.getSession();
1957 if (resumptionController != null && !handshakePromise.isDone()) {
1958 try {
1959 if (resumptionController.validateResumeIfNeeded(engine) && logger.isDebugEnabled()) {
1960 logger.debug("{} Resumed and reauthenticated session", ctx.channel());
1961 }
1962 } catch (CertificateException e) {
1963 SSLHandshakeException exception = new SSLHandshakeException(e.getMessage());
1964 exception.initCause(e);
1965 throw exception;
1966 }
1967 }
1968 final boolean notified;
1969 if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1970 if (logger.isDebugEnabled()) {
1971 logger.debug(
1972 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1973 ctx.channel(),
1974 session.getProtocol(),
1975 session.getCipherSuite());
1976 }
1977 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1978 }
1979 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1980 clearState(STATE_READ_DURING_HANDSHAKE);
1981 if (!ctx.channel().config().isAutoRead()) {
1982 ctx.read();
1983 }
1984 }
1985 return notified;
1986 }
1987
1988
1989
1990
1991 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1992 setHandshakeFailure(ctx, cause, true, true, false);
1993 }
1994
1995
1996
1997
1998 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1999 boolean notify, boolean alwaysFlushAndClose) {
2000 try {
2001
2002 setState(STATE_OUTBOUND_CLOSED);
2003 engine.closeOutbound();
2004
2005 if (closeInbound) {
2006 try {
2007 engine.closeInbound();
2008 } catch (SSLException e) {
2009 if (logger.isDebugEnabled()) {
2010
2011
2012
2013
2014 String msg = e.getMessage();
2015 if (msg == null || !(msg.contains("possible truncation attack") ||
2016 msg.contains("closing inbound before receiving peer's close_notify"))) {
2017 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
2018 }
2019 }
2020 }
2021 }
2022 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
2023 SslUtils.handleHandshakeFailure(ctx, cause, notify);
2024 }
2025 } finally {
2026
2027 releaseAndFailAll(ctx, cause);
2028 }
2029 }
2030
2031 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
2032
2033
2034
2035 try {
2036 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
2037 releaseAndFailAll(ctx, transportFailure);
2038 if (handshakePromise.tryFailure(transportFailure)) {
2039 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2040 }
2041 } finally {
2042 ctx.close();
2043 }
2044 }
2045
2046 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2047 if (resumptionController != null &&
2048 (!engine.getSession().isValid() || cause instanceof SSLHandshakeException)) {
2049 resumptionController.remove(engine());
2050 }
2051 if (pendingUnencryptedWrites != null) {
2052 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2053 }
2054 }
2055
2056 private void notifyClosePromise(Throwable cause) {
2057 if (cause == null) {
2058 if (sslClosePromise.trySuccess(ctx.channel())) {
2059 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2060 }
2061 } else {
2062 if (sslClosePromise.tryFailure(cause)) {
2063 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2064 }
2065 }
2066 }
2067
2068 private void closeOutboundAndChannel(
2069 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2070 setState(STATE_OUTBOUND_CLOSED);
2071 engine.closeOutbound();
2072
2073 if (!ctx.channel().isActive()) {
2074 if (disconnect) {
2075 ctx.disconnect(promise);
2076 } else {
2077 ctx.close(promise);
2078 }
2079 return;
2080 }
2081
2082 ChannelPromise closeNotifyPromise = ctx.newPromise();
2083 try {
2084 flush(ctx, closeNotifyPromise);
2085 } finally {
2086 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2087 setState(STATE_CLOSE_NOTIFY);
2088
2089
2090
2091
2092
2093
2094
2095
2096 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2097 } else {
2098
2099 sslClosePromise.addListener(new FutureListener<Channel>() {
2100 @Override
2101 public void operationComplete(Future<Channel> future) {
2102 promise.setSuccess();
2103 }
2104 });
2105 }
2106 }
2107 }
2108
2109 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2110 if (pendingUnencryptedWrites != null) {
2111 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2112 } else {
2113 promise.setFailure(newPendingWritesNullException());
2114 }
2115 flush(ctx);
2116 }
2117
2118 @Override
2119 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2120 this.ctx = ctx;
2121 Channel channel = ctx.channel();
2122 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16, engineType.wantsDirectBuffer) {
2123 @Override
2124 protected int wrapDataSize() {
2125 return SslHandler.this.wrapDataSize;
2126 }
2127 };
2128
2129 setOpensslEngineSocketFd(channel);
2130 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2131 boolean active = channel.isActive();
2132 if (active || fastOpen) {
2133
2134
2135
2136 startHandshakeProcessing(active);
2137
2138
2139 final ChannelOutboundBuffer outboundBuffer;
2140 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2141 outboundBuffer.totalPendingWriteBytes() > 0)) {
2142 setState(STATE_NEEDS_FLUSH);
2143 }
2144 }
2145 }
2146
2147 private void startHandshakeProcessing(boolean flushAtEnd) {
2148 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2149 setState(STATE_HANDSHAKE_STARTED);
2150 if (engine.getUseClientMode()) {
2151
2152
2153
2154 handshake(flushAtEnd);
2155 }
2156 applyHandshakeTimeout();
2157 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2158 forceFlush(ctx);
2159 }
2160 }
2161
2162
2163
2164
2165 public Future<Channel> renegotiate() {
2166 ChannelHandlerContext ctx = this.ctx;
2167 if (ctx == null) {
2168 throw new IllegalStateException();
2169 }
2170
2171 return renegotiate(ctx.executor().<Channel>newPromise());
2172 }
2173
2174
2175
2176
2177 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2178 ObjectUtil.checkNotNull(promise, "promise");
2179
2180 ChannelHandlerContext ctx = this.ctx;
2181 if (ctx == null) {
2182 throw new IllegalStateException();
2183 }
2184
2185 EventExecutor executor = ctx.executor();
2186 if (!executor.inEventLoop()) {
2187 executor.execute(new Runnable() {
2188 @Override
2189 public void run() {
2190 renegotiateOnEventLoop(promise);
2191 }
2192 });
2193 return promise;
2194 }
2195
2196 renegotiateOnEventLoop(promise);
2197 return promise;
2198 }
2199
2200 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2201 final Promise<Channel> oldHandshakePromise = handshakePromise;
2202 if (!oldHandshakePromise.isDone()) {
2203
2204
2205 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2206 } else {
2207 handshakePromise = newHandshakePromise;
2208 handshake(true);
2209 applyHandshakeTimeout();
2210 }
2211 }
2212
2213
2214
2215
2216
2217
2218
2219 private void handshake(boolean flushAtEnd) {
2220 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2221
2222
2223 return;
2224 }
2225 if (handshakePromise.isDone()) {
2226
2227
2228
2229
2230
2231 return;
2232 }
2233
2234
2235 final ChannelHandlerContext ctx = this.ctx;
2236 try {
2237 engine.beginHandshake();
2238 wrapNonAppData(ctx, false);
2239 } catch (Throwable e) {
2240 setHandshakeFailure(ctx, e);
2241 } finally {
2242 if (flushAtEnd) {
2243 forceFlush(ctx);
2244 }
2245 }
2246 }
2247
2248 private void applyHandshakeTimeout() {
2249 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2250
2251
2252 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2253 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2254 return;
2255 }
2256
2257 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2258 @Override
2259 public void run() {
2260 if (localHandshakePromise.isDone()) {
2261 return;
2262 }
2263 SSLException exception =
2264 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2265 try {
2266 if (localHandshakePromise.tryFailure(exception)) {
2267 SslUtils.handleHandshakeFailure(ctx, exception, true);
2268 }
2269 } finally {
2270 releaseAndFailAll(ctx, exception);
2271 }
2272 }
2273 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2274
2275
2276 localHandshakePromise.addListener(new FutureListener<Channel>() {
2277 @Override
2278 public void operationComplete(Future<Channel> f) throws Exception {
2279 timeoutFuture.cancel(false);
2280 }
2281 });
2282 }
2283
2284 private void forceFlush(ChannelHandlerContext ctx) {
2285 clearState(STATE_NEEDS_FLUSH);
2286 ctx.flush();
2287 }
2288
2289 private void setOpensslEngineSocketFd(Channel c) {
2290 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2291 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2292 }
2293 }
2294
2295
2296
2297
2298 @Override
2299 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2300 setOpensslEngineSocketFd(ctx.channel());
2301 if (!startTls) {
2302 startHandshakeProcessing(true);
2303 }
2304 ctx.fireChannelActive();
2305 }
2306
2307 private void safeClose(
2308 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2309 final ChannelPromise promise) {
2310 if (!ctx.channel().isActive()) {
2311 ctx.close(promise);
2312 return;
2313 }
2314
2315 final Future<?> timeoutFuture;
2316 if (!flushFuture.isDone()) {
2317 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2318 if (closeNotifyTimeout > 0) {
2319
2320 timeoutFuture = ctx.executor().schedule(new Runnable() {
2321 @Override
2322 public void run() {
2323
2324 if (!flushFuture.isDone()) {
2325 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2326 ctx.channel());
2327 addCloseListener(ctx.close(ctx.newPromise()), promise);
2328 }
2329 }
2330 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2331 } else {
2332 timeoutFuture = null;
2333 }
2334 } else {
2335 timeoutFuture = null;
2336 }
2337
2338
2339 flushFuture.addListener(new ChannelFutureListener() {
2340 @Override
2341 public void operationComplete(ChannelFuture f) {
2342 if (timeoutFuture != null) {
2343 timeoutFuture.cancel(false);
2344 }
2345 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2346 if (closeNotifyReadTimeout <= 0) {
2347
2348
2349 addCloseListener(ctx.close(ctx.newPromise()), promise);
2350 } else {
2351 final Future<?> closeNotifyReadTimeoutFuture;
2352
2353 if (!sslClosePromise.isDone()) {
2354 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2355 @Override
2356 public void run() {
2357 if (!sslClosePromise.isDone()) {
2358 logger.debug(
2359 "{} did not receive close_notify in {}ms; force-closing the connection.",
2360 ctx.channel(), closeNotifyReadTimeout);
2361
2362
2363 addCloseListener(ctx.close(ctx.newPromise()), promise);
2364 }
2365 }
2366 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2367 } else {
2368 closeNotifyReadTimeoutFuture = null;
2369 }
2370
2371
2372 sslClosePromise.addListener(new FutureListener<Channel>() {
2373 @Override
2374 public void operationComplete(Future<Channel> future) throws Exception {
2375 if (closeNotifyReadTimeoutFuture != null) {
2376 closeNotifyReadTimeoutFuture.cancel(false);
2377 }
2378 addCloseListener(ctx.close(ctx.newPromise()), promise);
2379 }
2380 });
2381 }
2382 }
2383 });
2384 }
2385
2386 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2387
2388
2389
2390
2391
2392
2393 PromiseNotifier.cascade(false, future, promise);
2394 }
2395
2396
2397
2398
2399
2400 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2401 ByteBufAllocator alloc = ctx.alloc();
2402 if (engineType.wantsDirectBuffer) {
2403 return alloc.directBuffer(capacity);
2404 } else {
2405 return alloc.buffer(capacity);
2406 }
2407 }
2408
2409
2410
2411
2412
2413 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2414 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2415 }
2416
2417 private boolean isStateSet(int bit) {
2418 return (state & bit) == bit;
2419 }
2420
2421 private void setState(int bit) {
2422 state |= bit;
2423 }
2424
2425 private void clearState(int bit) {
2426 state &= ~bit;
2427 }
2428
2429 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2430
2431 @Override
2432 protected EventExecutor executor() {
2433 if (ctx == null) {
2434 throw new IllegalStateException();
2435 }
2436 return ctx.executor();
2437 }
2438
2439 @Override
2440 protected void checkDeadLock() {
2441 if (ctx == null) {
2442
2443
2444
2445
2446
2447
2448 return;
2449 }
2450 super.checkDeadLock();
2451 }
2452 }
2453 }