1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelInboundHandler;
30 import io.netty.channel.ChannelOption;
31 import io.netty.channel.ChannelOutboundBuffer;
32 import io.netty.channel.ChannelOutboundHandler;
33 import io.netty.channel.ChannelPipeline;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.unix.UnixChannel;
36 import io.netty.handler.codec.ByteToMessageDecoder;
37 import io.netty.handler.codec.DecoderException;
38 import io.netty.handler.codec.UnsupportedMessageTypeException;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.DefaultPromise;
41 import io.netty.util.concurrent.EventExecutor;
42 import io.netty.util.concurrent.Future;
43 import io.netty.util.concurrent.FutureListener;
44 import io.netty.util.concurrent.ImmediateExecutor;
45 import io.netty.util.concurrent.Promise;
46 import io.netty.util.concurrent.PromiseNotifier;
47 import io.netty.util.internal.ObjectUtil;
48 import io.netty.util.internal.PlatformDependent;
49 import io.netty.util.internal.ThrowableUtil;
50 import io.netty.util.internal.UnstableApi;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.util.List;
61 import java.util.concurrent.Executor;
62 import java.util.concurrent.RejectedExecutionException;
63 import java.util.concurrent.TimeUnit;
64 import java.util.regex.Pattern;
65 import javax.net.ssl.SSLEngine;
66 import javax.net.ssl.SSLEngineResult;
67 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
68 import javax.net.ssl.SSLEngineResult.Status;
69 import javax.net.ssl.SSLException;
70 import javax.net.ssl.SSLHandshakeException;
71 import javax.net.ssl.SSLSession;
72
73 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
74 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
75 import static io.netty.util.internal.ObjectUtil.checkNotNull;
76 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
/** Logger for diagnostics emitted by this handler. */
private static final InternalLogger logger =
        InternalLoggerFactory.getInstance(SslHandler.class);
/** Class-name pattern tested against stack frames by {@code ignoreException(Throwable)}. */
private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
        "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
/** Case-insensitive message pattern tested by {@code ignoreException(Throwable)}. */
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
        "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);

// The STATE_* constants below are single-bit flags; each one occupies a distinct bit.

/** Set by {@code flush(ctx)} once a startTls handler has passed its first pending writes through as-is. */
private static final int STATE_SENT_FIRST_MESSAGE = 1;
/** Set by {@code wrapAndFlush(ctx)} when a flush happens while the handshake promise is not yet done. */
private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
/** Set by {@code read(ctx)} when a read is requested while the handshake promise is not yet done. */
private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
/** Consulted by {@code channelInactive(ctx)}; NOTE(review): set outside this excerpt, presumably when handshaking begins. */
private static final int STATE_HANDSHAKE_STARTED = 1 << 3;

/** Requests a flush at a later point; set by the wrap methods when called with {@code inUnwrap == true}. */
private static final int STATE_NEEDS_FLUSH = 1 << 4;
/** Set by {@code closeOutbound0(promise)} before the engine's outbound side is closed. */
private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
/** NOTE(review): usage is outside this excerpt; by name it tracks close_notify handling. */
private static final int STATE_CLOSE_NOTIFY = 1 << 6;
/** While set, {@code flush(ctx)} and {@code decode(...)} return without doing any work. */
private static final int STATE_PROCESS_TASK = 1 << 7;

/** Cleared in {@code channelReadComplete0(ctx)} and consulted by {@code readIfNeeded(ctx)}. */
private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
/** NOTE(review): usage is outside this excerpt; by name it guards re-entrant unwrap calls. */
private static final int STATE_UNWRAP_REENTRY = 1 << 9;

/**
 * Largest plaintext chunk (16 KiB) handed to a single {@code SSLEngine.wrap(...)} call; larger buffers are
 * split by {@code wrapMultiple(...)}. Also the initial value of {@code wrapDataSize}.
 */
private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
199
200 private enum SslEngineType {
201 TCNATIVE(true, COMPOSITE_CUMULATOR) {
202 @Override
203 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
204 int nioBufferCount = in.nioBufferCount();
205 int writerIndex = out.writerIndex();
206 final SSLEngineResult result;
207 if (nioBufferCount > 1) {
208
209
210
211
212
213 ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
214 try {
215 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
216 result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
217 } finally {
218 handler.singleBuffer[0] = null;
219 }
220 } else {
221 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
222 toByteBuffer(out, writerIndex, out.writableBytes()));
223 }
224 out.writerIndex(writerIndex + result.bytesProduced());
225 return result;
226 }
227
228 @Override
229 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
230 int pendingBytes, int numComponents) {
231 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
232 .calculateOutNetBufSize(pendingBytes, numComponents));
233 }
234
235 @Override
236 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
237 return ((ReferenceCountedOpenSslEngine) handler.engine)
238 .calculateMaxLengthForWrap(pendingBytes, numComponents);
239 }
240
241 @Override
242 int calculatePendingData(SslHandler handler, int guess) {
243 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
244 return sslPending > 0 ? sslPending : guess;
245 }
246
247 @Override
248 boolean jdkCompatibilityMode(SSLEngine engine) {
249 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
250 }
251 },
252 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
253 @Override
254 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
255 int nioBufferCount = in.nioBufferCount();
256 int writerIndex = out.writerIndex();
257 final SSLEngineResult result;
258 if (nioBufferCount > 1) {
259
260
261
262 try {
263 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
264 result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
265 in.nioBuffers(in.readerIndex(), len),
266 handler.singleBuffer);
267 } finally {
268 handler.singleBuffer[0] = null;
269 }
270 } else {
271 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
272 toByteBuffer(out, writerIndex, out.writableBytes()));
273 }
274 out.writerIndex(writerIndex + result.bytesProduced());
275 return result;
276 }
277
278 @Override
279 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
280 int pendingBytes, int numComponents) {
281 return allocator.directBuffer(
282 ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
283 }
284
285 @Override
286 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
287 return ((ConscryptAlpnSslEngine) handler.engine)
288 .calculateRequiredOutBufSpace(pendingBytes, numComponents);
289 }
290
291 @Override
292 int calculatePendingData(SslHandler handler, int guess) {
293 return guess;
294 }
295
296 @Override
297 boolean jdkCompatibilityMode(SSLEngine engine) {
298 return true;
299 }
300 },
301 JDK(false, MERGE_CUMULATOR) {
302 @Override
303 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
304 int writerIndex = out.writerIndex();
305 ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
306 int position = inNioBuffer.position();
307 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
308 toByteBuffer(out, writerIndex, out.writableBytes()));
309 out.writerIndex(writerIndex + result.bytesProduced());
310
311
312
313
314
315
316
317 if (result.bytesConsumed() == 0) {
318 int consumed = inNioBuffer.position() - position;
319 if (consumed != result.bytesConsumed()) {
320
321 return new SSLEngineResult(
322 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
323 }
324 }
325 return result;
326 }
327
328 @Override
329 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
330 int pendingBytes, int numComponents) {
331
332
333 return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
334 }
335
336 @Override
337 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
338
339
340
341
342
343
344 return handler.engine.getSession().getPacketBufferSize();
345 }
346
347 @Override
348 int calculatePendingData(SslHandler handler, int guess) {
349 return guess;
350 }
351
352 @Override
353 boolean jdkCompatibilityMode(SSLEngine engine) {
354 return true;
355 }
356 };
357
358 static SslEngineType forEngine(SSLEngine engine) {
359 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
360 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
361 }
362
363 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
364 this.wantsDirectBuffer = wantsDirectBuffer;
365 this.cumulator = cumulator;
366 }
367
368 abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
369
370 abstract int calculatePendingData(SslHandler handler, int guess);
371
372 abstract boolean jdkCompatibilityMode(SSLEngine engine);
373
374 abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
375 int pendingBytes, int numComponents);
376
377 abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);
378
379
380
381
382
383
384
385 final boolean wantsDirectBuffer;
386
387
388
389
390
391
392
393
394
395
396
397 final Cumulator cumulator;
398 }
399
// Context of this handler; NOTE(review): assigned outside this excerpt, presumably when the handler is added.
private volatile ChannelHandlerContext ctx;
private final SSLEngine engine;
// Strategy matching the concrete engine implementation (see SslEngineType.forEngine).
private final SslEngineType engineType;
private final Executor delegatedTaskExecutor;
// Cached result of engineType.jdkCompatibilityMode(engine); selects the decode path.
private final boolean jdkCompatibilityMode;

// One-element scratch array reused for wrap/unwrap calls; slot 0 is reset to null after each use.
private final ByteBuffer[] singleBuffer = new ByteBuffer[1];

// When true, the first flush passes the pending writes through without wrapping them (see flush).
private final boolean startTls;

// NOTE(review): SslTasksRunner is declared outside this excerpt; the flag presumably means "in unwrap".
private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);

// Writes waiting to be wrapped; set to null by handlerRemoved0.
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
private Promise<Channel> handshakePromise = new LazyChannelPromise();
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();

// Length of the record decodeJdkCompatible is still waiting for; 0 when no length is remembered.
private int packetLength;
// Bit set built from the STATE_* flags.
private short state;

// Timeouts in milliseconds; closeNotifyReadTimeoutMillis keeps the field default of 0.
private volatile long handshakeTimeoutMillis = 10000;
private volatile long closeNotifyFlushTimeoutMillis = 3000;
private volatile long closeNotifyReadTimeoutMillis;
// Upper bound used when aggregating pending writes for one wrap; values <= 0 disable aggregation (see wrap).
volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
429
430
431
432
433
434
/**
 * Creates a new instance with {@code startTls} disabled.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, false);
}
438
439
440
441
442
443
444
445
/**
 * Creates a new instance that uses {@link ImmediateExecutor#INSTANCE} as the delegated task executor.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first flushed writes must pass through without being wrapped
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, startTls, ImmediateExecutor.INSTANCE);
}
449
450
451
452
453
454
455
456
/**
 * Creates a new instance with {@code startTls} disabled.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param delegatedTaskExecutor the {@link Executor} stored for the engine's delegated tasks
 */
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, false, delegatedTaskExecutor);
}
460
461
462
463
464
465
466
467
468
469
470 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
471 this.engine = ObjectUtil.checkNotNull(engine, "engine");
472 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
473 engineType = SslEngineType.forEngine(engine);
474 this.startTls = startTls;
475 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
476 setCumulator(engineType.cumulator);
477 }
478
/**
 * Returns the configured handshake timeout in milliseconds (initially {@code 10000}).
 */
public long getHandshakeTimeoutMillis() {
    return handshakeTimeoutMillis;
}
482
483 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
484 checkNotNull(unit, "unit");
485 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
486 }
487
488 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
489 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
490 }
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
/**
 * Sets the number of bytes taken from the pending-write queue for a single wrap pass.
 * The value is stored without validation. When it is {@code > 0} the wrap loop aggregates
 * pending writes up to this size; otherwise it processes the queued buffers one at a time.
 *
 * @param wrapDataSize the new aggregation size
 */
@UnstableApi
public final void setWrapDataSize(int wrapDataSize) {
    this.wrapDataSize = wrapDataSize;
}
516
517
518
519
/**
 * Returns the close_notify flush timeout in milliseconds.
 *
 * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}
 */
@Deprecated
public long getCloseNotifyTimeoutMillis() {
    return getCloseNotifyFlushTimeoutMillis();
}
524
525
526
527
/**
 * Sets the close_notify flush timeout.
 *
 * @deprecated use {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}
 */
@Deprecated
public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
    setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
}
532
533
534
535
/**
 * Sets the close_notify flush timeout in milliseconds.
 *
 * @deprecated use {@link #setCloseNotifyFlushTimeoutMillis(long)}
 */
@Deprecated
public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
    setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
}
540
541
542
543
544
545
/**
 * Returns the configured close_notify flush timeout in milliseconds (initially {@code 3000}).
 */
public final long getCloseNotifyFlushTimeoutMillis() {
    return closeNotifyFlushTimeoutMillis;
}
549
550
551
552
553
554
555 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
556 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
557 }
558
559
560
561
562 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
563 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
564 "closeNotifyFlushTimeoutMillis");
565 }
566
567
568
569
570
571
/**
 * Returns the configured close_notify read timeout in milliseconds ({@code 0} unless changed).
 */
public final long getCloseNotifyReadTimeoutMillis() {
    return closeNotifyReadTimeoutMillis;
}
575
576
577
578
579
580
581 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
582 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
583 }
584
585
586
587
588 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
589 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
590 "closeNotifyReadTimeoutMillis");
591 }
592
593
594
595
/**
 * Returns the {@link SSLEngine} this handler was constructed with.
 */
public SSLEngine engine() {
    return engine;
}
599
600
601
602
603
604
605 public String applicationProtocol() {
606 SSLEngine engine = engine();
607 if (!(engine instanceof ApplicationProtocolAccessor)) {
608 return null;
609 }
610
611 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
612 }
613
614
615
616
617
618
619
/**
 * Returns the {@link Future} backed by this handler's current handshake promise.
 */
public Future<Channel> handshakeFuture() {
    return handshakePromise;
}
623
624
625
626
/**
 * Closes the outbound side of the engine.
 *
 * @deprecated use {@link #closeOutbound()}
 */
@Deprecated
public ChannelFuture close() {
    return closeOutbound();
}
631
632
633
634
/**
 * Closes the outbound side of the engine and completes {@code promise} accordingly.
 *
 * @deprecated use {@link #closeOutbound(ChannelPromise)}
 */
@Deprecated
public ChannelFuture close(ChannelPromise promise) {
    return closeOutbound(promise);
}
639
640
641
642
643
644
645
/**
 * Closes the outbound side of the engine using a promise freshly created from the handler's context.
 * NOTE(review): dereferences the {@code ctx} field, so this presumably must only be called once the
 * handler has been added to a pipeline; confirm where {@code ctx} is assigned.
 *
 * @return the future that tracks the close operation
 */
public ChannelFuture closeOutbound() {
    return closeOutbound(ctx.newPromise());
}
649
650
651
652
653
654
655
656 public ChannelFuture closeOutbound(final ChannelPromise promise) {
657 final ChannelHandlerContext ctx = this.ctx;
658 if (ctx.executor().inEventLoop()) {
659 closeOutbound0(promise);
660 } else {
661 ctx.executor().execute(new Runnable() {
662 @Override
663 public void run() {
664 closeOutbound0(promise);
665 }
666 });
667 }
668 return promise;
669 }
670
671 private void closeOutbound0(ChannelPromise promise) {
672 setState(STATE_OUTBOUND_CLOSED);
673 engine.closeOutbound();
674 try {
675 flush(ctx, promise);
676 } catch (Exception e) {
677 if (!promise.tryFailure(e)) {
678 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
679 }
680 }
681 }
682
683
684
685
686
687
688
689
/**
 * Returns the {@link Future} backed by this handler's SSL close promise.
 */
public Future<Channel> sslCloseFuture() {
    return sslClosePromise;
}
693
694 @Override
695 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
696 try {
697 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
698
699 pendingUnencryptedWrites.releaseAndFailAll(ctx,
700 new ChannelException("Pending write on removal of SslHandler"));
701 }
702 pendingUnencryptedWrites = null;
703
704 SSLException cause = null;
705
706
707
708
709 if (!handshakePromise.isDone()) {
710 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
711 if (handshakePromise.tryFailure(cause)) {
712 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
713 }
714 }
715 if (!sslClosePromise.isDone()) {
716 if (cause == null) {
717 cause = new SSLException("SslHandler removed before SSLEngine was closed");
718 }
719 notifyClosePromise(cause);
720 }
721 } finally {
722 ReferenceCountUtil.release(engine);
723 }
724 }
725
/** Forwards the bind request unchanged to the next outbound handler. */
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.bind(localAddress, promise);
}
730
/** Forwards the connect request unchanged to the next outbound handler. */
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}
736
/** Forwards the deregister request unchanged to the next outbound handler. */
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    ctx.deregister(promise);
}
741
/**
 * Handles a disconnect request by delegating to {@code closeOutboundAndChannel} with the
 * disconnect flag set to {@code true}.
 */
@Override
public void disconnect(final ChannelHandlerContext ctx,
                       final ChannelPromise promise) throws Exception {
    closeOutboundAndChannel(ctx, promise, true);
}
747
/**
 * Handles a close request by delegating to {@code closeOutboundAndChannel} with the
 * disconnect flag set to {@code false}.
 */
@Override
public void close(final ChannelHandlerContext ctx,
                  final ChannelPromise promise) throws Exception {
    closeOutboundAndChannel(ctx, promise, false);
}
753
/**
 * Forwards the read request, first recording {@code STATE_READ_DURING_HANDSHAKE} when the
 * handshake promise is not yet done.
 */
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
    if (!handshakePromise.isDone()) {
        setState(STATE_READ_DURING_HANDSHAKE);
    }

    ctx.read();
}
762
/** Creates the exception used to fail a write that arrives while the pending-write queue is {@code null}. */
private static IllegalStateException newPendingWritesNullException() {
    return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
}
766
767 @Override
768 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
769 if (!(msg instanceof ByteBuf)) {
770 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
771 ReferenceCountUtil.safeRelease(msg);
772 promise.setFailure(exception);
773 } else if (pendingUnencryptedWrites == null) {
774 ReferenceCountUtil.safeRelease(msg);
775 promise.setFailure(newPendingWritesNullException());
776 } else {
777 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
778 }
779 }
780
781 @Override
782 public void flush(ChannelHandlerContext ctx) throws Exception {
783
784
785 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
786 setState(STATE_SENT_FIRST_MESSAGE);
787 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
788 forceFlush(ctx);
789
790
791 startHandshakeProcessing(true);
792 return;
793 }
794
795 if (isStateSet(STATE_PROCESS_TASK)) {
796 return;
797 }
798
799 try {
800 wrapAndFlush(ctx);
801 } catch (Throwable cause) {
802 setHandshakeFailure(ctx, cause);
803 PlatformDependent.throwException(cause);
804 }
805 }
806
/**
 * Wraps everything in the pending-write queue and then forces a flush, even when wrapping throws.
 *
 * @throws SSLException if wrapping fails
 */
private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
    if (pendingUnencryptedWrites.isEmpty()) {
        // Queue an empty buffer so that wrap(...) still performs one pass over the engine.
        pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
    }
    if (!handshakePromise.isDone()) {
        // Remember that a flush was requested before the handshake finished.
        setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
    }
    try {
        wrap(ctx, false);
    } finally {
        // Flush whatever was written, regardless of whether wrap(...) succeeded.
        forceFlush(ctx);
    }
}
826
827
/**
 * Drains the pending-write queue through the engine and writes the encrypted output to the
 * pipeline (without flushing it).
 *
 * @param ctx      the context of this handler
 * @param inUnwrap {@code true} when called from the unwrap path; in that case
 *                 {@code STATE_NEEDS_FLUSH} is set on exit so the output is flushed later
 * @throws SSLException if the engine fails to wrap
 */
private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
    ByteBuf out = null;
    ByteBufAllocator alloc = ctx.alloc();
    try {
        // Read the volatile field once so the whole loop uses a consistent value.
        final int wrapDataSize = this.wrapDataSize;

        // Stop as soon as the handler has been removed from the pipeline.
        outer: while (!ctx.isRemoved()) {
            ChannelPromise promise = ctx.newPromise();
            // Aggregate up to wrapDataSize bytes when enabled, otherwise take one queued buffer.
            ByteBuf buf = wrapDataSize > 0 ?
                    pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
                    pendingUnencryptedWrites.removeFirst(promise);
            if (buf == null) {
                // Nothing left to wrap.
                break;
            }

            SSLEngineResult result;

            if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
                // Too large for a single wrap call: it will be split into chunks of at most
                // MAX_PLAINTEXT_LENGTH bytes, so size the output for that many packets.
                int readableBytes = buf.readableBytes();
                int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
                if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
                    numPackets += 1;
                }

                if (out == null) {
                    out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
                }
                result = wrapMultiple(alloc, engine, buf, out);
            } else {
                if (out == null) {
                    out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
                }
                result = wrap(alloc, engine, buf, out);
            }

            if (buf.isReadable()) {
                // Not fully consumed: put the remainder back at the head of the queue together
                // with its promise.
                pendingUnencryptedWrites.addFirst(buf, promise);
                // The promise now travels with the re-queued buffer, so it must not be attached
                // to the write below.
                promise = null;
            } else {
                buf.release();
            }

            if (out.isReadable()) {
                // Hand the buffer to the pipeline and clear the local reference so the finally
                // block does not release it.
                final ByteBuf b = out;
                out = null;
                if (promise != null) {
                    ctx.write(b, promise);
                } else {
                    ctx.write(b);
                }
            } else if (promise != null) {
                // Nothing was produced, but the promise still has to travel with a write.
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }

            if (result.getStatus() == Status.CLOSED) {
                // The engine is closed: fail every write that is still queued.
                if (!pendingUnencryptedWrites.isEmpty()) {
                    // Prefer the handshake failure, then the close failure, then a generic cause.
                    Throwable exception = handshakePromise.cause();
                    if (exception == null) {
                        exception = sslClosePromise.cause();
                        if (exception == null) {
                            exception = new SslClosedEngineException("SSLEngine closed already");
                        }
                    }
                    pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
                }

                return;
            } else {
                switch (result.getHandshakeStatus()) {
                    case NEED_TASK:
                        if (!runDelegatedTasks(inUnwrap)) {
                            // NOTE(review): a false return presumably means the tasks were
                            // handed off and processing resumes later; confirm in runDelegatedTasks.
                            break outer;
                        }
                        break;
                    case FINISHED:
                    case NOT_HANDSHAKING:
                        setHandshakeSuccess();
                        break;
                    case NEED_WRAP:
                        // The engine wants another wrap. If it produced data but the queue is
                        // empty, queue an empty buffer so the loop runs one more pass.
                        if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
                            pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
                        }
                        break;
                    case NEED_UNWRAP:
                        // Data from the peer is required before wrapping can continue.
                        readIfNeeded(ctx);
                        return;
                    default:
                        throw new IllegalStateException(
                                "Unknown handshake status: " + result.getHandshakeStatus());
                }
            }
        }
    } finally {
        // Release an output buffer that was allocated but never written.
        if (out != null) {
            out.release();
        }
        if (inUnwrap) {
            setState(STATE_NEEDS_FLUSH);
        }
    }
}
952
953
954
955
956
957
958
/**
 * Wraps with an empty source buffer so the engine can emit data it generates itself, and writes
 * that output to the pipeline.
 *
 * @param ctx      the context of this handler
 * @param inUnwrap {@code true} when called from the unwrap path
 * @return {@code true} only when the engine reported {@code NOT_HANDSHAKING}; {@code false} in
 *         every other case
 * @throws SSLException if the engine fails to wrap
 */
private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
    ByteBuf out = null;
    ByteBufAllocator alloc = ctx.alloc();
    try {
        // Stop as soon as the handler has been removed from the pipeline.
        outer: while (!ctx.isRemoved()) {
            if (out == null) {
                // Initial size only; wrap(...) grows the buffer when the engine reports BUFFER_OVERFLOW.
                out = allocateOutNetBuf(ctx, 2048, 1);
            }
            SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
            if (result.bytesProduced() > 0) {
                // A failed write of this output is reported as a transport-level handshake failure.
                ctx.write(out).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            setHandshakeFailureTransportFailure(ctx, cause);
                        }
                    }
                });
                if (inUnwrap) {
                    setState(STATE_NEEDS_FLUSH);
                }
                // The buffer was handed to the pipeline; a new one is allocated on the next pass.
                out = null;
            }

            HandshakeStatus status = result.getHandshakeStatus();
            switch (status) {
                case FINISHED:
                    // If this call completed the handshake while unwrapping, wrap the
                    // application data that was queued in the meantime.
                    if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
                        wrap(ctx, true);
                    }
                    return false;
                case NEED_TASK:
                    if (!runDelegatedTasks(inUnwrap)) {
                        // NOTE(review): a false return presumably means the tasks were
                        // handed off and processing resumes later; confirm in runDelegatedTasks.
                        break outer;
                    }
                    break;
                case NEED_UNWRAP:
                    if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
                        // Either the caller is already unwrapping, or the unwrap attempt
                        // returned a non-positive value; stop here in both cases.
                        return false;
                    }
                    break;
                case NEED_WRAP:
                    break;
                case NOT_HANDSHAKING:
                    if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
                        wrap(ctx, true);
                    }
                    // Outside of unwrap, give the engine a chance to process inbound data as well.
                    if (!inUnwrap) {
                        unwrapNonAppData(ctx);
                    }
                    return true;
                default:
                    throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
            }

            // No output and no pending task: another pass would not make progress.
            if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
                break;
            }

            // Nothing consumed and the engine is not handshaking: stop looping.
            if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
                break;
            }
        }
    } finally {
        // Release an output buffer that was allocated but never written.
        if (out != null) {
            out.release();
        }
    }
    return false;
}
1050
1051 private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1052 throws SSLException {
1053 SSLEngineResult result = null;
1054
1055 do {
1056 int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
1057
1058
1059 int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());
1060
1061 if (!out.isWritable(nextOutSize)) {
1062 if (result != null) {
1063
1064
1065 break;
1066 }
1067
1068
1069 out.ensureWritable(nextOutSize);
1070 }
1071
1072 ByteBuf wrapBuf = in.readSlice(nextSliceSize);
1073 result = wrap(alloc, engine, wrapBuf, out);
1074
1075 if (result.getStatus() == Status.CLOSED) {
1076
1077
1078 break;
1079 }
1080
1081 if (wrapBuf.isReadable()) {
1082
1083
1084 in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
1085 }
1086 } while (in.readableBytes() > 0);
1087
1088 return result;
1089 }
1090
1091 private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1092 throws SSLException {
1093 ByteBuf newDirectIn = null;
1094 try {
1095 int readerIndex = in.readerIndex();
1096 int readableBytes = in.readableBytes();
1097
1098
1099
1100 final ByteBuffer[] in0;
1101 if (in.isDirect() || !engineType.wantsDirectBuffer) {
1102
1103
1104
1105
1106 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1107 in0 = singleBuffer;
1108
1109
1110 in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1111 } else {
1112 in0 = in.nioBuffers();
1113 }
1114 } else {
1115
1116
1117
1118 newDirectIn = alloc.directBuffer(readableBytes);
1119 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1120 in0 = singleBuffer;
1121 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1122 }
1123
1124 for (;;) {
1125
1126
1127 ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
1128 SSLEngineResult result = engine.wrap(in0, out0);
1129 in.skipBytes(result.bytesConsumed());
1130 out.writerIndex(out.writerIndex() + result.bytesProduced());
1131
1132 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1133 out.ensureWritable(engine.getSession().getPacketBufferSize());
1134 } else {
1135 return result;
1136 }
1137 }
1138 } finally {
1139
1140 singleBuffer[0] = null;
1141
1142 if (newDirectIn != null) {
1143 newDirectIn.release();
1144 }
1145 }
1146 }
1147
1148 @Override
1149 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1150 boolean handshakeFailed = handshakePromise.cause() != null;
1151
1152
1153 ClosedChannelException exception = new ClosedChannelException();
1154
1155
1156 if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
1157 ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
1158 "Connection closed while SSL/TLS handshake was in progress",
1159 SslHandler.class, "channelInactive"));
1160 }
1161
1162
1163
1164 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1165 false);
1166
1167
1168 notifyClosePromise(exception);
1169
1170 try {
1171 super.channelInactive(ctx);
1172 } catch (DecoderException e) {
1173 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1174
1175
1176
1177
1178
1179 throw e;
1180 }
1181 }
1182 }
1183
1184 @Override
1185 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1186 if (ignoreException(cause)) {
1187
1188
1189 if (logger.isDebugEnabled()) {
1190 logger.debug(
1191 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1192 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1193 }
1194
1195
1196
1197 if (ctx.channel().isActive()) {
1198 ctx.close();
1199 }
1200 } else {
1201 ctx.fireExceptionCaught(cause);
1202 }
1203 }
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214 private boolean ignoreException(Throwable t) {
1215 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1216 String message = t.getMessage();
1217
1218
1219
1220 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1221 return true;
1222 }
1223
1224
1225 StackTraceElement[] elements = t.getStackTrace();
1226 for (StackTraceElement element: elements) {
1227 String classname = element.getClassName();
1228 String methodname = element.getMethodName();
1229
1230
1231 if (classname.startsWith("io.netty.")) {
1232 continue;
1233 }
1234
1235
1236 if (!"read".equals(methodname)) {
1237 continue;
1238 }
1239
1240
1241
1242 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1243 return true;
1244 }
1245
1246 try {
1247
1248
1249
1250 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1251
1252 if (SocketChannel.class.isAssignableFrom(clazz)
1253 || DatagramChannel.class.isAssignableFrom(clazz)) {
1254 return true;
1255 }
1256
1257
1258 if ("com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1259 return true;
1260 }
1261 } catch (Throwable cause) {
1262 if (logger.isDebugEnabled()) {
1263 logger.debug("Unexpected exception while loading class {} classname {}",
1264 getClass(), classname, cause);
1265 }
1266 }
1267 }
1268 }
1269
1270 return false;
1271 }
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285 public static boolean isEncrypted(ByteBuf buffer) {
1286 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1287 throw new IllegalArgumentException(
1288 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1289 }
1290 return getEncryptedPacketLength(buffer, buffer.readerIndex()) != SslUtils.NOT_ENCRYPTED;
1291 }
1292
1293 private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1294 int packetLength = this.packetLength;
1295
1296 if (packetLength > 0) {
1297 if (in.readableBytes() < packetLength) {
1298 return;
1299 }
1300 } else {
1301
1302 final int readableBytes = in.readableBytes();
1303 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1304 return;
1305 }
1306 packetLength = getEncryptedPacketLength(in, in.readerIndex());
1307 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1308
1309 NotSslRecordException e = new NotSslRecordException(
1310 "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1311 in.skipBytes(in.readableBytes());
1312
1313
1314
1315 setHandshakeFailure(ctx, e);
1316
1317 throw e;
1318 }
1319 if (packetLength == NOT_ENOUGH_DATA) {
1320 return;
1321 }
1322 assert packetLength > 0;
1323 if (packetLength > readableBytes) {
1324
1325 this.packetLength = packetLength;
1326 return;
1327 }
1328 }
1329
1330
1331
1332 this.packetLength = 0;
1333 try {
1334 final int bytesConsumed = unwrap(ctx, in, packetLength);
1335 assert bytesConsumed == packetLength || engine.isInboundDone() :
1336 "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1337 bytesConsumed;
1338 } catch (Throwable cause) {
1339 handleUnwrapThrowable(ctx, cause);
1340 }
1341 }
1342
/**
 * Decode path that passes all readable bytes to {@code unwrap} in one call, without first
 * splitting them into records. Failures are routed through {@code handleUnwrapThrowable}.
 */
private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
    try {
        unwrap(ctx, in, in.readableBytes());
    } catch (Throwable cause) {
        handleUnwrapThrowable(ctx, cause);
    }
}
1350
1351 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1352 try {
1353
1354
1355
1356
1357 if (handshakePromise.tryFailure(cause)) {
1358 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1359 }
1360
1361
1362 if (pendingUnencryptedWrites != null) {
1363
1364
1365 wrapAndFlush(ctx);
1366 }
1367 } catch (SSLException ex) {
1368 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1369 " because of an previous SSLException, ignoring...", ex);
1370 } finally {
1371
1372 setHandshakeFailure(ctx, cause, true, false, true);
1373 }
1374 PlatformDependent.throwException(cause);
1375 }
1376
1377 @Override
1378 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1379 if (isStateSet(STATE_PROCESS_TASK)) {
1380 return;
1381 }
1382 if (jdkCompatibilityMode) {
1383 decodeJdkCompatible(ctx, in);
1384 } else {
1385 decodeNonJdkCompatible(ctx, in);
1386 }
1387 }
1388
1389 @Override
1390 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1391 channelReadComplete0(ctx);
1392 }
1393
1394 private void channelReadComplete0(ChannelHandlerContext ctx) {
1395
1396 discardSomeReadBytes();
1397
1398 flushIfNeeded(ctx);
1399 readIfNeeded(ctx);
1400
1401 clearState(STATE_FIRE_CHANNEL_READ);
1402 ctx.fireChannelReadComplete();
1403 }
1404
1405 private void readIfNeeded(ChannelHandlerContext ctx) {
1406
1407 if (!ctx.channel().config().isAutoRead() &&
1408 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1409
1410
1411 ctx.read();
1412 }
1413 }
1414
1415 private void flushIfNeeded(ChannelHandlerContext ctx) {
1416 if (isStateSet(STATE_NEEDS_FLUSH)) {
1417 forceFlush(ctx);
1418 }
1419 }
1420
1421
1422
1423
1424 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1425 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1426 }
1427
1428
1429
1430
/**
 * Repeatedly hands up to {@code length} bytes of {@code packet} to the engine and reacts to each
 * result: produced bytes are forwarded as {@code channelRead}, handshake progress is recorded, and
 * delegated tasks / wraps are triggered as the engine requests them.
 *
 * @param ctx    the handler context
 * @param packet the buffer holding the bytes to unwrap; its reader index is advanced by the number
 *               of bytes the engine reports as consumed
 * @param length the number of bytes of {@code packet} that may be passed to the engine
 * @return the number of bytes consumed from {@code packet} ({@code originalLength - length})
 * @throws SSLException if the engine (or a nested wrap) fails
 */
private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
    final int originalLength = length;
    boolean wrapLater = false;
    boolean notifyClosure = false;
    // Set once a read was handed to the executor instead of being fired inline; decides below
    // whether the close notification is also routed through the executor.
    boolean executedRead = false;
    // Owned by this method until it is passed downstream (then set to null); released in finally.
    ByteBuf decodeOut = allocate(ctx, length);
    try {
        // The loop stops as soon as the handler has been removed from the pipeline.
        do {
            final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
            final Status status = result.getStatus();
            final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
            final int produced = result.bytesProduced();
            final int consumed = result.bytesConsumed();

            // Advance past what the engine consumed and shrink the remaining budget accordingly.
            packet.skipBytes(consumed);
            length -= consumed;

            if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
                // Mark the handshake as successful. When plaintext is already waiting in decodeOut the
                // re-entry-marking variant is used. A wrap is requested afterwards if the promise was
                // completed by this call or the engine reported FINISHED.
                wrapLater |= (decodeOut.isReadable() ?
                        setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
                        handshakeStatus == HandshakeStatus.FINISHED;
            }

            if (decodeOut.isReadable()) {
                setState(STATE_FIRE_CHANNEL_READ);
                if (isStateSet(STATE_UNWRAP_REENTRY)) {
                    // Re-entrant call: dispatch the read through the executor instead of firing inline.
                    executedRead = true;
                    executeChannelRead(ctx, decodeOut);
                } else {
                    ctx.fireChannelRead(decodeOut);
                }
                // Ownership moved downstream; must not be released here.
                decodeOut = null;
            }

            if (status == Status.CLOSED) {
                // Only recorded here; the actual notification happens in the finally block.
                notifyClosure = true;
            } else if (status == Status.BUFFER_OVERFLOW) {
                if (decodeOut != null) {
                    decodeOut.release();
                }
                final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
                // Allocate a replacement output buffer sized from the session's application buffer
                // size (minus what was already produced, when that is smaller) and try again.
                decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
                        applicationBufferSize : applicationBufferSize - produced));
                // In a do/while, continue jumps to the loop condition, so removal is still checked.
                continue;
            }

            if (handshakeStatus == HandshakeStatus.NEED_TASK) {
                // NOTE: despite the variable name, runDelegatedTasks(...) returns true when every task
                // ran to completion inline and false when work was handed off or is still outstanding.
                boolean pending = runDelegatedTasks(true);
                if (!pending) {
                    // Processing resumes elsewhere once the task completes; leave the loop (without
                    // returning, so the finally block still runs) and do not wrap now.
                    wrapLater = false;
                    break;
                }
            } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
                // Stop when wrapNonAppData(...) returns true and there is no input left.
                if (wrapNonAppData(ctx, true) && length == 0) {
                    break;
                }
            }

            if (status == Status.BUFFER_UNDERFLOW ||
                    // After NEED_TASK another iteration is attempted even if nothing was consumed/produced.
                    handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
                            (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
                if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
                    // The engine wants more input than is available; possibly trigger a read.
                    readIfNeeded(ctx);
                }

                break;
            } else if (decodeOut == null) {
                // The previous buffer was passed downstream; the next iteration needs a fresh one.
                decodeOut = allocate(ctx, length);
            }
        } while (!ctx.isRemoved());

        if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
            // A flush was recorded before the handshake completed; now that the promise is done,
            // clear the marker and force a wrap.
            clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
            wrapLater = true;
        }

        if (wrapLater) {
            wrap(ctx, true);
        }
    } finally {
        if (decodeOut != null) {
            decodeOut.release();
        }

        if (notifyClosure) {
            if (executedRead) {
                // A read was dispatched via the executor; route the close notification the same way.
                // NOTE(review): presumably so it is observed after that read - confirm.
                executeNotifyClosePromise(ctx);
            } else {
                notifyClosePromise(null);
            }
        }
    }
    return originalLength - length;
}
1555
1556 private boolean setHandshakeSuccessUnwrapMarkReentry() {
1557
1558
1559 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1560 if (setReentryState) {
1561 setState(STATE_UNWRAP_REENTRY);
1562 }
1563 try {
1564 return setHandshakeSuccess();
1565 } finally {
1566
1567
1568 if (setReentryState) {
1569 clearState(STATE_UNWRAP_REENTRY);
1570 }
1571 }
1572 }
1573
1574 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1575 try {
1576 ctx.executor().execute(new Runnable() {
1577 @Override
1578 public void run() {
1579 notifyClosePromise(null);
1580 }
1581 });
1582 } catch (RejectedExecutionException e) {
1583 notifyClosePromise(e);
1584 }
1585 }
1586
1587 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1588 try {
1589 ctx.executor().execute(new Runnable() {
1590 @Override
1591 public void run() {
1592 ctx.fireChannelRead(decodedOut);
1593 }
1594 });
1595 } catch (RejectedExecutionException e) {
1596 decodedOut.release();
1597 throw e;
1598 }
1599 }
1600
1601 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1602 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1603 out.nioBuffer(index, len);
1604 }
1605
1606 private static boolean inEventLoop(Executor executor) {
1607 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1608 }
1609
1610
1611
1612
1613
1614
1615
1616
1617 private boolean runDelegatedTasks(boolean inUnwrap) {
1618 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1619
1620
1621 for (;;) {
1622 Runnable task = engine.getDelegatedTask();
1623 if (task == null) {
1624 return true;
1625 }
1626 setState(STATE_PROCESS_TASK);
1627 if (task instanceof AsyncRunnable) {
1628
1629 boolean pending = false;
1630 try {
1631 AsyncRunnable asyncTask = (AsyncRunnable) task;
1632 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1633 asyncTask.run(completionHandler);
1634 pending = completionHandler.resumeLater();
1635 if (pending) {
1636 return false;
1637 }
1638 } finally {
1639 if (!pending) {
1640
1641
1642 clearState(STATE_PROCESS_TASK);
1643 }
1644 }
1645 } else {
1646 try {
1647 task.run();
1648 } finally {
1649 clearState(STATE_PROCESS_TASK);
1650 }
1651 }
1652 }
1653 } else {
1654 executeDelegatedTask(inUnwrap);
1655 return false;
1656 }
1657 }
1658
1659 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1660 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1661 }
1662
1663 private void executeDelegatedTask(boolean inUnwrap) {
1664 executeDelegatedTask(getTaskRunner(inUnwrap));
1665 }
1666
1667 private void executeDelegatedTask(SslTasksRunner task) {
1668 setState(STATE_PROCESS_TASK);
1669 try {
1670 delegatedTaskExecutor.execute(task);
1671 } catch (RejectedExecutionException e) {
1672 clearState(STATE_PROCESS_TASK);
1673 throw e;
1674 }
1675 }
1676
1677 private final class AsyncTaskCompletionHandler implements Runnable {
1678 private final boolean inUnwrap;
1679 boolean didRun;
1680 boolean resumeLater;
1681
1682 AsyncTaskCompletionHandler(boolean inUnwrap) {
1683 this.inUnwrap = inUnwrap;
1684 }
1685
1686 @Override
1687 public void run() {
1688 didRun = true;
1689 if (resumeLater) {
1690 getTaskRunner(inUnwrap).runComplete();
1691 }
1692 }
1693
1694 boolean resumeLater() {
1695 if (!didRun) {
1696 resumeLater = true;
1697 return true;
1698 }
1699 return false;
1700 }
1701 }
1702
1703
1704
1705
1706
/**
 * {@link Runnable} submitted to the delegated-task executor. It runs one delegated task of the
 * engine and afterwards schedules {@code resumeOnEventExecutor()} on the context's executor, which
 * continues the handshake according to the engine's current handshake status.
 */
private final class SslTasksRunner implements Runnable {
    // Whether this runner belongs to the unwrap path; selects error handling and follow-up work.
    private final boolean inUnwrap;
    // Completion callback passed to AsyncRunnable tasks; simply forwards to runComplete().
    private final Runnable runCompleteTask = new Runnable() {
        @Override
        public void run() {
            runComplete();
        }
    };

    SslTasksRunner(boolean inUnwrap) {
        this.inUnwrap = inUnwrap;
    }

    /**
     * Handles an error raised while resuming after a task.
     * On the unwrap path the error goes through {@code handleUnwrapThrowable(...)}, which rethrows it,
     * and the rethrown error is forwarded to {@code safeExceptionCaught(...)}. Otherwise the handshake
     * is failed and the channel flushed.
     */
    private void taskError(Throwable e) {
        if (inUnwrap) {
            try {
                handleUnwrapThrowable(ctx, e);
            } catch (Throwable cause) {
                safeExceptionCaught(cause);
            }
        } else {
            setHandshakeFailure(ctx, e);
            forceFlush(ctx);
        }
    }

    /**
     * Reports {@code cause} through the handler's own {@code exceptionCaught(...)}; if that itself
     * throws, the new error is fired into the pipeline instead.
     */
    private void safeExceptionCaught(Throwable cause) {
        try {
            exceptionCaught(ctx, wrapIfNeeded(cause));
        } catch (Throwable error) {
            ctx.fireExceptionCaught(error);
        }
    }

    /**
     * On the unwrap path, wraps {@code cause} in a {@link DecoderException} unless it already is one;
     * on the other path the cause is returned unchanged.
     */
    private Throwable wrapIfNeeded(Throwable cause) {
        if (!inUnwrap) {
            // Not on the unwrap path: pass the cause through as is.
            return cause;
        }
        return cause instanceof DecoderException ? cause : new DecoderException(cause);
    }

    /**
     * Re-runs {@code channelRead(...)} with an empty buffer and always follows it with
     * {@code channelReadComplete0(...)}, even when the read attempt failed.
     * NOTE(review): presumably this lets already-cumulated bytes be decoded now that the task is
     * done - confirm against the enclosing decoder.
     */
    private void tryDecodeAgain() {
        try {
            channelRead(ctx, Unpooled.EMPTY_BUFFER);
        } catch (Throwable cause) {
            safeExceptionCaught(cause);
        } finally {
            channelReadComplete0(ctx);
        }
    }

    /**
     * Continues processing on the event executor after a delegated task finished.
     * Clears {@code STATE_PROCESS_TASK} and then acts on the engine's current handshake status.
     * Any error not handled by a case is reported via {@code safeExceptionCaught(...)}.
     */
    private void resumeOnEventExecutor() {
        assert ctx.executor().inEventLoop();
        clearState(STATE_PROCESS_TASK);
        try {
            HandshakeStatus status = engine.getHandshakeStatus();
            switch (status) {
                // More tasks to run: submit this runner again.
                case NEED_TASK:
                    executeDelegatedTask(this);

                    break;

                // Handshake finished (FINISHED deliberately falls through to NOT_HANDSHAKING).
                case FINISHED:

                case NOT_HANDSHAKING:
                    setHandshakeSuccess();
                    try {
                        // Wrap whatever is pending now that the handshake promise was handled.
                        wrap(ctx, inUnwrap);
                    } catch (Throwable e) {
                        taskError(e);
                        return;
                    }
                    if (inUnwrap) {
                        // On the unwrap path, also run an unwrap without new input.
                        unwrapNonAppData(ctx);
                    }

                    // Flush what the wrap above may have written.
                    forceFlush(ctx);

                    tryDecodeAgain();
                    break;

                // The engine wants to read: unwrap without new input, then retry decoding.
                case NEED_UNWRAP:
                    try {
                        unwrapNonAppData(ctx);
                    } catch (SSLException e) {
                        // handleUnwrapThrowable(...) rethrows, so the outer catch below reports it.
                        handleUnwrapThrowable(ctx, e);
                        return;
                    }
                    tryDecodeAgain();
                    break;

                // The engine wants to write handshake data.
                case NEED_WRAP:
                    try {
                        if (!wrapNonAppData(ctx, false) && inUnwrap) {
                            // wrapNonAppData(...) returned false and this is the unwrap path:
                            // follow up with an unwrap that supplies no new input.
                            unwrapNonAppData(ctx);
                        }

                        // Flush what was wrapped.
                        forceFlush(ctx);
                    } catch (Throwable e) {
                        taskError(e);
                        return;
                    }

                    tryDecodeAgain();
                    break;

                default:
                    // No other handshake status is expected here.
                    throw new AssertionError();
            }
        } catch (Throwable cause) {
            safeExceptionCaught(cause);
        }
    }

    /**
     * Schedules {@code resumeOnEventExecutor()} on the context's executor. The task is always
     * submitted via {@code execute(...)}, never run directly, regardless of the calling thread.
     */
    void runComplete() {
        EventExecutor executor = ctx.executor();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                resumeOnEventExecutor();
            }
        });
    }

    /**
     * Fetches one delegated task from the engine and runs it. Asynchronous tasks receive
     * {@code runCompleteTask} as their completion callback; plain tasks are run and followed by
     * {@code runComplete()} directly. Errors go to {@code handleException(...)}.
     */
    @Override
    public void run() {
        try {
            Runnable task = engine.getDelegatedTask();
            if (task == null) {
                // Nothing to run (no task available any more); nothing is scheduled in this case.
                return;
            }
            if (task instanceof AsyncRunnable) {
                AsyncRunnable asyncTask = (AsyncRunnable) task;
                asyncTask.run(runCompleteTask);
            } else {
                task.run();
                runComplete();
            }
        } catch (final Throwable cause) {
            handleException(cause);
        }
    }

    /**
     * Clears {@code STATE_PROCESS_TASK} and reports {@code cause}, hopping onto the context's
     * executor first when called from another thread.
     */
    private void handleException(final Throwable cause) {
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
            clearState(STATE_PROCESS_TASK);
            safeExceptionCaught(cause);
        } else {
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        clearState(STATE_PROCESS_TASK);
                        safeExceptionCaught(cause);
                    }
                });
            } catch (RejectedExecutionException ignore) {
                clearState(STATE_PROCESS_TASK);
                // The executor refused the task, so the error is fired into the pipeline directly
                // from the current thread rather than being dropped.
                ctx.fireExceptionCaught(cause);
            }
        }
    }
}
1916
1917
1918
1919
1920
1921
1922 private boolean setHandshakeSuccess() {
1923
1924
1925
1926 final boolean notified;
1927 if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1928 if (logger.isDebugEnabled()) {
1929 SSLSession session = engine.getSession();
1930 logger.debug(
1931 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1932 ctx.channel(),
1933 session.getProtocol(),
1934 session.getCipherSuite());
1935 }
1936 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1937 }
1938 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1939 clearState(STATE_READ_DURING_HANDSHAKE);
1940 if (!ctx.channel().config().isAutoRead()) {
1941 ctx.read();
1942 }
1943 }
1944 return notified;
1945 }
1946
1947
1948
1949
1950 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1951 setHandshakeFailure(ctx, cause, true, true, false);
1952 }
1953
1954
1955
1956
1957 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1958 boolean notify, boolean alwaysFlushAndClose) {
1959 try {
1960
1961 setState(STATE_OUTBOUND_CLOSED);
1962 engine.closeOutbound();
1963
1964 if (closeInbound) {
1965 try {
1966 engine.closeInbound();
1967 } catch (SSLException e) {
1968 if (logger.isDebugEnabled()) {
1969
1970
1971
1972
1973 String msg = e.getMessage();
1974 if (msg == null || !(msg.contains("possible truncation attack") ||
1975 msg.contains("closing inbound before receiving peer's close_notify"))) {
1976 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1977 }
1978 }
1979 }
1980 }
1981 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1982 SslUtils.handleHandshakeFailure(ctx, cause, notify);
1983 }
1984 } finally {
1985
1986 releaseAndFailAll(ctx, cause);
1987 }
1988 }
1989
1990 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
1991
1992
1993
1994 try {
1995 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
1996 releaseAndFailAll(ctx, transportFailure);
1997 if (handshakePromise.tryFailure(transportFailure)) {
1998 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
1999 }
2000 } finally {
2001 ctx.close();
2002 }
2003 }
2004
2005 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2006 if (pendingUnencryptedWrites != null) {
2007 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2008 }
2009 }
2010
2011 private void notifyClosePromise(Throwable cause) {
2012 if (cause == null) {
2013 if (sslClosePromise.trySuccess(ctx.channel())) {
2014 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2015 }
2016 } else {
2017 if (sslClosePromise.tryFailure(cause)) {
2018 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2019 }
2020 }
2021 }
2022
2023 private void closeOutboundAndChannel(
2024 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2025 setState(STATE_OUTBOUND_CLOSED);
2026 engine.closeOutbound();
2027
2028 if (!ctx.channel().isActive()) {
2029 if (disconnect) {
2030 ctx.disconnect(promise);
2031 } else {
2032 ctx.close(promise);
2033 }
2034 return;
2035 }
2036
2037 ChannelPromise closeNotifyPromise = ctx.newPromise();
2038 try {
2039 flush(ctx, closeNotifyPromise);
2040 } finally {
2041 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2042 setState(STATE_CLOSE_NOTIFY);
2043
2044
2045
2046
2047
2048
2049
2050
2051 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2052 } else {
2053
2054 sslClosePromise.addListener(new FutureListener<Channel>() {
2055 @Override
2056 public void operationComplete(Future<Channel> future) {
2057 promise.setSuccess();
2058 }
2059 });
2060 }
2061 }
2062 }
2063
2064 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2065 if (pendingUnencryptedWrites != null) {
2066 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2067 } else {
2068 promise.setFailure(newPendingWritesNullException());
2069 }
2070 flush(ctx);
2071 }
2072
2073 @Override
2074 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2075 this.ctx = ctx;
2076 Channel channel = ctx.channel();
2077 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16, engineType.wantsDirectBuffer) {
2078 @Override
2079 protected int wrapDataSize() {
2080 return SslHandler.this.wrapDataSize;
2081 }
2082 };
2083
2084 setOpensslEngineSocketFd(channel);
2085 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2086 boolean active = channel.isActive();
2087 if (active || fastOpen) {
2088
2089
2090
2091 startHandshakeProcessing(active);
2092
2093
2094 final ChannelOutboundBuffer outboundBuffer;
2095 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2096 outboundBuffer.totalPendingWriteBytes() > 0)) {
2097 setState(STATE_NEEDS_FLUSH);
2098 }
2099 }
2100 }
2101
2102 private void startHandshakeProcessing(boolean flushAtEnd) {
2103 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2104 setState(STATE_HANDSHAKE_STARTED);
2105 if (engine.getUseClientMode()) {
2106
2107
2108
2109 handshake(flushAtEnd);
2110 }
2111 applyHandshakeTimeout();
2112 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2113 forceFlush(ctx);
2114 }
2115 }
2116
2117
2118
2119
2120 public Future<Channel> renegotiate() {
2121 ChannelHandlerContext ctx = this.ctx;
2122 if (ctx == null) {
2123 throw new IllegalStateException();
2124 }
2125
2126 return renegotiate(ctx.executor().<Channel>newPromise());
2127 }
2128
2129
2130
2131
2132 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2133 ObjectUtil.checkNotNull(promise, "promise");
2134
2135 ChannelHandlerContext ctx = this.ctx;
2136 if (ctx == null) {
2137 throw new IllegalStateException();
2138 }
2139
2140 EventExecutor executor = ctx.executor();
2141 if (!executor.inEventLoop()) {
2142 executor.execute(new Runnable() {
2143 @Override
2144 public void run() {
2145 renegotiateOnEventLoop(promise);
2146 }
2147 });
2148 return promise;
2149 }
2150
2151 renegotiateOnEventLoop(promise);
2152 return promise;
2153 }
2154
2155 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2156 final Promise<Channel> oldHandshakePromise = handshakePromise;
2157 if (!oldHandshakePromise.isDone()) {
2158
2159
2160 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2161 } else {
2162 handshakePromise = newHandshakePromise;
2163 handshake(true);
2164 applyHandshakeTimeout();
2165 }
2166 }
2167
2168
2169
2170
2171
2172
2173
2174 private void handshake(boolean flushAtEnd) {
2175 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2176
2177
2178 return;
2179 }
2180 if (handshakePromise.isDone()) {
2181
2182
2183
2184
2185
2186 return;
2187 }
2188
2189
2190 final ChannelHandlerContext ctx = this.ctx;
2191 try {
2192 engine.beginHandshake();
2193 wrapNonAppData(ctx, false);
2194 } catch (Throwable e) {
2195 setHandshakeFailure(ctx, e);
2196 } finally {
2197 if (flushAtEnd) {
2198 forceFlush(ctx);
2199 }
2200 }
2201 }
2202
2203 private void applyHandshakeTimeout() {
2204 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2205
2206
2207 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2208 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2209 return;
2210 }
2211
2212 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2213 @Override
2214 public void run() {
2215 if (localHandshakePromise.isDone()) {
2216 return;
2217 }
2218 SSLException exception =
2219 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2220 try {
2221 if (localHandshakePromise.tryFailure(exception)) {
2222 SslUtils.handleHandshakeFailure(ctx, exception, true);
2223 }
2224 } finally {
2225 releaseAndFailAll(ctx, exception);
2226 }
2227 }
2228 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2229
2230
2231 localHandshakePromise.addListener(new FutureListener<Channel>() {
2232 @Override
2233 public void operationComplete(Future<Channel> f) throws Exception {
2234 timeoutFuture.cancel(false);
2235 }
2236 });
2237 }
2238
2239 private void forceFlush(ChannelHandlerContext ctx) {
2240 clearState(STATE_NEEDS_FLUSH);
2241 ctx.flush();
2242 }
2243
2244 private void setOpensslEngineSocketFd(Channel c) {
2245 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2246 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2247 }
2248 }
2249
2250
2251
2252
2253 @Override
2254 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2255 setOpensslEngineSocketFd(ctx.channel());
2256 if (!startTls) {
2257 startHandshakeProcessing(true);
2258 }
2259 ctx.fireChannelActive();
2260 }
2261
2262 private void safeClose(
2263 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2264 final ChannelPromise promise) {
2265 if (!ctx.channel().isActive()) {
2266 ctx.close(promise);
2267 return;
2268 }
2269
2270 final Future<?> timeoutFuture;
2271 if (!flushFuture.isDone()) {
2272 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2273 if (closeNotifyTimeout > 0) {
2274
2275 timeoutFuture = ctx.executor().schedule(new Runnable() {
2276 @Override
2277 public void run() {
2278
2279 if (!flushFuture.isDone()) {
2280 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2281 ctx.channel());
2282 addCloseListener(ctx.close(ctx.newPromise()), promise);
2283 }
2284 }
2285 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2286 } else {
2287 timeoutFuture = null;
2288 }
2289 } else {
2290 timeoutFuture = null;
2291 }
2292
2293
2294 flushFuture.addListener(new ChannelFutureListener() {
2295 @Override
2296 public void operationComplete(ChannelFuture f) {
2297 if (timeoutFuture != null) {
2298 timeoutFuture.cancel(false);
2299 }
2300 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2301 if (closeNotifyReadTimeout <= 0) {
2302
2303
2304 addCloseListener(ctx.close(ctx.newPromise()), promise);
2305 } else {
2306 final Future<?> closeNotifyReadTimeoutFuture;
2307
2308 if (!sslClosePromise.isDone()) {
2309 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2310 @Override
2311 public void run() {
2312 if (!sslClosePromise.isDone()) {
2313 logger.debug(
2314 "{} did not receive close_notify in {}ms; force-closing the connection.",
2315 ctx.channel(), closeNotifyReadTimeout);
2316
2317
2318 addCloseListener(ctx.close(ctx.newPromise()), promise);
2319 }
2320 }
2321 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2322 } else {
2323 closeNotifyReadTimeoutFuture = null;
2324 }
2325
2326
2327 sslClosePromise.addListener(new FutureListener<Channel>() {
2328 @Override
2329 public void operationComplete(Future<Channel> future) throws Exception {
2330 if (closeNotifyReadTimeoutFuture != null) {
2331 closeNotifyReadTimeoutFuture.cancel(false);
2332 }
2333 addCloseListener(ctx.close(ctx.newPromise()), promise);
2334 }
2335 });
2336 }
2337 }
2338 });
2339 }
2340
2341 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2342
2343
2344
2345
2346
2347
2348 PromiseNotifier.cascade(false, future, promise);
2349 }
2350
2351
2352
2353
2354
2355 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2356 ByteBufAllocator alloc = ctx.alloc();
2357 if (engineType.wantsDirectBuffer) {
2358 return alloc.directBuffer(capacity);
2359 } else {
2360 return alloc.buffer(capacity);
2361 }
2362 }
2363
2364
2365
2366
2367
2368 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2369 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2370 }
2371
2372 private boolean isStateSet(int bit) {
2373 return (state & bit) == bit;
2374 }
2375
2376 private void setState(int bit) {
2377 state |= bit;
2378 }
2379
2380 private void clearState(int bit) {
2381 state &= ~bit;
2382 }
2383
2384 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2385
2386 @Override
2387 protected EventExecutor executor() {
2388 if (ctx == null) {
2389 throw new IllegalStateException();
2390 }
2391 return ctx.executor();
2392 }
2393
2394 @Override
2395 protected void checkDeadLock() {
2396 if (ctx == null) {
2397
2398
2399
2400
2401
2402
2403 return;
2404 }
2405 super.checkDeadLock();
2406 }
2407 }
2408 }