1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.ssl;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBuffers;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelDownstreamHandler;
22 import org.jboss.netty.channel.ChannelEvent;
23 import org.jboss.netty.channel.ChannelFuture;
24 import org.jboss.netty.channel.ChannelFutureListener;
25 import org.jboss.netty.channel.ChannelHandlerContext;
26 import org.jboss.netty.channel.ChannelPipeline;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.Channels;
29 import org.jboss.netty.channel.DefaultChannelFuture;
30 import org.jboss.netty.channel.DownstreamMessageEvent;
31 import org.jboss.netty.channel.ExceptionEvent;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.handler.codec.frame.FrameDecoder;
34 import org.jboss.netty.logging.InternalLogger;
35 import org.jboss.netty.logging.InternalLoggerFactory;
36 import org.jboss.netty.util.Timeout;
37 import org.jboss.netty.util.Timer;
38 import org.jboss.netty.util.TimerTask;
39 import org.jboss.netty.util.internal.DetectionUtil;
40 import org.jboss.netty.util.internal.NonReentrantLock;
41
42 import javax.net.ssl.SSLEngine;
43 import javax.net.ssl.SSLEngineResult;
44 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
45 import javax.net.ssl.SSLEngineResult.Status;
46 import javax.net.ssl.SSLException;
47 import java.io.IOException;
48 import java.nio.ByteBuffer;
49 import java.nio.channels.ClosedChannelException;
50 import java.nio.channels.DatagramChannel;
51 import java.nio.channels.SocketChannel;
52 import java.util.LinkedList;
53 import java.util.Queue;
54 import java.util.concurrent.ConcurrentLinkedQueue;
55 import java.util.concurrent.Executor;
56 import java.util.concurrent.TimeUnit;
57 import java.util.concurrent.atomic.AtomicBoolean;
58 import java.util.regex.Pattern;
59
60 import static org.jboss.netty.channel.Channels.*;
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178 public class SslHandler extends FrameDecoder
179 implements ChannelDownstreamHandler {
180
181 private static final InternalLogger logger =
182 InternalLoggerFactory.getInstance(SslHandler.class);
183
184 private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
185
186 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
187 "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
188 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
189 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
190
191 private static SslBufferPool defaultBufferPool;
192
193
194
195
196
197 public static synchronized SslBufferPool getDefaultBufferPool() {
198 if (defaultBufferPool == null) {
199 defaultBufferPool = new SslBufferPool();
200 }
201 return defaultBufferPool;
202 }
203
204 private volatile ChannelHandlerContext ctx;
205 private final SSLEngine engine;
206 private final SslBufferPool bufferPool;
207 private final Executor delegatedTaskExecutor;
208 private final boolean startTls;
209
210 private volatile boolean enableRenegotiation = true;
211
212 final Object handshakeLock = new Object();
213 private boolean handshaking;
214 private volatile boolean handshaken;
215 private volatile ChannelFuture handshakeFuture;
216
217 private final AtomicBoolean sentFirstMessage = new AtomicBoolean();
218 private final AtomicBoolean sentCloseNotify = new AtomicBoolean();
219 int ignoreClosedChannelException;
220 final Object ignoreClosedChannelExceptionLock = new Object();
221 private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
222 private final NonReentrantLock pendingUnencryptedWritesLock = new NonReentrantLock();
223 private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
224 private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();
225
226 private volatile boolean issueHandshake;
227
228 private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();
229
230 private boolean closeOnSSLException;
231
232 private int packetLength = Integer.MIN_VALUE;
233
234 private final Timer timer;
235 private final long handshakeTimeoutInMillis;
236 private Timeout handshakeTimeout;
237
238
239
240
241
242
243 public SslHandler(SSLEngine engine) {
244 this(engine, getDefaultBufferPool(), ImmediateExecutor.INSTANCE);
245 }
246
247
248
249
250
251
252
253
254 public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
255 this(engine, bufferPool, ImmediateExecutor.INSTANCE);
256 }
257
258
259
260
261
262
263
264
265 public SslHandler(SSLEngine engine, boolean startTls) {
266 this(engine, getDefaultBufferPool(), startTls);
267 }
268
269
270
271
272
273
274
275
276
277
278 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
279 this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE);
280 }
281
282
283
284
285
286
287
288
289
290
291 public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
292 this(engine, getDefaultBufferPool(), delegatedTaskExecutor);
293 }
294
295
296
297
298
299
300
301
302
303
304
305
306
307 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
308 this(engine, bufferPool, false, delegatedTaskExecutor);
309 }
310
311
312
313
314
315
316
317
318
319
320
321
322
323 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
324 this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
325 }
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
343 this(engine, bufferPool, startTls, delegatedTaskExecutor, null, 0);
344 }
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor,
369 Timer timer, long handshakeTimeoutInMillis) {
370 if (engine == null) {
371 throw new NullPointerException("engine");
372 }
373 if (bufferPool == null) {
374 throw new NullPointerException("bufferPool");
375 }
376 if (delegatedTaskExecutor == null) {
377 throw new NullPointerException("delegatedTaskExecutor");
378 }
379 if (timer == null && handshakeTimeoutInMillis > 0) {
380 throw new IllegalArgumentException("No Timer was given but a handshakeTimeoutInMillis, need both or none");
381 }
382
383 this.engine = engine;
384 this.bufferPool = bufferPool;
385 this.delegatedTaskExecutor = delegatedTaskExecutor;
386 this.startTls = startTls;
387 this.timer = timer;
388 this.handshakeTimeoutInMillis = handshakeTimeoutInMillis;
389 }
390
391
392
393
394 public SSLEngine getEngine() {
395 return engine;
396 }
397
398
399
400
401
402
403
404 public ChannelFuture handshake() {
405 synchronized (handshakeLock) {
406 if (handshaken && !isEnableRenegotiation()) {
407 throw new IllegalStateException("renegotiation disabled");
408 }
409
410 final ChannelHandlerContext ctx = this.ctx;
411 final Channel channel = ctx.getChannel();
412 ChannelFuture handshakeFuture;
413 Exception exception = null;
414
415 if (handshaking) {
416 return this.handshakeFuture;
417 }
418
419 handshaking = true;
420 try {
421 engine.beginHandshake();
422 runDelegatedTasks();
423 handshakeFuture = this.handshakeFuture = future(channel);
424 if (handshakeTimeoutInMillis > 0) {
425 handshakeTimeout = timer.newTimeout(new TimerTask() {
426 public void run(Timeout timeout) throws Exception {
427 ChannelFuture future = SslHandler.this.handshakeFuture;
428 if (future != null && future.isDone()) {
429 return;
430 }
431
432 setHandshakeFailure(channel, new SSLException("Handshake did not complete within " +
433 handshakeTimeoutInMillis + "ms"));
434 }
435 }, handshakeTimeoutInMillis, TimeUnit.MILLISECONDS);
436 }
437 } catch (Exception e) {
438 handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
439 exception = e;
440 }
441
442 if (exception == null) {
443 try {
444 final ChannelFuture hsFuture = handshakeFuture;
445 wrapNonAppData(ctx, channel).addListener(new ChannelFutureListener() {
446 public void operationComplete(ChannelFuture future) throws Exception {
447 if (!future.isSuccess()) {
448 Throwable cause = future.getCause();
449 hsFuture.setFailure(cause);
450
451 fireExceptionCaught(ctx, cause);
452 if (closeOnSSLException) {
453 Channels.close(ctx, future(channel));
454 }
455 }
456 }
457 });
458 } catch (SSLException e) {
459 handshakeFuture.setFailure(e);
460
461 fireExceptionCaught(ctx, e);
462 if (closeOnSSLException) {
463 Channels.close(ctx, future(channel));
464 }
465 }
466 } else {
467 fireExceptionCaught(ctx, exception);
468 if (closeOnSSLException) {
469 Channels.close(ctx, future(channel));
470 }
471 }
472 return handshakeFuture;
473 }
474 }
475
476
477
478
479 @Deprecated
480 public ChannelFuture handshake(@SuppressWarnings("unused") Channel channel) {
481 return handshake();
482 }
483
484
485
486
487
488 public ChannelFuture close() {
489 ChannelHandlerContext ctx = this.ctx;
490 Channel channel = ctx.getChannel();
491 try {
492 engine.closeOutbound();
493 return wrapNonAppData(ctx, channel);
494 } catch (SSLException e) {
495 fireExceptionCaught(ctx, e);
496 if (closeOnSSLException) {
497 Channels.close(ctx, future(channel));
498 }
499 return failedFuture(channel, e);
500 }
501 }
502
503
504
505
506 @Deprecated
507 public ChannelFuture close(@SuppressWarnings("unused") Channel channel) {
508 return close();
509 }
510
511
512
513
514 public boolean isEnableRenegotiation() {
515 return enableRenegotiation;
516 }
517
518
519
520
521 public void setEnableRenegotiation(boolean enableRenegotiation) {
522 this.enableRenegotiation = enableRenegotiation;
523 }
524
525
526
527
528
529
530 public void setIssueHandshake(boolean issueHandshake) {
531 this.issueHandshake = issueHandshake;
532 }
533
534
535
536
537 public boolean isIssueHandshake() {
538 return issueHandshake;
539 }
540
541
542
543
544
545
546
547
548
549 public ChannelFuture getSSLEngineInboundCloseFuture() {
550 return sslEngineCloseFuture;
551 }
552
553
554
555
556
557 public long getHandshakeTimeout() {
558 return handshakeTimeoutInMillis;
559 }
560
561
562
563
564
565
566
567
568
569
570 public void setCloseOnSSLException(boolean closeOnSslException) {
571 if (ctx != null) {
572 throw new IllegalStateException("Can only get changed before attached to ChannelPipeline");
573 }
574 closeOnSSLException = closeOnSslException;
575 }
576
577 public boolean getCloseOnSSLException() {
578 return closeOnSSLException;
579 }
580
581 public void handleDownstream(
582 final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
583 if (evt instanceof ChannelStateEvent) {
584 ChannelStateEvent e = (ChannelStateEvent) evt;
585 switch (e.getState()) {
586 case OPEN:
587 case CONNECTED:
588 case BOUND:
589 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
590 closeOutboundAndChannel(context, e);
591 return;
592 }
593 }
594 }
595 if (!(evt instanceof MessageEvent)) {
596 context.sendDownstream(evt);
597 return;
598 }
599
600 MessageEvent e = (MessageEvent) evt;
601 if (!(e.getMessage() instanceof ChannelBuffer)) {
602 context.sendDownstream(evt);
603 return;
604 }
605
606
607
608 if (startTls && sentFirstMessage.compareAndSet(false, true)) {
609 context.sendDownstream(evt);
610 return;
611 }
612
613
614 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
615 PendingWrite pendingWrite;
616
617 if (msg.readable()) {
618 pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
619 } else {
620 pendingWrite = new PendingWrite(evt.getFuture(), null);
621 }
622
623 pendingUnencryptedWritesLock.lock();
624 try {
625 pendingUnencryptedWrites.add(pendingWrite);
626 } finally {
627 pendingUnencryptedWritesLock.unlock();
628 }
629
630 wrap(context, evt.getChannel());
631 }
632
633 private void cancelHandshakeTimeout() {
634 if (handshakeTimeout != null) {
635
636 handshakeTimeout.cancel();
637 }
638 }
639
640 @Override
641 public void channelDisconnected(ChannelHandlerContext ctx,
642 ChannelStateEvent e) throws Exception {
643
644
645
646 synchronized (handshakeLock) {
647 if (handshaking) {
648 cancelHandshakeTimeout();
649 handshakeFuture.setFailure(new ClosedChannelException());
650 }
651 }
652
653 try {
654 super.channelDisconnected(ctx, e);
655 } finally {
656 unwrap(ctx, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
657 engine.closeOutbound();
658 if (!sentCloseNotify.get() && handshaken) {
659 try {
660 engine.closeInbound();
661 } catch (SSLException ex) {
662 if (logger.isDebugEnabled()) {
663 logger.debug("Failed to clean up SSLEngine.", ex);
664 }
665 }
666 }
667 }
668 }
669 @Override
670 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
671 throws Exception {
672
673 Throwable cause = e.getCause();
674 if (cause instanceof IOException) {
675 if (cause instanceof ClosedChannelException) {
676 synchronized (ignoreClosedChannelExceptionLock) {
677 if (ignoreClosedChannelException > 0) {
678 ignoreClosedChannelException --;
679 if (logger.isDebugEnabled()) {
680 logger.debug(
681 "Swallowing an exception raised while " +
682 "writing non-app data", cause);
683 }
684
685 return;
686 }
687 }
688 } else {
689 if (ignoreException(cause)) {
690 return;
691 }
692 }
693 }
694
695 ctx.sendUpstream(e);
696 }
697
698
699
700
701
702
703
704
705
706
707 private boolean ignoreException(Throwable t) {
708 if (!(t instanceof SSLException) && t instanceof IOException && engine.isOutboundDone()) {
709 String message = String.valueOf(t.getMessage()).toLowerCase();
710
711
712
713 if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
714 return true;
715 }
716
717
718 StackTraceElement[] elements = t.getStackTrace();
719 for (StackTraceElement element: elements) {
720 String classname = element.getClassName();
721 String methodname = element.getMethodName();
722
723
724 if (classname.startsWith("org.jboss.netty.")) {
725 continue;
726 }
727
728
729 if (!"read".equals(methodname)) {
730 continue;
731 }
732
733
734
735 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
736 return true;
737 }
738
739 try {
740
741
742
743 Class<?> clazz = getClass().getClassLoader().loadClass(classname);
744
745 if (SocketChannel.class.isAssignableFrom(clazz)
746 || DatagramChannel.class.isAssignableFrom(clazz)) {
747 return true;
748 }
749
750
751 if (DetectionUtil.javaVersion() >= 7
752 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
753 return true;
754 }
755 } catch (ClassNotFoundException e) {
756
757 }
758 }
759 }
760
761 return false;
762 }
763
764
765
766
767
768
769
770
771
772
773
774
775
776 public static boolean isEncrypted(ChannelBuffer buffer) {
777 return getEncryptedPacketLength(buffer) != -1;
778 }
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793 private static int getEncryptedPacketLength(ChannelBuffer buffer) {
794 if (buffer.readableBytes() < 5) {
795 throw new IllegalArgumentException("buffer must have at least 5 readable bytes");
796 }
797
798 int packetLength = 0;
799
800
801 boolean tls;
802 switch (buffer.getUnsignedByte(buffer.readerIndex())) {
803 case 20:
804 case 21:
805 case 22:
806 case 23:
807 tls = true;
808 break;
809 default:
810
811 tls = false;
812 }
813
814 if (tls) {
815
816 int majorVersion = buffer.getUnsignedByte(buffer.readerIndex() + 1);
817 if (majorVersion == 3) {
818
819 packetLength = (getShort(buffer, buffer.readerIndex() + 3) & 0xFFFF) + 5;
820 if (packetLength <= 5) {
821
822 tls = false;
823 }
824 } else {
825
826 tls = false;
827 }
828 }
829
830 if (!tls) {
831
832 boolean sslv2 = true;
833 int headerLength = (buffer.getUnsignedByte(
834 buffer.readerIndex()) & 0x80) != 0 ? 2 : 3;
835 int majorVersion = buffer.getUnsignedByte(
836 buffer.readerIndex() + headerLength + 1);
837 if (majorVersion == 2 || majorVersion == 3) {
838
839 if (headerLength == 2) {
840 packetLength = (getShort(buffer, buffer.readerIndex()) & 0x7FFF) + 2;
841 } else {
842 packetLength = (getShort(buffer, buffer.readerIndex()) & 0x3FFF) + 3;
843 }
844 if (packetLength <= headerLength) {
845 sslv2 = false;
846 }
847 } else {
848 sslv2 = false;
849 }
850
851 if (!sslv2) {
852 return -1;
853 }
854 }
855 return packetLength;
856 }
857
858 @Override
859 protected Object decode(
860 final ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
861
862
863 if (packetLength == Integer.MIN_VALUE) {
864 if (buffer.readableBytes() < 5) {
865 return null;
866 }
867 int packetLength = getEncryptedPacketLength(buffer);
868
869 if (packetLength == -1) {
870
871 NotSslRecordException e = new NotSslRecordException(
872 "not an SSL/TLS record: " + ChannelBuffers.hexDump(buffer));
873 buffer.skipBytes(buffer.readableBytes());
874
875 if (closeOnSSLException) {
876
877 fireExceptionCaught(ctx, e);
878 Channels.close(ctx, future(channel));
879
880
881
882 return null;
883 } else {
884 throw e;
885 }
886 }
887
888 assert packetLength > 0;
889 this.packetLength = packetLength;
890 }
891
892 if (buffer.readableBytes() < packetLength) {
893 return null;
894 }
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910 final int packetOffset = buffer.readerIndex();
911 buffer.skipBytes(packetLength);
912 try {
913 return unwrap(ctx, channel, buffer, packetOffset, packetLength);
914 } finally {
915
916 packetLength = Integer.MIN_VALUE;
917 }
918 }
919
920
921
922
923
924 private static short getShort(ChannelBuffer buf, int offset) {
925 return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
926 }
927
928 private void wrap(ChannelHandlerContext context, Channel channel)
929 throws SSLException {
930
931 ChannelBuffer msg;
932 ByteBuffer outNetBuf = bufferPool.acquireBuffer();
933 boolean success = true;
934 boolean offered = false;
935 boolean needsUnwrap = false;
936 PendingWrite pendingWrite = null;
937
938 try {
939 loop:
940 for (;;) {
941
942
943
944 pendingUnencryptedWritesLock.lock();
945 try {
946 pendingWrite = pendingUnencryptedWrites.peek();
947 if (pendingWrite == null) {
948 break;
949 }
950
951 ByteBuffer outAppBuf = pendingWrite.outAppBuf;
952 if (outAppBuf == null) {
953
954 pendingUnencryptedWrites.remove();
955 offerEncryptedWriteRequest(
956 new DownstreamMessageEvent(
957 channel, pendingWrite.future,
958 ChannelBuffers.EMPTY_BUFFER,
959 channel.getRemoteAddress()));
960 offered = true;
961 } else {
962 synchronized (handshakeLock) {
963 SSLEngineResult result = null;
964 try {
965 result = engine.wrap(outAppBuf, outNetBuf);
966 } finally {
967 if (!outAppBuf.hasRemaining()) {
968 pendingUnencryptedWrites.remove();
969 }
970 }
971
972 if (result.bytesProduced() > 0) {
973 outNetBuf.flip();
974 int remaining = outNetBuf.remaining();
975 msg = ctx.getChannel().getConfig().getBufferFactory().getBuffer(remaining);
976
977
978
979
980
981 msg.writeBytes(outNetBuf);
982 outNetBuf.clear();
983
984 ChannelFuture future;
985 if (pendingWrite.outAppBuf.hasRemaining()) {
986
987
988 future = succeededFuture(channel);
989 } else {
990 future = pendingWrite.future;
991 }
992
993 MessageEvent encryptedWrite = new DownstreamMessageEvent(
994 channel, future, msg, channel.getRemoteAddress());
995 offerEncryptedWriteRequest(encryptedWrite);
996 offered = true;
997 } else if (result.getStatus() == Status.CLOSED) {
998
999
1000 success = false;
1001 break;
1002 } else {
1003 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1004 handleRenegotiation(handshakeStatus);
1005 switch (handshakeStatus) {
1006 case NEED_WRAP:
1007 if (outAppBuf.hasRemaining()) {
1008 break;
1009 } else {
1010 break loop;
1011 }
1012 case NEED_UNWRAP:
1013 needsUnwrap = true;
1014 break loop;
1015 case NEED_TASK:
1016 runDelegatedTasks();
1017 break;
1018 case FINISHED:
1019 case NOT_HANDSHAKING:
1020 if (handshakeStatus == HandshakeStatus.FINISHED) {
1021 setHandshakeSuccess(channel);
1022 }
1023 if (result.getStatus() == Status.CLOSED) {
1024 success = false;
1025 }
1026 break loop;
1027 default:
1028 throw new IllegalStateException(
1029 "Unknown handshake status: " +
1030 handshakeStatus);
1031 }
1032 }
1033 }
1034 }
1035 } finally {
1036 pendingUnencryptedWritesLock.unlock();
1037 }
1038 }
1039 } catch (SSLException e) {
1040 success = false;
1041 setHandshakeFailure(channel, e);
1042 throw e;
1043 } finally {
1044 bufferPool.releaseBuffer(outNetBuf);
1045
1046 if (offered) {
1047 flushPendingEncryptedWrites(context);
1048 }
1049
1050 if (!success) {
1051 IllegalStateException cause =
1052 new IllegalStateException("SSLEngine already closed");
1053
1054
1055
1056 if (pendingWrite != null) {
1057 pendingWrite.future.setFailure(cause);
1058 }
1059
1060
1061
1062
1063
1064 for (;;) {
1065 pendingUnencryptedWritesLock.lock();
1066 try {
1067 pendingWrite = pendingUnencryptedWrites.poll();
1068 if (pendingWrite == null) {
1069 break;
1070 }
1071 } finally {
1072 pendingUnencryptedWritesLock.unlock();
1073 }
1074
1075 pendingWrite.future.setFailure(cause);
1076 }
1077 }
1078 }
1079
1080 if (needsUnwrap) {
1081 unwrap(context, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
1082 }
1083 }
1084
1085 private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
1086 final boolean locked = pendingEncryptedWritesLock.tryLock();
1087 try {
1088 pendingEncryptedWrites.add(encryptedWrite);
1089 } finally {
1090 if (locked) {
1091 pendingEncryptedWritesLock.unlock();
1092 }
1093 }
1094 }
1095
1096 private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
1097 while (!pendingEncryptedWrites.isEmpty()) {
1098
1099
1100
1101 if (!pendingEncryptedWritesLock.tryLock()) {
1102 return;
1103 }
1104
1105 try {
1106 MessageEvent e;
1107 while ((e = pendingEncryptedWrites.poll()) != null) {
1108 ctx.sendDownstream(e);
1109 }
1110 } finally {
1111 pendingEncryptedWritesLock.unlock();
1112 }
1113
1114
1115 }
1116 }
1117
1118 private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
1119 ChannelFuture future = null;
1120 ByteBuffer outNetBuf = bufferPool.acquireBuffer();
1121
1122 SSLEngineResult result;
1123 try {
1124 for (;;) {
1125 synchronized (handshakeLock) {
1126 result = engine.wrap(EMPTY_BUFFER, outNetBuf);
1127 }
1128
1129 if (result.bytesProduced() > 0) {
1130 outNetBuf.flip();
1131 ChannelBuffer msg =
1132 ctx.getChannel().getConfig().getBufferFactory().getBuffer(outNetBuf.remaining());
1133
1134
1135
1136
1137
1138 msg.writeBytes(outNetBuf);
1139 outNetBuf.clear();
1140
1141 future = future(channel);
1142 future.addListener(new ChannelFutureListener() {
1143 public void operationComplete(ChannelFuture future)
1144 throws Exception {
1145 if (future.getCause() instanceof ClosedChannelException) {
1146 synchronized (ignoreClosedChannelExceptionLock) {
1147 ignoreClosedChannelException ++;
1148 }
1149 }
1150 }
1151 });
1152
1153 write(ctx, future, msg);
1154 }
1155
1156 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1157 handleRenegotiation(handshakeStatus);
1158 switch (handshakeStatus) {
1159 case FINISHED:
1160 setHandshakeSuccess(channel);
1161 runDelegatedTasks();
1162 break;
1163 case NEED_TASK:
1164 runDelegatedTasks();
1165 break;
1166 case NEED_UNWRAP:
1167 if (!Thread.holdsLock(handshakeLock)) {
1168
1169
1170
1171 unwrap(ctx, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
1172 }
1173 break;
1174 case NOT_HANDSHAKING:
1175 case NEED_WRAP:
1176 break;
1177 default:
1178 throw new IllegalStateException(
1179 "Unexpected handshake status: " + handshakeStatus);
1180 }
1181
1182 if (result.bytesProduced() == 0) {
1183 break;
1184 }
1185 }
1186 } catch (SSLException e) {
1187 setHandshakeFailure(channel, e);
1188 throw e;
1189 } finally {
1190 bufferPool.releaseBuffer(outNetBuf);
1191 }
1192
1193 if (future == null) {
1194 future = succeededFuture(channel);
1195 }
1196
1197 return future;
1198 }
1199
1200 private ChannelBuffer unwrap(
1201 ChannelHandlerContext ctx, Channel channel,
1202 ChannelBuffer buffer, int offset, int length) throws SSLException {
1203 ByteBuffer inNetBuf = buffer.toByteBuffer(offset, length);
1204 ByteBuffer outAppBuf = bufferPool.acquireBuffer();
1205
1206 try {
1207 boolean needsWrap = false;
1208 loop:
1209 for (;;) {
1210 SSLEngineResult result;
1211 boolean needsHandshake = false;
1212 synchronized (handshakeLock) {
1213 if (!handshaken && !handshaking &&
1214 !engine.getUseClientMode() &&
1215 !engine.isInboundDone() && !engine.isOutboundDone()) {
1216 needsHandshake = true;
1217 }
1218 }
1219
1220 if (needsHandshake) {
1221 handshake();
1222 }
1223
1224 synchronized (handshakeLock) {
1225 result = engine.unwrap(inNetBuf, outAppBuf);
1226
1227 switch (result.getStatus()) {
1228 case CLOSED:
1229
1230 sslEngineCloseFuture.setClosed();
1231 break;
1232 case BUFFER_OVERFLOW:
1233 throw new SSLException("SSLEngine.unwrap() reported an impossible buffer overflow.");
1234 }
1235
1236 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1237 handleRenegotiation(handshakeStatus);
1238 switch (handshakeStatus) {
1239 case NEED_UNWRAP:
1240 if (inNetBuf.hasRemaining() && !engine.isInboundDone()) {
1241 break;
1242 } else {
1243 break loop;
1244 }
1245 case NEED_WRAP:
1246 wrapNonAppData(ctx, channel);
1247 break;
1248 case NEED_TASK:
1249 runDelegatedTasks();
1250 break;
1251 case FINISHED:
1252 setHandshakeSuccess(channel);
1253 needsWrap = true;
1254 break loop;
1255 case NOT_HANDSHAKING:
1256 needsWrap = true;
1257 break loop;
1258 default:
1259 throw new IllegalStateException(
1260 "Unknown handshake status: " + handshakeStatus);
1261 }
1262 }
1263 }
1264 if (needsWrap) {
1265
1266
1267
1268
1269
1270
1271
1272
1273 if (!Thread.holdsLock(handshakeLock) &&
1274 !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
1275 wrap(ctx, channel);
1276 }
1277 }
1278 outAppBuf.flip();
1279
1280 if (outAppBuf.hasRemaining()) {
1281 ChannelBuffer frame = ctx.getChannel().getConfig().getBufferFactory().getBuffer(outAppBuf.remaining());
1282
1283
1284
1285
1286 frame.writeBytes(outAppBuf);
1287
1288 return frame;
1289 } else {
1290 return null;
1291 }
1292 } catch (SSLException e) {
1293 setHandshakeFailure(channel, e);
1294 throw e;
1295 } finally {
1296 bufferPool.releaseBuffer(outAppBuf);
1297 }
1298 }
1299
1300 private void handleRenegotiation(HandshakeStatus handshakeStatus) {
1301 synchronized (handshakeLock) {
1302 if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
1303 handshakeStatus == HandshakeStatus.FINISHED) {
1304
1305 return;
1306 }
1307
1308 if (!handshaken) {
1309
1310 return;
1311 }
1312
1313 final boolean renegotiate;
1314 if (handshaking) {
1315
1316
1317 return;
1318 }
1319
1320 if (engine.isInboundDone() || engine.isOutboundDone()) {
1321
1322 return;
1323 }
1324
1325 if (isEnableRenegotiation()) {
1326
1327 renegotiate = true;
1328 } else {
1329
1330 renegotiate = false;
1331
1332 handshaking = true;
1333 }
1334
1335 if (renegotiate) {
1336
1337 handshake();
1338 } else {
1339
1340 fireExceptionCaught(
1341 ctx, new SSLException(
1342 "renegotiation attempted by peer; " +
1343 "closing the connection"));
1344
1345
1346 Channels.close(ctx, succeededFuture(ctx.getChannel()));
1347 }
1348 }
1349 }
1350
1351 private void runDelegatedTasks() {
1352 for (;;) {
1353 final Runnable task;
1354 synchronized (handshakeLock) {
1355 task = engine.getDelegatedTask();
1356 }
1357
1358 if (task == null) {
1359 break;
1360 }
1361
1362 delegatedTaskExecutor.execute(new Runnable() {
1363 public void run() {
1364 synchronized (handshakeLock) {
1365 task.run();
1366 }
1367 }
1368 });
1369 }
1370 }
1371
1372 private void setHandshakeSuccess(Channel channel) {
1373 synchronized (handshakeLock) {
1374 handshaking = false;
1375 handshaken = true;
1376
1377 if (handshakeFuture == null) {
1378 handshakeFuture = future(channel);
1379 }
1380 cancelHandshakeTimeout();
1381 }
1382
1383 handshakeFuture.setSuccess();
1384 }
1385
1386 private void setHandshakeFailure(Channel channel, SSLException cause) {
1387 synchronized (handshakeLock) {
1388 if (!handshaking) {
1389 return;
1390 }
1391 handshaking = false;
1392 handshaken = false;
1393
1394 if (handshakeFuture == null) {
1395 handshakeFuture = future(channel);
1396 }
1397
1398
1399 cancelHandshakeTimeout();
1400
1401
1402
1403
1404 engine.closeOutbound();
1405
1406 try {
1407 engine.closeInbound();
1408 } catch (SSLException e) {
1409 if (logger.isDebugEnabled()) {
1410 logger.debug(
1411 "SSLEngine.closeInbound() raised an exception after " +
1412 "a handshake failure.", e);
1413 }
1414 }
1415 }
1416
1417 handshakeFuture.setFailure(cause);
1418 if (closeOnSSLException) {
1419 Channels.close(ctx, future(channel));
1420 }
1421 }
1422
1423 private void closeOutboundAndChannel(
1424 final ChannelHandlerContext context, final ChannelStateEvent e) {
1425 if (!e.getChannel().isConnected()) {
1426 context.sendDownstream(e);
1427 return;
1428 }
1429
1430 boolean passthrough = true;
1431 try {
1432 try {
1433 unwrap(context, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
1434 } catch (SSLException ex) {
1435 if (logger.isDebugEnabled()) {
1436 logger.debug("Failed to unwrap before sending a close_notify message", ex);
1437 }
1438 }
1439
1440 if (!engine.isOutboundDone()) {
1441 if (sentCloseNotify.compareAndSet(false, true)) {
1442 engine.closeOutbound();
1443 try {
1444 ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
1445 closeNotifyFuture.addListener(
1446 new ClosingChannelFutureListener(context, e));
1447 passthrough = false;
1448 } catch (SSLException ex) {
1449 if (logger.isDebugEnabled()) {
1450 logger.debug("Failed to encode a close_notify message", ex);
1451 }
1452 }
1453 }
1454 }
1455 } finally {
1456 if (passthrough) {
1457 context.sendDownstream(e);
1458 }
1459 }
1460 }
1461
1462 private static final class PendingWrite {
1463 final ChannelFuture future;
1464 final ByteBuffer outAppBuf;
1465
1466 PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1467 this.future = future;
1468 this.outAppBuf = outAppBuf;
1469 }
1470 }
1471
1472 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1473
1474 private final ChannelHandlerContext context;
1475 private final ChannelStateEvent e;
1476
1477 ClosingChannelFutureListener(
1478 ChannelHandlerContext context, ChannelStateEvent e) {
1479 this.context = context;
1480 this.e = e;
1481 }
1482
1483 public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1484 if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1485 Channels.close(context, e.getFuture());
1486 } else {
1487 e.getFuture().setSuccess();
1488 }
1489 }
1490 }
1491
1492 @Override
1493 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
1494 super.beforeAdd(ctx);
1495 this.ctx = ctx;
1496 }
1497
1498
1499
1500
1501 @Override
1502 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1503
1504
1505 Throwable cause = null;
1506 for (;;) {
1507 PendingWrite pw = pendingUnencryptedWrites.poll();
1508 if (pw == null) {
1509 break;
1510 }
1511 if (cause == null) {
1512 cause = new IOException("Unable to write data");
1513 }
1514 pw.future.setFailure(cause);
1515 }
1516
1517 for (;;) {
1518 MessageEvent ev = pendingEncryptedWrites.poll();
1519 if (ev == null) {
1520 break;
1521 }
1522 if (cause == null) {
1523 cause = new IOException("Unable to write data");
1524 }
1525 ev.getFuture().setFailure(cause);
1526 }
1527
1528 if (cause != null) {
1529 fireExceptionCaughtLater(ctx, cause);
1530 }
1531 }
1532
1533
1534
1535
1536 @Override
1537 public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
1538 if (issueHandshake) {
1539
1540
1541 handshake().addListener(new ChannelFutureListener() {
1542
1543 public void operationComplete(ChannelFuture future) throws Exception {
1544 if (future.isSuccess()) {
1545
1546
1547
1548 ctx.sendUpstream(e);
1549 }
1550 }
1551 });
1552 } else {
1553 super.channelConnected(ctx, e);
1554 }
1555 }
1556
1557
1558
1559
1560
1561
1562 @Override
1563 public void channelClosed(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1564
1565
1566 ctx.getPipeline().execute(new Runnable() {
1567 public void run() {
1568 if (!pendingUnencryptedWritesLock.tryLock()) {
1569 return;
1570 }
1571
1572 Throwable cause = null;
1573 try {
1574 for (;;) {
1575 PendingWrite pw = pendingUnencryptedWrites.poll();
1576 if (pw == null) {
1577 break;
1578 }
1579 if (cause == null) {
1580 cause = new ClosedChannelException();
1581 }
1582 pw.future.setFailure(cause);
1583 }
1584
1585 for (;;) {
1586 MessageEvent ev = pendingEncryptedWrites.poll();
1587 if (ev == null) {
1588 break;
1589 }
1590 if (cause == null) {
1591 cause = new ClosedChannelException();
1592 }
1593 ev.getFuture().setFailure(cause);
1594 }
1595 } finally {
1596 pendingUnencryptedWritesLock.unlock();
1597 }
1598
1599 if (cause != null) {
1600 fireExceptionCaught(ctx, cause);
1601 }
1602 }
1603 });
1604
1605 super.channelClosed(ctx, e);
1606 }
1607
1608 private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
1609 public SSLEngineInboundCloseFuture() {
1610 super(null, true);
1611 }
1612
1613 void setClosed() {
1614 super.setSuccess();
1615 }
1616
1617 @Override
1618 public Channel getChannel() {
1619 if (ctx == null) {
1620
1621 return null;
1622 } else {
1623 return ctx.getChannel();
1624 }
1625 }
1626
1627 @Override
1628 public boolean setSuccess() {
1629 return false;
1630 }
1631
1632 @Override
1633 public boolean setFailure(Throwable cause) {
1634 return false;
1635 }
1636 }
1637 }