1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelInboundHandler;
30 import io.netty.channel.ChannelOption;
31 import io.netty.channel.ChannelOutboundBuffer;
32 import io.netty.channel.ChannelOutboundHandler;
33 import io.netty.channel.ChannelPipeline;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.unix.UnixChannel;
36 import io.netty.handler.codec.ByteToMessageDecoder;
37 import io.netty.handler.codec.DecoderException;
38 import io.netty.handler.codec.UnsupportedMessageTypeException;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.DefaultPromise;
41 import io.netty.util.concurrent.EventExecutor;
42 import io.netty.util.concurrent.Future;
43 import io.netty.util.concurrent.FutureListener;
44 import io.netty.util.concurrent.ImmediateExecutor;
45 import io.netty.util.concurrent.Promise;
46 import io.netty.util.concurrent.PromiseNotifier;
47 import io.netty.util.internal.ObjectUtil;
48 import io.netty.util.internal.PlatformDependent;
49 import io.netty.util.internal.ThrowableUtil;
50 import io.netty.util.internal.UnstableApi;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.security.cert.CertificateException;
61 import java.util.List;
62 import java.util.concurrent.Executor;
63 import java.util.concurrent.RejectedExecutionException;
64 import java.util.concurrent.TimeUnit;
65 import java.util.regex.Pattern;
66
67 import javax.net.ssl.SSLEngine;
68 import javax.net.ssl.SSLEngineResult;
69 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
70 import javax.net.ssl.SSLEngineResult.Status;
71 import javax.net.ssl.SSLException;
72 import javax.net.ssl.SSLHandshakeException;
73 import javax.net.ssl.SSLSession;
74
75 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
76 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
77 import static io.netty.util.internal.ObjectUtil.checkNotNull;
78 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
/** Logger used by this handler for debug and warning output. */
private static final InternalLogger logger =
        InternalLoggerFactory.getInstance(SslHandler.class);
/**
 * Class-name pattern used by {@code ignoreException(...)}: a stack frame whose class name contains
 * {@code SocketChannel}, {@code DatagramChannel}, {@code SctpChannel} or {@code UdtChannel} matches.
 */
private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
        "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
/**
 * Case-insensitive exception-message pattern used by {@code ignoreException(...)}; matches messages
 * mentioning a reset/closed/aborted/broken connection or a broken pipe.
 */
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
        "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);

// Bit flags passed to setState(...) / isStateSet(...). Each constant occupies its own bit.

/** Set by {@code flush(...)} once the first (unencrypted) StartTLS batch has been written. */
private static final int STATE_SENT_FIRST_MESSAGE = 1;
/** Set by {@code wrapAndFlush(...)} when it runs before the handshake promise is done. */
private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
/** Set by {@code read(...)} when a read is requested before the handshake promise is done. */
private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
/** Checked in {@code channelInactive(...)}; NOTE(review): where it is set is not visible in this section. */
private static final int STATE_HANDSHAKE_STARTED = 1 << 3;

/**
 * Set by the wrap methods when they run as part of an unwrap, to record that written data still
 * has to be flushed. NOTE(review): the consumer is presumably {@code flushIfNeeded(...)} - confirm.
 */
private static final int STATE_NEEDS_FLUSH = 1 << 4;
/** Set by {@code closeOutbound0(...)}; consulted by {@code channelInactive(...)}. */
private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
/** NOTE(review): usage is not visible in this section. */
private static final int STATE_CLOSE_NOTIFY = 1 << 6;
/** While set, {@code flush(...)} and {@code decode(...)} return immediately without doing work. */
private static final int STATE_PROCESS_TASK = 1 << 7;

/** NOTE(review): usage is not visible in this section. */
private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
/** NOTE(review): usage is not visible in this section. */
private static final int STATE_UNWRAP_REENTRY = 1 << 9;

/**
 * 16 KiB. Buffers larger than this are wrapped in slices of at most this many bytes, and it is the
 * default for {@code wrapDataSize}. Presumably the TLS maximum plaintext fragment length (2^14).
 */
private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
201
/**
 * Strategy for the concrete {@link SSLEngine} implementation in use. Each constant supplies the
 * engine-specific unwrap call, wrap-buffer allocation and sizing, and pending-data estimate, and
 * carries the direct-buffer preference and {@link Cumulator} the handler is configured with.
 */
private enum SslEngineType {
    /** Used when the engine is a {@code ReferenceCountedOpenSslEngine} (see {@link #forEngine}). */
    TCNATIVE(true, COMPOSITE_CUMULATOR) {
        @Override
        SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
            int nioBufferCount = in.nioBufferCount();
            int writerIndex = out.writerIndex();
            final SSLEngineResult result;
            if (nioBufferCount > 1) {
                // The input is backed by several NIO buffers: pass all of them to the engine's
                // unwrap(ByteBuffer[], ByteBuffer[]) overload instead of merging them first.
                // handler.singleBuffer is borrowed as the one-element destination array.
                ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
                try {
                    handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
                    result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
                } finally {
                    // Always drop the borrowed reference, even if unwrap threw.
                    handler.singleBuffer[0] = null;
                }
            } else {
                result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
                        toByteBuffer(out, writerIndex, out.writableBytes()));
            }
            // The engine wrote through NIO views, so advance the ByteBuf writer index manually.
            out.writerIndex(writerIndex + result.bytesProduced());
            return result;
        }

        @Override
        ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
                int pendingBytes, int numComponents) {
            // Direct buffer sized by the engine's own estimate.
            return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
                    .calculateOutNetBufSize(pendingBytes, numComponents));
        }

        @Override
        int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
            return ((ReferenceCountedOpenSslEngine) handler.engine)
                    .calculateMaxLengthForWrap(pendingBytes, numComponents);
        }

        @Override
        int calculatePendingData(SslHandler handler, int guess) {
            // Prefer the engine-reported value; fall back to the caller's guess when it is not positive.
            int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
            return sslPending > 0 ? sslPending : guess;
        }

        @Override
        boolean jdkCompatibilityMode(SSLEngine engine) {
            return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
        }
    },
    /** Used when the engine is a {@code ConscryptAlpnSslEngine} (see {@link #forEngine}). */
    CONSCRYPT(true, COMPOSITE_CUMULATOR) {
        @Override
        SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
            int nioBufferCount = in.nioBufferCount();
            int writerIndex = out.writerIndex();
            final SSLEngineResult result;
            if (nioBufferCount > 1) {
                // Multiple backing NIO buffers: use the engine's array-based unwrap overload,
                // borrowing handler.singleBuffer as the destination array.
                try {
                    handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
                    result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
                            in.nioBuffers(in.readerIndex(), len),
                            handler.singleBuffer);
                } finally {
                    // Always drop the borrowed reference, even if unwrap threw.
                    handler.singleBuffer[0] = null;
                }
            } else {
                result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
                        toByteBuffer(out, writerIndex, out.writableBytes()));
            }
            // The engine wrote through NIO views, so advance the ByteBuf writer index manually.
            out.writerIndex(writerIndex + result.bytesProduced());
            return result;
        }

        @Override
        ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
                int pendingBytes, int numComponents) {
            // Direct buffer sized by the engine's own estimate.
            return allocator.directBuffer(
                    ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
        }

        @Override
        int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
            return ((ConscryptAlpnSslEngine) handler.engine)
                    .calculateRequiredOutBufSpace(pendingBytes, numComponents);
        }

        @Override
        int calculatePendingData(SslHandler handler, int guess) {
            // No engine-provided figure is consulted here; the caller's guess is returned unchanged.
            return guess;
        }

        @Override
        boolean jdkCompatibilityMode(SSLEngine engine) {
            return true;
        }
    },
    /** Fallback for every other {@link SSLEngine} (see {@link #forEngine}). */
    JDK(false, MERGE_CUMULATOR) {
        @Override
        SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
            int writerIndex = out.writerIndex();
            ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
            // Remember the starting position so the real number of consumed bytes can be derived below.
            int position = inNioBuffer.position();
            final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
                    toByteBuffer(out, writerIndex, out.writableBytes()));
            out.writerIndex(writerIndex + result.bytesProduced());

            // If the engine reports zero consumed bytes although the input buffer's position moved,
            // trust the buffer position and return a corrected result.
            // NOTE(review): presumably a workaround for engines that misreport bytesConsumed() - confirm.
            if (result.bytesConsumed() == 0) {
                int consumed = inNioBuffer.position() - position;
                if (consumed != result.bytesConsumed()) {
                    // Same status and produced count, with the consumed count taken from the buffer.
                    return new SSLEngineResult(
                            result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
                }
            }
            return result;
        }

        @Override
        ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
                int pendingBytes, int numComponents) {
            // Heap buffer of at least one packet buffer size, or larger if more bytes are pending.
            // numComponents is not used for this engine type.
            return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
        }

        @Override
        int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
            // Always the session's packet buffer size, independent of the amount of pending data.
            return handler.engine.getSession().getPacketBufferSize();
        }

        @Override
        int calculatePendingData(SslHandler handler, int guess) {
            // No engine-provided figure is consulted here; the caller's guess is returned unchanged.
            return guess;
        }

        @Override
        boolean jdkCompatibilityMode(SSLEngine engine) {
            return true;
        }
    };

    /**
     * Selects the constant for the given engine by its runtime type:
     * {@code ReferenceCountedOpenSslEngine} maps to {@link #TCNATIVE}, {@code ConscryptAlpnSslEngine}
     * to {@link #CONSCRYPT}, anything else to {@link #JDK}.
     */
    static SslEngineType forEngine(SSLEngine engine) {
        return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
                engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
    }

    SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
        this.wantsDirectBuffer = wantsDirectBuffer;
        this.cumulator = cumulator;
    }

    /**
     * Unwraps {@code len} bytes of {@code in}, starting at its reader index, into the writable
     * region of {@code out}, and advances {@code out}'s writer index by the bytes produced.
     * The reader index of {@code in} is not changed by the implementations in this enum.
     */
    abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;

    /** Returns an estimate of pending data; implementations may simply return {@code guess}. */
    abstract int calculatePendingData(SslHandler handler, int guess);

    /** Returns the value the handler stores in its {@code jdkCompatibilityMode} field. */
    abstract boolean jdkCompatibilityMode(SSLEngine engine);

    /** Allocates the destination buffer for a wrap of {@code pendingBytes} plaintext bytes. */
    abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
            int pendingBytes, int numComponents);

    /** Returns the output space to reserve before wrapping {@code pendingBytes} plaintext bytes. */
    abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);

    /**
     * When {@code true}, {@code wrap(ByteBufAllocator, SSLEngine, ByteBuf, ByteBuf)} copies non-direct
     * input into a temporary direct buffer before calling the engine.
     */
    final boolean wantsDirectBuffer;

    /** The {@link Cumulator} installed on the handler by its constructor via {@code setCumulator(...)}. */
    final Cumulator cumulator;
}
401
// Handler context; read by closeOutbound(...) which may run outside the event loop.
// NOTE(review): where it is assigned is not visible in this section.
private volatile ChannelHandlerContext ctx;
// The engine performing wrap/unwrap; released in handlerRemoved0(...).
private final SSLEngine engine;
// Engine-specific strategy derived from the engine's runtime type.
private final SslEngineType engineType;
// Executor supplied at construction time for delegated tasks.
private final Executor delegatedTaskExecutor;
// Selects decodeJdkCompatible(...) (true) or decodeNonJdkCompatible(...) (false) in decode(...).
private final boolean jdkCompatibilityMode;

// Reusable one-element array so single-buffer wrap/unwrap calls do not allocate a ByteBuffer[].
// Every user nulls out slot 0 in a finally block after the engine call.
private final ByteBuffer[] singleBuffer = new ByteBuffer[1];

// When true, the first flush writes the pending data without encrypting it (see flush(...)).
private final boolean startTls;
// Optional; the public constructors pass null.
private final ResumptionController resumptionController;

private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);

// Queue of plaintext writes awaiting encryption; set to null in handlerRemoved0(...).
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
// Exposed through handshakeFuture().
private Promise<Channel> handshakePromise = new LazyChannelPromise();
// Exposed through sslCloseFuture().
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();

// Length of the record currently being accumulated by decodeJdkCompatible(...); 0 when unknown.
private int packetLength;
// Bit set of the STATE_* flags.
private short state;

// Defaults: 10 s handshake timeout, 3 s close_notify flush timeout, 0 for the close_notify read timeout.
private volatile long handshakeTimeoutMillis = 10000;
private volatile long closeNotifyFlushTimeoutMillis = 3000;
private volatile long closeNotifyReadTimeoutMillis;
// Upper bound of plaintext pulled from the queue per wrap iteration; values <= 0 make wrap(...)
// take queued buffers one at a time instead.
volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
432
433
434
435
436
437
/**
 * Creates a new instance with StartTLS disabled.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, false);
}
441
442
443
444
445
446
447
448
/**
 * Creates a new instance that uses {@link ImmediateExecutor#INSTANCE} for delegated tasks.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first flushed batch of writes must not be encrypted
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, startTls, ImmediateExecutor.INSTANCE);
}
452
453
454
455
456
457
458
459
/**
 * Creates a new instance with StartTLS disabled and a caller-supplied task executor.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param delegatedTaskExecutor the {@link Executor} to use for delegated tasks
 */
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, false, delegatedTaskExecutor);
}
463
464
465
466
467
468
469
470
471
472
/**
 * Creates a new instance without a {@code ResumptionController}.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param startTls              {@code true} if the first flushed batch of writes must not be encrypted
 * @param delegatedTaskExecutor the {@link Executor} to use for delegated tasks
 */
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
    this(engine, startTls, delegatedTaskExecutor, null);
}
476
477 SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor,
478 ResumptionController resumptionController) {
479 this.engine = ObjectUtil.checkNotNull(engine, "engine");
480 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
481 engineType = SslEngineType.forEngine(engine);
482 this.startTls = startTls;
483 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
484 setCumulator(engineType.cumulator);
485 this.resumptionController = resumptionController;
486 }
487
488 public long getHandshakeTimeoutMillis() {
489 return handshakeTimeoutMillis;
490 }
491
492 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
493 checkNotNull(unit, "unit");
494 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
495 }
496
497 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
498 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
499 }
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
/**
 * Sets the number of plaintext bytes {@code wrap(...)} pulls from the pending-write queue per
 * iteration. A value {@code <= 0} makes {@code wrap(...)} take queued buffers one at a time
 * instead. The value is stored without validation.
 *
 * @param wrapDataSize the new per-iteration size
 */
@UnstableApi
public final void setWrapDataSize(int wrapDataSize) {
    this.wrapDataSize = wrapDataSize;
}
525
526
527
528
/**
 * @return the value of {@link #getCloseNotifyFlushTimeoutMillis()}
 * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}, to which this method delegates
 */
@Deprecated
public long getCloseNotifyTimeoutMillis() {
    return getCloseNotifyFlushTimeoutMillis();
}
533
534
535
536
/**
 * @param closeNotifyTimeout the timeout, expressed in {@code unit}
 * @param unit               the unit of {@code closeNotifyTimeout}
 * @deprecated use {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}, to which this method delegates
 */
@Deprecated
public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
    setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
}
541
542
543
544
/**
 * @param closeNotifyFlushTimeoutMillis the timeout in milliseconds
 * @deprecated use {@link #setCloseNotifyFlushTimeoutMillis(long)}, to which this method delegates
 */
@Deprecated
public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
    setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
}
549
550
551
552
553
554
555 public final long getCloseNotifyFlushTimeoutMillis() {
556 return closeNotifyFlushTimeoutMillis;
557 }
558
559
560
561
562
563
564 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
565 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
566 }
567
568
569
570
571 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
572 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
573 "closeNotifyFlushTimeoutMillis");
574 }
575
576
577
578
579
580
581 public final long getCloseNotifyReadTimeoutMillis() {
582 return closeNotifyReadTimeoutMillis;
583 }
584
585
586
587
588
589
590 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
591 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
592 }
593
594
595
596
597 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
598 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
599 "closeNotifyReadTimeoutMillis");
600 }
601
602
603
604
605 public SSLEngine engine() {
606 return engine;
607 }
608
609
610
611
612
613
614 public String applicationProtocol() {
615 SSLEngine engine = engine();
616 if (!(engine instanceof ApplicationProtocolAccessor)) {
617 return null;
618 }
619
620 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
621 }
622
623
624
625
626
627
628
629 public Future<Channel> handshakeFuture() {
630 return handshakePromise;
631 }
632
633
634
635
/**
 * @return the future returned by {@link #closeOutbound()}
 * @deprecated use {@link #closeOutbound()}, to which this method delegates
 */
@Deprecated
public ChannelFuture close() {
    return closeOutbound();
}
640
641
642
643
/**
 * @param promise the promise passed on to {@link #closeOutbound(ChannelPromise)}
 * @return the future returned by {@link #closeOutbound(ChannelPromise)}
 * @deprecated use {@link #closeOutbound(ChannelPromise)}, to which this method delegates
 */
@Deprecated
public ChannelFuture close(ChannelPromise promise) {
    return closeOutbound(promise);
}
648
649
650
651
652
653
654
655 public ChannelFuture closeOutbound() {
656 return closeOutbound(ctx.newPromise());
657 }
658
659
660
661
662
663
664
665 public ChannelFuture closeOutbound(final ChannelPromise promise) {
666 final ChannelHandlerContext ctx = this.ctx;
667 if (ctx.executor().inEventLoop()) {
668 closeOutbound0(promise);
669 } else {
670 ctx.executor().execute(new Runnable() {
671 @Override
672 public void run() {
673 closeOutbound0(promise);
674 }
675 });
676 }
677 return promise;
678 }
679
680 private void closeOutbound0(ChannelPromise promise) {
681 setState(STATE_OUTBOUND_CLOSED);
682 engine.closeOutbound();
683 try {
684 flush(ctx, promise);
685 } catch (Exception e) {
686 if (!promise.tryFailure(e)) {
687 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
688 }
689 }
690 }
691
692
693
694
695
696
697
698
699 public Future<Channel> sslCloseFuture() {
700 return sslClosePromise;
701 }
702
/**
 * Cleans up when the handler is removed: fails all pending writes, fails the handshake and close
 * promises if they are still open, and finally releases the engine.
 *
 * @param ctx the context of the handler being removed
 */
@Override
public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
    try {
        if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
            // Nothing can encrypt these writes any more; release the buffers and fail their promises.
            pendingUnencryptedWrites.releaseAndFailAll(ctx,
                    new ChannelException("Pending write on removal of SslHandler"));
        }
        // A null queue is how write(...) and handleUnwrapThrowable(...) detect that removal happened.
        pendingUnencryptedWrites = null;

        SSLException cause = null;

        // Fail the handshake promise if it is still open, and fire the completion event only when
        // this call is the one that actually failed it.
        if (!handshakePromise.isDone()) {
            cause = new SSLHandshakeException("SslHandler removed before handshake completed");
            if (handshakePromise.tryFailure(cause)) {
                ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
            }
        }
        if (!sslClosePromise.isDone()) {
            // Reuse the handshake failure as the close cause when there is one.
            if (cause == null) {
                cause = new SSLException("SslHandler removed before SSLEngine was closed");
            }
            notifyClosePromise(cause);
        }
    } finally {
        // Runs even if the notifications above throw.
        ReferenceCountUtil.release(engine);
    }
}
734
/** Passes the bind request through unchanged. */
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.bind(localAddress, promise);
}
739
/** Passes the connect request through unchanged. */
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}
745
/** Passes the deregister request through unchanged. */
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    ctx.deregister(promise);
}
750
/**
 * Handles a disconnect request by delegating to {@code closeOutboundAndChannel(...)} with the
 * disconnect flag set to {@code true}.
 */
@Override
public void disconnect(final ChannelHandlerContext ctx,
        final ChannelPromise promise) throws Exception {
    closeOutboundAndChannel(ctx, promise, true);
}
756
/**
 * Handles a close request by delegating to {@code closeOutboundAndChannel(...)} with the
 * disconnect flag set to {@code false}.
 */
@Override
public void close(final ChannelHandlerContext ctx,
        final ChannelPromise promise) throws Exception {
    closeOutboundAndChannel(ctx, promise, false);
}
762
763 @Override
764 public void read(ChannelHandlerContext ctx) throws Exception {
765 if (!handshakePromise.isDone()) {
766 setState(STATE_READ_DURING_HANDSHAKE);
767 }
768
769 ctx.read();
770 }
771
772 private static IllegalStateException newPendingWritesNullException() {
773 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
774 }
775
776 @Override
777 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
778 if (!(msg instanceof ByteBuf)) {
779 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
780 ReferenceCountUtil.safeRelease(msg);
781 promise.setFailure(exception);
782 } else if (pendingUnencryptedWrites == null) {
783 ReferenceCountUtil.safeRelease(msg);
784 promise.setFailure(newPendingWritesNullException());
785 } else {
786 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
787 }
788 }
789
790 @Override
791 public void flush(ChannelHandlerContext ctx) throws Exception {
792
793
794 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
795 setState(STATE_SENT_FIRST_MESSAGE);
796 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
797 forceFlush(ctx);
798
799
800 startHandshakeProcessing(true);
801 return;
802 }
803
804 if (isStateSet(STATE_PROCESS_TASK)) {
805 return;
806 }
807
808 try {
809 wrapAndFlush(ctx);
810 } catch (Throwable cause) {
811 setHandshakeFailure(ctx, cause);
812 PlatformDependent.throwException(cause);
813 }
814 }
815
816 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
817 if (pendingUnencryptedWrites.isEmpty()) {
818
819
820
821
822 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
823 }
824 if (!handshakePromise.isDone()) {
825 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
826 }
827 try {
828 wrap(ctx, false);
829 } finally {
830
831
832 forceFlush(ctx);
833 }
834 }
835
836
/**
 * Drains the pending-write queue through the engine and writes the encrypted output to the
 * context. It only writes; flushing is left to the caller. When {@code inUnwrap} is
 * {@code true}, {@code STATE_NEEDS_FLUSH} is set on exit.
 *
 * @param ctx      the handler context
 * @param inUnwrap {@code true} when invoked as part of unwrap processing
 * @throws SSLException if the engine fails to wrap; the dequeued buffer is released and its
 *                      promise failed before the exception propagates
 */
private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
    ByteBuf out = null;
    ByteBufAllocator alloc = ctx.alloc();
    try {
        // Read the volatile field once so the whole loop sees one consistent value.
        final int wrapDataSize = this.wrapDataSize;

        // Stop as soon as the handler has been removed from the pipeline.
        outer: while (!ctx.isRemoved()) {
            ChannelPromise promise = ctx.newPromise();
            // A positive wrapDataSize pulls up to that many bytes from the queue; otherwise the
            // queued buffers are taken one at a time.
            ByteBuf buf = wrapDataSize > 0 ?
                    pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
                    pendingUnencryptedWrites.removeFirst(promise);
            if (buf == null) {
                break;
            }

            SSLEngineResult result;

            try {
                if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
                    // Larger than one slice: encrypt it as several slices into one output buffer.
                    // The component count is only an estimate (one extra per slice); if the
                    // buffer turns out too small, the unconsumed remainder is re-queued below.
                    int readableBytes = buf.readableBytes();
                    int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
                    if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
                        numPackets += 1;
                    }

                    if (out == null) {
                        out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
                    }
                    result = wrapMultiple(alloc, engine, buf, out);
                } else {
                    if (out == null) {
                        out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
                    }
                    result = wrap(alloc, engine, buf, out);
                }
            } catch (SSLException e) {
                // buf was already taken off the queue, so this method owns it: release it and
                // fail its promise before rethrowing. out is released by the finally block below.
                buf.release();
                promise.setFailure(e);
                throw e;
            }

            if (buf.isReadable()) {
                // Not fully consumed: put the rest back at the head together with its promise.
                pendingUnencryptedWrites.addFirst(buf, promise);
                // The promise now belongs to the re-queued buffer and must not be completed by
                // the write below.
                promise = null;
            } else {
                buf.release();
            }

            // Output may exist even if nothing was consumed from buf.
            if (out.isReadable()) {
                final ByteBuf b = out;
                // Ownership passes to ctx.write(...); clear the local so finally does not release it.
                out = null;
                if (promise != null) {
                    ctx.write(b, promise);
                } else {
                    ctx.write(b);
                }
            } else if (promise != null) {
                // Nothing produced but the input was fully consumed: complete the promise via an
                // empty write. out stays allocated and is reused by the next iteration.
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }

            if (result.getStatus() == Status.CLOSED) {
                // The engine is closed: fail whatever is still queued, preferring an earlier
                // handshake or close failure as the cause over a generic exception.
                if (!pendingUnencryptedWrites.isEmpty()) {
                    Throwable exception = handshakePromise.cause();
                    if (exception == null) {
                        exception = sslClosePromise.cause();
                        if (exception == null) {
                            exception = new SslClosedEngineException("SSLEngine closed already");
                        }
                    }
                    pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
                }

                return;
            } else {
                switch (result.getHandshakeStatus()) {
                    case NEED_TASK:
                        if (!runDelegatedTasks(inUnwrap)) {
                            // runDelegatedTasks returned false: stop looping here.
                            // NOTE(review): presumably the task was handed to the executor and
                            // processing resumes on completion - confirm.
                            break outer;
                        }
                        break;
                    case FINISHED:
                    case NOT_HANDSHAKING:
                        // Both statuses are treated as handshake completion.
                        setHandshakeSuccess();
                        break;
                    case NEED_WRAP:
                        // The engine wants another wrap and produced data: make sure the queue is
                        // not empty, otherwise the next iteration would exit the loop.
                        if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
                            pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
                        }
                        break;
                    case NEED_UNWRAP:
                        // Nothing more can be wrapped until the peer's data has been read.
                        readIfNeeded(ctx);
                        return;
                    default:
                        throw new IllegalStateException(
                                "Unknown handshake status: " + result.getHandshakeStatus());
                }
            }
        }
    } finally {
        // Non-null here means the buffer was never handed to ctx.write(...).
        if (out != null) {
            out.release();
        }
        if (inUnwrap) {
            setState(STATE_NEEDS_FLUSH);
        }
    }
}
973
974
975
976
977
978
979
/**
 * Repeatedly wraps an empty buffer so the engine can emit data it generates itself, and writes
 * whatever is produced to the context.
 *
 * @param ctx      the handler context
 * @param inUnwrap {@code true} when invoked as part of unwrap processing
 * @return {@code true} only when the engine reported {@code NOT_HANDSHAKING};
 *         {@code false} on every other exit path
 * @throws SSLException if the engine fails to wrap
 */
private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
    ByteBuf out = null;
    ByteBufAllocator alloc = ctx.alloc();
    try {
        // Stop as soon as the handler has been removed from the pipeline.
        outer: while (!ctx.isRemoved()) {
            if (out == null) {
                // The output size is not known up front; start with 2048 bytes. wrap(...) grows
                // the buffer when the engine reports BUFFER_OVERFLOW.
                out = allocateOutNetBuf(ctx, 2048, 1);
            }
            SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
            if (result.bytesProduced() > 0) {
                // A failed write of this data is reported as a handshake transport failure.
                ctx.write(out).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            setHandshakeFailureTransportFailure(ctx, cause);
                        }
                    }
                });
                if (inUnwrap) {
                    setState(STATE_NEEDS_FLUSH);
                }
                // Ownership passed to ctx.write(...); allocate a fresh buffer next time round.
                out = null;
            }

            HandshakeStatus status = result.getHandshakeStatus();
            switch (status) {
                case FINISHED:
                    // If this call completed the handshake during an unwrap and application
                    // writes are queued, encrypt them now.
                    if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
                        wrap(ctx, true);
                    }
                    return false;
                case NEED_TASK:
                    if (!runDelegatedTasks(inUnwrap)) {
                        // runDelegatedTasks returned false: stop looping here.
                        break outer;
                    }
                    break;
                case NEED_UNWRAP:
                    if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
                        // Already inside unwrap (the caller continues unwrapping), or
                        // unwrapNonAppData returned a non-positive value: nothing more to do.
                        return false;
                    }
                    break;
                case NEED_WRAP:
                    break;
                case NOT_HANDSHAKING:
                    if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
                        wrap(ctx, true);
                    }
                    // When not called from unwrap, additionally give the engine a chance to
                    // process inbound data.
                    if (!inUnwrap) {
                        unwrapNonAppData(ctx);
                    }
                    return true;
                default:
                    throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
            }

            // No output and the last step was not a task: looping again would not make progress.
            // A task step is exempt because running a task legitimately produces no bytes.
            if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
                break;
            }

            // Nothing consumed while not handshaking: stop rather than keep wrapping empty buffers.
            if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
                break;
            }
        }
    } finally {
        // Non-null here means the buffer was never handed to ctx.write(...).
        if (out != null) {
            out.release();
        }
    }
    return false;
}
1071
/**
 * Encrypts {@code in} in slices of at most {@code MAX_PLAINTEXT_LENGTH} bytes into {@code out}.
 * Stops early when {@code out} runs out of room after at least one slice, or when the engine
 * reports {@code CLOSED}; unconsumed bytes remain readable in {@code in}.
 *
 * @param alloc  allocator passed through to the single-slice wrap
 * @param engine the engine to wrap with
 * @param in     the plaintext; its reader index is advanced by what the engine consumed
 * @param out    the destination for the encrypted data
 * @return the result of the last slice wrapped
 * @throws SSLException if the engine fails to wrap
 */
private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
        throws SSLException {
    SSLEngineResult result = null;

    do {
        int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
        // The component count passed here is that of the whole input, not of the slice, so the
        // required size is an over-estimate.
        int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());

        if (!out.isWritable(nextOutSize)) {
            if (result != null) {
                // At least one slice was written already: return what we have and leave the
                // rest of the input for the caller to re-queue.
                break;
            }
            // Not even the first slice fits: grow the buffer so that progress is possible.
            out.ensureWritable(nextOutSize);
        }

        // readSlice advances in's reader index by the full slice size.
        ByteBuf wrapBuf = in.readSlice(nextSliceSize);
        result = wrap(alloc, engine, wrapBuf, out);

        if (result.getStatus() == Status.CLOSED) {
            // The engine is closed; the caller handles the CLOSED status.
            break;
        }

        if (wrapBuf.isReadable()) {
            // The engine left part of the slice unconsumed: move the reader index back so those
            // bytes are included in the next slice.
            in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
        }
    } while (in.readableBytes() > 0);

    return result;
}
1111
/**
 * Performs one logical {@link SSLEngine#wrap} of {@code in} into {@code out}, growing
 * {@code out} and retrying while the engine reports {@code BUFFER_OVERFLOW}.
 *
 * @param alloc  allocator for a temporary direct copy of the input, when one is needed
 * @param engine the engine to wrap with
 * @param in     the plaintext; its reader index is advanced by the bytes consumed
 * @param out    the destination; its writer index is advanced by the bytes produced
 * @return the first result whose status is not {@code BUFFER_OVERFLOW}
 * @throws SSLException if the engine fails to wrap
 */
private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
        throws SSLException {
    ByteBuf newDirectIn = null;
    try {
        int readerIndex = in.readerIndex();
        int readableBytes = in.readableBytes();

        // The array-based SSLEngine.wrap(ByteBuffer[], ByteBuffer) is used so multi-component
        // input can be passed without merging it into one buffer first.
        final ByteBuffer[] in0;
        if (in.isDirect() || !engineType.wantsDirectBuffer) {
            // The input can be used as-is. A CompositeByteBuf is always routed to nioBuffers()
            // without asking for its buffer count.
            if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
                in0 = singleBuffer;
                // Exactly one backing buffer: reuse the shared one-element array and the
                // buffer's internal NIO view.
                in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
            } else {
                in0 = in.nioBuffers();
            }
        } else {
            // The engine wants direct memory but the input is not direct: copy the readable
            // bytes into a temporary direct buffer. writeBytes(in, index, length) does not move
            // in's reader index, so the skipBytes below still applies to the original input.
            newDirectIn = alloc.directBuffer(readableBytes);
            newDirectIn.writeBytes(in, readerIndex, readableBytes);
            in0 = singleBuffer;
            in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
        }

        for (;;) {
            // Re-derive the NIO view on every pass; out may have been grown by the previous one.
            ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
            SSLEngineResult result = engine.wrap(in0, out0);
            // The engine worked on NIO views, so mirror its progress onto the ByteBuf indices.
            in.skipBytes(result.bytesConsumed());
            out.writerIndex(out.writerIndex() + result.bytesProduced());

            if (result.getStatus() == Status.BUFFER_OVERFLOW) {
                // Make room for one more packet and try again.
                out.ensureWritable(engine.getSession().getPacketBufferSize());
            } else {
                return result;
            }
        }
    } finally {
        // Drop the reference held by the shared array so it does not outlive this call.
        singleBuffer[0] = null;

        if (newDirectIn != null) {
            newDirectIn.release();
        }
    }
}
1168
/**
 * Handles channel closure: fails the handshake (if applicable) and the close promise with a
 * {@link ClosedChannelException}, then lets the superclass finish its own processing.
 *
 * @param ctx the handler context
 * @throws Exception a {@link DecoderException} from the superclass is rethrown unless the
 *                   handshake had already failed and the exception's cause is an {@link SSLException}
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // Capture this before setHandshakeFailure(...) below can change it.
    boolean handshakeFailed = handshakePromise.cause() != null;

    // The exception every open promise is failed with.
    ClosedChannelException exception = new ClosedChannelException();

    // If the handshake was started but never completed, attach that detail as a suppressed exception.
    if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
        ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
                "Connection closed while SSL/TLS handshake was in progress",
                SslHandler.class, "channelInactive"));
    }

    // NOTE(review): the flag arguments are, presumably, "close inbound" (only if outbound was
    // not closed by us), "notify" (only if the handshake was started) and "always flush and
    // close" - confirm against setHandshakeFailure(...).
    setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
            false);

    // The close promise is notified in every case.
    notifyClosePromise(exception);

    try {
        super.channelInactive(ctx);
    } catch (DecoderException e) {
        if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
            // Swallowed only when the handshake had already failed and the cause is an
            // SSLException; everything else is rethrown.
            throw e;
        }
    }
}
1204
1205 @Override
1206 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1207 if (ignoreException(cause)) {
1208
1209
1210 if (logger.isDebugEnabled()) {
1211 logger.debug(
1212 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1213 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1214 }
1215
1216
1217
1218 if (ctx.channel().isActive()) {
1219 ctx.close();
1220 }
1221 } else {
1222 ctx.fireExceptionCaught(cause);
1223 }
1224 }
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235 private boolean ignoreException(Throwable t) {
1236 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1237 String message = t.getMessage();
1238
1239
1240
1241 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1242 return true;
1243 }
1244
1245
1246 StackTraceElement[] elements = t.getStackTrace();
1247 for (StackTraceElement element: elements) {
1248 String classname = element.getClassName();
1249 String methodname = element.getMethodName();
1250
1251
1252 if (classname.startsWith("io.netty.")) {
1253 continue;
1254 }
1255
1256
1257 if (!"read".equals(methodname)) {
1258 continue;
1259 }
1260
1261
1262
1263 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1264 return true;
1265 }
1266
1267 try {
1268
1269
1270
1271 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1272
1273 if (SocketChannel.class.isAssignableFrom(clazz)
1274 || DatagramChannel.class.isAssignableFrom(clazz)) {
1275 return true;
1276 }
1277
1278
1279 if ("com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1280 return true;
1281 }
1282 } catch (Throwable cause) {
1283 if (logger.isDebugEnabled()) {
1284 logger.debug("Unexpected exception while loading class {} classname {}",
1285 getClass(), classname, cause);
1286 }
1287 }
1288 }
1289 }
1290
1291 return false;
1292 }
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
/**
 * Checks whether the buffer starts with an encrypted record, without probing for SSLv2.
 *
 * @param buffer the buffer to inspect
 * @return the result of {@link #isEncrypted(ByteBuf, boolean)} with {@code probeSSLv2 == false}
 * @deprecated use {@link #isEncrypted(ByteBuf, boolean)}, to which this method delegates
 */
@Deprecated
public static boolean isEncrypted(ByteBuf buffer) {
    return isEncrypted(buffer, false);
}
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328 public static boolean isEncrypted(ByteBuf buffer, boolean probeSSLv2) {
1329 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1330 throw new IllegalArgumentException(
1331 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1332 }
1333 return getEncryptedPacketLength(buffer, buffer.readerIndex(), probeSSLv2) != SslUtils.NOT_ENCRYPTED;
1334 }
1335
/**
 * Decodes inbound data one complete record at a time: determines the record length, waits
 * until that many bytes are available, then unwraps exactly that many.
 *
 * @param ctx the handler context
 * @param in  the cumulated inbound data
 * @throws NotSslRecordException if the data does not start with an SSL/TLS record
 */
private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
    int packetLength = this.packetLength;

    // A positive value means the length of the current record was computed by an earlier call.
    if (packetLength > 0) {
        if (in.readableBytes() < packetLength) {
            return;
        }
    } else {
        // The length is not known yet; it cannot be computed before a full header is available.
        final int readableBytes = in.readableBytes();
        if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
            return;
        }
        packetLength = getEncryptedPacketLength(in, in.readerIndex(), true);
        if (packetLength == SslUtils.NOT_ENCRYPTED) {
            // Not an SSL/TLS record: discard the data and fail.
            NotSslRecordException e = new NotSslRecordException(
                    "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
            in.skipBytes(in.readableBytes());

            // Record the handshake failure before the exception propagates.
            setHandshakeFailure(ctx, e);

            throw e;
        }
        if (packetLength == NOT_ENOUGH_DATA) {
            return;
        }
        assert packetLength > 0;
        if (packetLength > readableBytes) {
            // Remember the length and wait until the whole record has arrived.
            this.packetLength = packetLength;
            return;
        }
    }

    // A whole record is available. Reset the stored length first so the next call starts with
    // a fresh header.
    this.packetLength = 0;
    try {
        final int bytesConsumed = unwrap(ctx, in, packetLength);
        if (bytesConsumed != packetLength && !engine.isInboundDone()) {
            // The engine did not consume exactly one record although inbound is still open.
            // The exception is caught by the catch below and routed to handleUnwrapThrowable.
            throw new NotSslRecordException();
        }
    } catch (Throwable cause) {
        handleUnwrapThrowable(ctx, cause);
    }
}
1387
1388 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1389 try {
1390 unwrap(ctx, in, in.readableBytes());
1391 } catch (Throwable cause) {
1392 handleUnwrapThrowable(ctx, cause);
1393 }
1394 }
1395
1396 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1397 try {
1398
1399
1400
1401
1402 if (handshakePromise.tryFailure(cause)) {
1403 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1404 }
1405
1406
1407 if (pendingUnencryptedWrites != null) {
1408
1409
1410 wrapAndFlush(ctx);
1411 }
1412 } catch (SSLException ex) {
1413 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1414 " because of an previous SSLException, ignoring...", ex);
1415 } finally {
1416
1417 setHandshakeFailure(ctx, cause, true, false, true);
1418 }
1419 PlatformDependent.throwException(cause);
1420 }
1421
1422 @Override
1423 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1424 if (isStateSet(STATE_PROCESS_TASK)) {
1425 return;
1426 }
1427 if (jdkCompatibilityMode) {
1428 decodeJdkCompatible(ctx, in);
1429 } else {
1430 decodeNonJdkCompatible(ctx, in);
1431 }
1432 }
1433
1434 @Override
1435 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1436 channelReadComplete0(ctx);
1437 }
1438
1439 private void channelReadComplete0(ChannelHandlerContext ctx) {
1440
1441 discardSomeReadBytes();
1442
1443 flushIfNeeded(ctx);
1444 readIfNeeded(ctx);
1445
1446 clearState(STATE_FIRE_CHANNEL_READ);
1447 ctx.fireChannelReadComplete();
1448 }
1449
1450 private void readIfNeeded(ChannelHandlerContext ctx) {
1451
1452 if (!ctx.channel().config().isAutoRead() &&
1453 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1454
1455
1456 ctx.read();
1457 }
1458 }
1459
1460 private void flushIfNeeded(ChannelHandlerContext ctx) {
1461 if (isStateSet(STATE_NEEDS_FLUSH)) {
1462 forceFlush(ctx);
1463 }
1464 }
1465
1466
1467
1468
1469 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1470 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1471 }
1472
1473
1474
1475
1476 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1477 final int originalLength = length;
1478 boolean wrapLater = false;
1479 boolean notifyClosure = false;
1480 boolean executedRead = false;
1481 ByteBuf decodeOut = allocate(ctx, length);
1482 try {
1483
1484
1485 do {
1486 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1487 final Status status = result.getStatus();
1488 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1489 final int produced = result.bytesProduced();
1490 final int consumed = result.bytesConsumed();
1491
1492
1493
1494
1495 packet.skipBytes(consumed);
1496 length -= consumed;
1497
1498
1499
1500
1501 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1502 wrapLater |= (decodeOut.isReadable() ?
1503 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1504 handshakeStatus == HandshakeStatus.FINISHED ||
1505
1506
1507 (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty());
1508 }
1509
1510
1511
1512
1513 if (decodeOut.isReadable()) {
1514 setState(STATE_FIRE_CHANNEL_READ);
1515 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1516 executedRead = true;
1517 executeChannelRead(ctx, decodeOut);
1518 } else {
1519 ctx.fireChannelRead(decodeOut);
1520 }
1521 decodeOut = null;
1522 }
1523
1524 if (status == Status.CLOSED) {
1525 notifyClosure = true;
1526 } else if (status == Status.BUFFER_OVERFLOW) {
1527 if (decodeOut != null) {
1528 decodeOut.release();
1529 }
1530 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1531
1532
1533
1534
1535 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1536 applicationBufferSize : applicationBufferSize - produced));
1537 continue;
1538 }
1539
1540 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1541 boolean pending = runDelegatedTasks(true);
1542 if (!pending) {
1543
1544
1545
1546
1547
1548 wrapLater = false;
1549 break;
1550 }
1551 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1552
1553
1554
1555 if (wrapNonAppData(ctx, true) && length == 0) {
1556 break;
1557 }
1558 }
1559
1560 if (status == Status.BUFFER_UNDERFLOW ||
1561
1562 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1563 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1564 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1565
1566
1567 readIfNeeded(ctx);
1568 }
1569
1570 break;
1571 } else if (decodeOut == null) {
1572 decodeOut = allocate(ctx, length);
1573 }
1574 } while (!ctx.isRemoved());
1575
1576 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1577
1578
1579
1580
1581 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1582 wrapLater = true;
1583 }
1584
1585 if (wrapLater) {
1586 wrap(ctx, true);
1587 }
1588 } finally {
1589 if (decodeOut != null) {
1590 decodeOut.release();
1591 }
1592
1593 if (notifyClosure) {
1594 if (executedRead) {
1595 executeNotifyClosePromise(ctx);
1596 } else {
1597 notifyClosePromise(null);
1598 }
1599 }
1600 }
1601 return originalLength - length;
1602 }
1603
1604 private boolean setHandshakeSuccessUnwrapMarkReentry() throws SSLException {
1605
1606
1607 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1608 if (setReentryState) {
1609 setState(STATE_UNWRAP_REENTRY);
1610 }
1611 try {
1612 return setHandshakeSuccess();
1613 } finally {
1614
1615
1616 if (setReentryState) {
1617 clearState(STATE_UNWRAP_REENTRY);
1618 }
1619 }
1620 }
1621
1622 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1623 try {
1624 ctx.executor().execute(new Runnable() {
1625 @Override
1626 public void run() {
1627 notifyClosePromise(null);
1628 }
1629 });
1630 } catch (RejectedExecutionException e) {
1631 notifyClosePromise(e);
1632 }
1633 }
1634
1635 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1636 try {
1637 ctx.executor().execute(new Runnable() {
1638 @Override
1639 public void run() {
1640 ctx.fireChannelRead(decodedOut);
1641 }
1642 });
1643 } catch (RejectedExecutionException e) {
1644 decodedOut.release();
1645 throw e;
1646 }
1647 }
1648
1649 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1650 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1651 out.nioBuffer(index, len);
1652 }
1653
1654 private static boolean inEventLoop(Executor executor) {
1655 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1656 }
1657
1658
1659
1660
1661
1662
1663
1664
1665 private boolean runDelegatedTasks(boolean inUnwrap) {
1666 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1667
1668
1669 for (;;) {
1670 Runnable task = engine.getDelegatedTask();
1671 if (task == null) {
1672 return true;
1673 }
1674 setState(STATE_PROCESS_TASK);
1675 if (task instanceof AsyncRunnable) {
1676
1677 boolean pending = false;
1678 try {
1679 AsyncRunnable asyncTask = (AsyncRunnable) task;
1680 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1681 asyncTask.run(completionHandler);
1682 pending = completionHandler.resumeLater();
1683 if (pending) {
1684 return false;
1685 }
1686 } finally {
1687 if (!pending) {
1688
1689
1690 clearState(STATE_PROCESS_TASK);
1691 }
1692 }
1693 } else {
1694 try {
1695 task.run();
1696 } finally {
1697 clearState(STATE_PROCESS_TASK);
1698 }
1699 }
1700 }
1701 } else {
1702 executeDelegatedTask(inUnwrap);
1703 return false;
1704 }
1705 }
1706
1707 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1708 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1709 }
1710
1711 private void executeDelegatedTask(boolean inUnwrap) {
1712 executeDelegatedTask(getTaskRunner(inUnwrap));
1713 }
1714
1715 private void executeDelegatedTask(SslTasksRunner task) {
1716 setState(STATE_PROCESS_TASK);
1717 try {
1718 delegatedTaskExecutor.execute(task);
1719 } catch (RejectedExecutionException e) {
1720 clearState(STATE_PROCESS_TASK);
1721 throw e;
1722 }
1723 }
1724
1725 private final class AsyncTaskCompletionHandler implements Runnable {
1726 private final boolean inUnwrap;
1727 boolean didRun;
1728 boolean resumeLater;
1729
1730 AsyncTaskCompletionHandler(boolean inUnwrap) {
1731 this.inUnwrap = inUnwrap;
1732 }
1733
1734 @Override
1735 public void run() {
1736 didRun = true;
1737 if (resumeLater) {
1738 getTaskRunner(inUnwrap).runComplete();
1739 }
1740 }
1741
1742 boolean resumeLater() {
1743 if (!didRun) {
1744 resumeLater = true;
1745 return true;
1746 }
1747 return false;
1748 }
1749 }
1750
1751
1752
1753
1754
1755 private final class SslTasksRunner implements Runnable {
1756 private final boolean inUnwrap;
1757 private final Runnable runCompleteTask = new Runnable() {
1758 @Override
1759 public void run() {
1760 runComplete();
1761 }
1762 };
1763
1764 SslTasksRunner(boolean inUnwrap) {
1765 this.inUnwrap = inUnwrap;
1766 }
1767
1768
1769 private void taskError(Throwable e) {
1770 if (inUnwrap) {
1771
1772
1773
1774
1775 try {
1776 handleUnwrapThrowable(ctx, e);
1777 } catch (Throwable cause) {
1778 safeExceptionCaught(cause);
1779 }
1780 } else {
1781 setHandshakeFailure(ctx, e);
1782 forceFlush(ctx);
1783 }
1784 }
1785
1786
1787 private void safeExceptionCaught(Throwable cause) {
1788 try {
1789 exceptionCaught(ctx, wrapIfNeeded(cause));
1790 } catch (Throwable error) {
1791 ctx.fireExceptionCaught(error);
1792 }
1793 }
1794
1795 private Throwable wrapIfNeeded(Throwable cause) {
1796 if (!inUnwrap) {
1797
1798 return cause;
1799 }
1800
1801
1802 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1803 }
1804
1805 private void tryDecodeAgain() {
1806 try {
1807 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1808 } catch (Throwable cause) {
1809 safeExceptionCaught(cause);
1810 } finally {
1811
1812
1813
1814 channelReadComplete0(ctx);
1815 }
1816 }
1817
1818
1819
1820
1821
1822 private void resumeOnEventExecutor() {
1823 assert ctx.executor().inEventLoop();
1824 clearState(STATE_PROCESS_TASK);
1825 try {
1826 HandshakeStatus status = engine.getHandshakeStatus();
1827 switch (status) {
1828
1829
1830 case NEED_TASK:
1831 executeDelegatedTask(this);
1832
1833 break;
1834
1835
1836 case FINISHED:
1837
1838 case NOT_HANDSHAKING:
1839 setHandshakeSuccess();
1840 try {
1841
1842
1843 wrap(ctx, inUnwrap);
1844 } catch (Throwable e) {
1845 taskError(e);
1846 return;
1847 }
1848 if (inUnwrap) {
1849
1850
1851 unwrapNonAppData(ctx);
1852 }
1853
1854
1855 forceFlush(ctx);
1856
1857 tryDecodeAgain();
1858 break;
1859
1860
1861
1862 case NEED_UNWRAP:
1863 try {
1864 unwrapNonAppData(ctx);
1865 } catch (SSLException e) {
1866 handleUnwrapThrowable(ctx, e);
1867 return;
1868 }
1869 tryDecodeAgain();
1870 break;
1871
1872
1873
1874 case NEED_WRAP:
1875 try {
1876 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1877
1878
1879
1880
1881 unwrapNonAppData(ctx);
1882 }
1883
1884
1885 forceFlush(ctx);
1886 } catch (Throwable e) {
1887 taskError(e);
1888 return;
1889 }
1890
1891
1892 tryDecodeAgain();
1893 break;
1894
1895 default:
1896
1897 throw new AssertionError();
1898 }
1899 } catch (Throwable cause) {
1900 safeExceptionCaught(cause);
1901 }
1902 }
1903
1904 void runComplete() {
1905 EventExecutor executor = ctx.executor();
1906
1907
1908
1909
1910
1911
1912
1913 executor.execute(new Runnable() {
1914 @Override
1915 public void run() {
1916 resumeOnEventExecutor();
1917 }
1918 });
1919 }
1920
1921 @Override
1922 public void run() {
1923 try {
1924 Runnable task = engine.getDelegatedTask();
1925 if (task == null) {
1926
1927 return;
1928 }
1929 if (task instanceof AsyncRunnable) {
1930 AsyncRunnable asyncTask = (AsyncRunnable) task;
1931 asyncTask.run(runCompleteTask);
1932 } else {
1933 task.run();
1934 runComplete();
1935 }
1936 } catch (final Throwable cause) {
1937 handleException(cause);
1938 }
1939 }
1940
1941 private void handleException(final Throwable cause) {
1942 EventExecutor executor = ctx.executor();
1943 if (executor.inEventLoop()) {
1944 clearState(STATE_PROCESS_TASK);
1945 safeExceptionCaught(cause);
1946 } else {
1947 try {
1948 executor.execute(new Runnable() {
1949 @Override
1950 public void run() {
1951 clearState(STATE_PROCESS_TASK);
1952 safeExceptionCaught(cause);
1953 }
1954 });
1955 } catch (RejectedExecutionException ignore) {
1956 clearState(STATE_PROCESS_TASK);
1957
1958
1959 ctx.fireExceptionCaught(cause);
1960 }
1961 }
1962 }
1963 }
1964
1965
1966
1967
1968
1969
1970 private boolean setHandshakeSuccess() throws SSLException {
1971
1972
1973
1974 final SSLSession session = engine.getSession();
1975 if (resumptionController != null && !handshakePromise.isDone()) {
1976 try {
1977 if (resumptionController.validateResumeIfNeeded(engine) && logger.isDebugEnabled()) {
1978 logger.debug("{} Resumed and reauthenticated session", ctx.channel());
1979 }
1980 } catch (CertificateException e) {
1981 SSLHandshakeException exception = new SSLHandshakeException(e.getMessage());
1982 exception.initCause(e);
1983 throw exception;
1984 }
1985 }
1986 final boolean notified;
1987 if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1988 if (logger.isDebugEnabled()) {
1989 logger.debug(
1990 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1991 ctx.channel(),
1992 session.getProtocol(),
1993 session.getCipherSuite());
1994 }
1995 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1996 }
1997 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1998 clearState(STATE_READ_DURING_HANDSHAKE);
1999 if (!ctx.channel().config().isAutoRead()) {
2000 ctx.read();
2001 }
2002 }
2003 return notified;
2004 }
2005
2006
2007
2008
2009 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
2010 setHandshakeFailure(ctx, cause, true, true, false);
2011 }
2012
2013
2014
2015
2016 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
2017 boolean notify, boolean alwaysFlushAndClose) {
2018 try {
2019
2020 setState(STATE_OUTBOUND_CLOSED);
2021 engine.closeOutbound();
2022
2023 if (closeInbound) {
2024 try {
2025 engine.closeInbound();
2026 } catch (SSLException e) {
2027 if (logger.isDebugEnabled()) {
2028
2029
2030
2031
2032 String msg = e.getMessage();
2033 if (msg == null || !(msg.contains("possible truncation attack") ||
2034 msg.contains("closing inbound before receiving peer's close_notify"))) {
2035 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
2036 }
2037 }
2038 }
2039 }
2040 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
2041 SslUtils.handleHandshakeFailure(ctx, cause, notify);
2042 }
2043 } finally {
2044
2045 releaseAndFailAll(ctx, cause);
2046 }
2047 }
2048
2049 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
2050
2051
2052
2053 try {
2054 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
2055 releaseAndFailAll(ctx, transportFailure);
2056 if (handshakePromise.tryFailure(transportFailure)) {
2057 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2058 }
2059 } finally {
2060 ctx.close();
2061 }
2062 }
2063
2064 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2065 if (resumptionController != null &&
2066 (!engine.getSession().isValid() || cause instanceof SSLHandshakeException)) {
2067 resumptionController.remove(engine());
2068 }
2069 if (pendingUnencryptedWrites != null) {
2070 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2071 }
2072 }
2073
2074 private void notifyClosePromise(Throwable cause) {
2075 if (cause == null) {
2076 if (sslClosePromise.trySuccess(ctx.channel())) {
2077 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2078 }
2079 } else {
2080 if (sslClosePromise.tryFailure(cause)) {
2081 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2082 }
2083 }
2084 }
2085
2086 private void closeOutboundAndChannel(
2087 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2088 setState(STATE_OUTBOUND_CLOSED);
2089 engine.closeOutbound();
2090
2091 if (!ctx.channel().isActive()) {
2092 if (disconnect) {
2093 ctx.disconnect(promise);
2094 } else {
2095 ctx.close(promise);
2096 }
2097 return;
2098 }
2099
2100 ChannelPromise closeNotifyPromise = ctx.newPromise();
2101 try {
2102 flush(ctx, closeNotifyPromise);
2103 } finally {
2104 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2105 setState(STATE_CLOSE_NOTIFY);
2106
2107
2108
2109
2110
2111
2112
2113
2114 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2115 } else {
2116
2117 sslClosePromise.addListener(new FutureListener<Channel>() {
2118 @Override
2119 public void operationComplete(Future<Channel> future) {
2120 promise.setSuccess();
2121 }
2122 });
2123 }
2124 }
2125 }
2126
2127 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2128 if (pendingUnencryptedWrites != null) {
2129 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2130 } else {
2131 promise.setFailure(newPendingWritesNullException());
2132 }
2133 flush(ctx);
2134 }
2135
2136 @Override
2137 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2138 this.ctx = ctx;
2139 Channel channel = ctx.channel();
2140 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16, engineType.wantsDirectBuffer) {
2141 @Override
2142 protected int wrapDataSize() {
2143 return SslHandler.this.wrapDataSize;
2144 }
2145 };
2146
2147 setOpensslEngineSocketFd(channel);
2148 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2149 boolean active = channel.isActive();
2150 if (active || fastOpen) {
2151
2152
2153
2154 startHandshakeProcessing(active);
2155
2156
2157 final ChannelOutboundBuffer outboundBuffer;
2158 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2159 outboundBuffer.totalPendingWriteBytes() > 0)) {
2160 setState(STATE_NEEDS_FLUSH);
2161 }
2162 }
2163 }
2164
2165 private void startHandshakeProcessing(boolean flushAtEnd) {
2166 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2167 setState(STATE_HANDSHAKE_STARTED);
2168 if (engine.getUseClientMode()) {
2169
2170
2171
2172 handshake(flushAtEnd);
2173 }
2174 applyHandshakeTimeout();
2175 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2176 forceFlush(ctx);
2177 }
2178 }
2179
2180
2181
2182
2183 public Future<Channel> renegotiate() {
2184 ChannelHandlerContext ctx = this.ctx;
2185 if (ctx == null) {
2186 throw new IllegalStateException();
2187 }
2188
2189 return renegotiate(ctx.executor().<Channel>newPromise());
2190 }
2191
2192
2193
2194
2195 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2196 ObjectUtil.checkNotNull(promise, "promise");
2197
2198 ChannelHandlerContext ctx = this.ctx;
2199 if (ctx == null) {
2200 throw new IllegalStateException();
2201 }
2202
2203 EventExecutor executor = ctx.executor();
2204 if (!executor.inEventLoop()) {
2205 executor.execute(new Runnable() {
2206 @Override
2207 public void run() {
2208 renegotiateOnEventLoop(promise);
2209 }
2210 });
2211 return promise;
2212 }
2213
2214 renegotiateOnEventLoop(promise);
2215 return promise;
2216 }
2217
2218 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2219 final Promise<Channel> oldHandshakePromise = handshakePromise;
2220 if (!oldHandshakePromise.isDone()) {
2221
2222
2223 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2224 } else {
2225 handshakePromise = newHandshakePromise;
2226 handshake(true);
2227 applyHandshakeTimeout();
2228 }
2229 }
2230
2231
2232
2233
2234
2235
2236
2237 private void handshake(boolean flushAtEnd) {
2238 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2239
2240
2241 return;
2242 }
2243 if (handshakePromise.isDone()) {
2244
2245
2246
2247
2248
2249 return;
2250 }
2251
2252
2253 final ChannelHandlerContext ctx = this.ctx;
2254 try {
2255 engine.beginHandshake();
2256 wrapNonAppData(ctx, false);
2257 } catch (Throwable e) {
2258 setHandshakeFailure(ctx, e);
2259 } finally {
2260 if (flushAtEnd) {
2261 forceFlush(ctx);
2262 }
2263 }
2264 }
2265
2266 private void applyHandshakeTimeout() {
2267 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2268
2269
2270 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2271 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2272 return;
2273 }
2274
2275 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2276 @Override
2277 public void run() {
2278 if (localHandshakePromise.isDone()) {
2279 return;
2280 }
2281 SSLException exception =
2282 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2283 try {
2284 if (localHandshakePromise.tryFailure(exception)) {
2285 SslUtils.handleHandshakeFailure(ctx, exception, true);
2286 }
2287 } finally {
2288 releaseAndFailAll(ctx, exception);
2289 }
2290 }
2291 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2292
2293
2294 localHandshakePromise.addListener(new FutureListener<Channel>() {
2295 @Override
2296 public void operationComplete(Future<Channel> f) throws Exception {
2297 timeoutFuture.cancel(false);
2298 }
2299 });
2300 }
2301
2302 private void forceFlush(ChannelHandlerContext ctx) {
2303 clearState(STATE_NEEDS_FLUSH);
2304 ctx.flush();
2305 }
2306
2307 private void setOpensslEngineSocketFd(Channel c) {
2308 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2309 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2310 }
2311 }
2312
2313
2314
2315
2316 @Override
2317 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2318 setOpensslEngineSocketFd(ctx.channel());
2319 if (!startTls) {
2320 startHandshakeProcessing(true);
2321 }
2322 ctx.fireChannelActive();
2323 }
2324
2325 private void safeClose(
2326 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2327 final ChannelPromise promise) {
2328 if (!ctx.channel().isActive()) {
2329 ctx.close(promise);
2330 return;
2331 }
2332
2333 final Future<?> timeoutFuture;
2334 if (!flushFuture.isDone()) {
2335 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2336 if (closeNotifyTimeout > 0) {
2337
2338 timeoutFuture = ctx.executor().schedule(new Runnable() {
2339 @Override
2340 public void run() {
2341
2342 if (!flushFuture.isDone()) {
2343 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2344 ctx.channel());
2345 addCloseListener(ctx.close(ctx.newPromise()), promise);
2346 }
2347 }
2348 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2349 } else {
2350 timeoutFuture = null;
2351 }
2352 } else {
2353 timeoutFuture = null;
2354 }
2355
2356
2357 flushFuture.addListener(new ChannelFutureListener() {
2358 @Override
2359 public void operationComplete(ChannelFuture f) {
2360 if (timeoutFuture != null) {
2361 timeoutFuture.cancel(false);
2362 }
2363 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2364 if (closeNotifyReadTimeout <= 0) {
2365
2366
2367 addCloseListener(ctx.close(ctx.newPromise()), promise);
2368 } else {
2369 final Future<?> closeNotifyReadTimeoutFuture;
2370
2371 if (!sslClosePromise.isDone()) {
2372 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2373 @Override
2374 public void run() {
2375 if (!sslClosePromise.isDone()) {
2376 logger.debug(
2377 "{} did not receive close_notify in {}ms; force-closing the connection.",
2378 ctx.channel(), closeNotifyReadTimeout);
2379
2380
2381 addCloseListener(ctx.close(ctx.newPromise()), promise);
2382 }
2383 }
2384 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2385 } else {
2386 closeNotifyReadTimeoutFuture = null;
2387 }
2388
2389
2390 sslClosePromise.addListener(new FutureListener<Channel>() {
2391 @Override
2392 public void operationComplete(Future<Channel> future) throws Exception {
2393 if (closeNotifyReadTimeoutFuture != null) {
2394 closeNotifyReadTimeoutFuture.cancel(false);
2395 }
2396 addCloseListener(ctx.close(ctx.newPromise()), promise);
2397 }
2398 });
2399 }
2400 }
2401 });
2402 }
2403
2404 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2405
2406
2407
2408
2409
2410
2411 PromiseNotifier.cascade(false, future, promise);
2412 }
2413
2414
2415
2416
2417
2418 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2419 ByteBufAllocator alloc = ctx.alloc();
2420 if (engineType.wantsDirectBuffer) {
2421 return alloc.directBuffer(capacity);
2422 } else {
2423 return alloc.buffer(capacity);
2424 }
2425 }
2426
2427
2428
2429
2430
2431 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2432 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2433 }
2434
2435 private boolean isStateSet(int bit) {
2436 return (state & bit) == bit;
2437 }
2438
2439 private void setState(int bit) {
2440 state |= bit;
2441 }
2442
2443 private void clearState(int bit) {
2444 state &= ~bit;
2445 }
2446
2447 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2448
2449 @Override
2450 protected EventExecutor executor() {
2451 if (ctx == null) {
2452 throw new IllegalStateException();
2453 }
2454 return ctx.executor();
2455 }
2456
2457 @Override
2458 protected void checkDeadLock() {
2459 if (ctx == null) {
2460
2461
2462
2463
2464
2465
2466 return;
2467 }
2468 super.checkDeadLock();
2469 }
2470 }
2471 }