1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.ssl;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBufferFactory;
20 import org.jboss.netty.buffer.ChannelBuffers;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelDownstreamHandler;
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelStateEvent;
29 import org.jboss.netty.channel.Channels;
30 import org.jboss.netty.channel.DefaultChannelFuture;
31 import org.jboss.netty.channel.DownstreamMessageEvent;
32 import org.jboss.netty.channel.ExceptionEvent;
33 import org.jboss.netty.channel.MessageEvent;
34 import org.jboss.netty.handler.codec.frame.FrameDecoder;
35 import org.jboss.netty.logging.InternalLogger;
36 import org.jboss.netty.logging.InternalLoggerFactory;
37 import org.jboss.netty.util.Timeout;
38 import org.jboss.netty.util.Timer;
39 import org.jboss.netty.util.TimerTask;
40 import org.jboss.netty.util.internal.DetectionUtil;
41 import org.jboss.netty.util.internal.NonReentrantLock;
42
43 import javax.net.ssl.SSLEngine;
44 import javax.net.ssl.SSLEngineResult;
45 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
46 import javax.net.ssl.SSLEngineResult.Status;
47 import javax.net.ssl.SSLException;
48 import java.io.IOException;
49 import java.nio.ByteBuffer;
50 import java.nio.channels.ClosedChannelException;
51 import java.nio.channels.DatagramChannel;
52 import java.nio.channels.SocketChannel;
53 import java.util.ArrayList;
54 import java.util.LinkedList;
55 import java.util.List;
56 import java.util.Queue;
57 import java.util.concurrent.ConcurrentLinkedQueue;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
60 import java.util.regex.Pattern;
61
62 import static org.jboss.netty.channel.Channels.*;
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180 public class SslHandler extends FrameDecoder
181 implements ChannelDownstreamHandler {
182
// Logger shared by every SslHandler instance.
183 private static final InternalLogger logger = InternalLoggerFactory.getInstance(SslHandler.class);
184
// Zero-length buffer handed to SSLEngine.wrap()/unwrap() when no application data is involved.
185 private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
186
// Class names (from stack frames) that ignoreException() treats as channel implementations.
187 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
188 "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
// Exception messages that ignoreException() treats as a harmless connection teardown (case-insensitive).
189 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
190 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
191
// Lazily created by getDefaultBufferPool(); access is guarded by that method's class-level lock.
192 private static SslBufferPool defaultBufferPool;
193
194
195
196
197
198 public static synchronized SslBufferPool getDefaultBufferPool() {
199 if (defaultBufferPool == null) {
200 defaultBufferPool = new SslBufferPool();
201 }
202 return defaultBufferPool;
203 }
204
// Context of this handler in its pipeline; assigned in beforeAdd() and null before that.
205 private volatile ChannelHandlerContext ctx;
// Engine, buffer pool and startTls flag are fixed at construction time.
206 private final SSLEngine engine;
207 private final SslBufferPool bufferPool;
// When true, the first ChannelBuffer written downstream is forwarded unencrypted (see handleDownstream()).
208 private final boolean startTls;
209
// Renegotiation is allowed by default; see handshake() and handleRenegotiation().
210 private volatile boolean enableRenegotiation = true;
211
// Monitor guarding the handshake state below; engine.wrap()/unwrap() are called while holding it.
212 final Object handshakeLock = new Object();
// True from the start of handshake() until setHandshakeSuccess()/setHandshakeFailure() resets it.
213 private boolean handshaking;
// True once setHandshakeSuccess() has run; reset to false by setHandshakeFailure().
214 private volatile boolean handshaken;
// Future of the current or most recent handshake; null until one is started or completed.
215 private volatile ChannelFuture handshakeFuture;
216
// The three int flags below are only modified through the AtomicIntegerFieldUpdaters declared after them.
217 @SuppressWarnings("UnusedDeclaration")
218 private volatile int sentFirstMessage;
219 @SuppressWarnings("UnusedDeclaration")
220 private volatile int sentCloseNotify;
221 @SuppressWarnings("UnusedDeclaration")
222 private volatile int closedOutboundAndChannel;
223
// CAS 0 -> 1 in handleDownstream() to let exactly one message through when startTls is set.
224 private static final AtomicIntegerFieldUpdater<SslHandler> SENT_FIRST_MESSAGE_UPDATER =
225 AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "sentFirstMessage");
// CAS 0 -> 1 in closeOutboundAndChannel() before the engine's outbound side is closed.
226 private static final AtomicIntegerFieldUpdater<SslHandler> SENT_CLOSE_NOTIFY_UPDATER =
227 AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "sentCloseNotify");
// CAS 0 -> 1 in closeOutboundAndChannel() so only the first close request runs the shutdown sequence.
228 private static final AtomicIntegerFieldUpdater<SslHandler> CLOSED_OUTBOUND_AND_CHANNEL_UPDATER =
229 AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "closedOutboundAndChannel");
230
// Number of ClosedChannelExceptions exceptionCaught() should swallow; incremented by the
// write listener in wrapNonAppData(). Guarded by ignoreClosedChannelExceptionLock.
231 int ignoreClosedChannelException;
232 final Object ignoreClosedChannelExceptionLock = new Object();
// Plaintext writes waiting to be encrypted by wrap(); guarded by pendingUnencryptedWritesLock.
233 private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
234 private final NonReentrantLock pendingUnencryptedWritesLock = new NonReentrantLock();
// Encrypted writes waiting to be sent downstream by flushPendingEncryptedWrites().
235 private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
236 private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();
237
// NOTE(review): only the getter/setter of issueHandshake are visible in this part of the file.
238 private volatile boolean issueHandshake;
// Set in handleDownstream() when data is written before the handshake future is done; cleared in unwrap().
239 private volatile boolean writeBeforeHandshakeDone;
// Marked closed by unwrap() when SSLEngine.unwrap() reports Status.CLOSED.
240 private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();
241
// When true, the channel is closed after SSL failures; may only be changed before ctx is assigned.
242 private boolean closeOnSslException;
243
// Length of a record whose header was seen by decode() but whose body has not fully arrived; 0 if none.
244 private int packetLength;
245
// Handshake timeout support: timer may be null when handshakeTimeoutInMillis is not positive.
246 private final Timer timer;
247 private final long handshakeTimeoutInMillis;
// Pending timeout scheduled by handshake(); cancelled through cancelHandshakeTimeout().
248 private Timeout handshakeTimeout;
249
250
251
252
253
254
/**
 * Creates a handler that uses the default buffer pool, no startTLS and no handshake timeout.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, getDefaultBufferPool());
}
258
259
260
261
262
263
264
265
/**
 * Creates a handler with an explicit buffer pool, no startTLS and no handshake timeout.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the pool that supplies the NIO buffers used for wrapping and unwrapping
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
    this(engine, bufferPool, false, null, 0);
}
269
270
271
272
273
274
275
276
/**
 * Creates a handler that uses the default buffer pool and no handshake timeout.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first downstream buffer must be sent unencrypted
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, getDefaultBufferPool(), startTls, null, 0);
}
280
281
282
283
284
285
286
287
288
289
/**
 * Creates a handler with an explicit buffer pool and no handshake timeout.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the pool that supplies the NIO buffers used for wrapping and unwrapping
 * @param startTls   {@code true} if the first downstream buffer must be sent unencrypted
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
    this(engine, bufferPool, startTls, null, 0);
}
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Timer timer,
313 long handshakeTimeoutInMillis) {
314 if (engine == null) {
315 throw new NullPointerException("engine");
316 }
317 if (bufferPool == null) {
318 throw new NullPointerException("bufferPool");
319 }
320 if (timer == null && handshakeTimeoutInMillis > 0) {
321 throw new IllegalArgumentException("No Timer was given but a handshakeTimeoutInMillis, need both or none");
322 }
323
324 this.engine = engine;
325 this.bufferPool = bufferPool;
326 this.startTls = startTls;
327 this.timer = timer;
328 this.handshakeTimeoutInMillis = handshakeTimeoutInMillis;
329 }
330
331
332
333
334 public SSLEngine getEngine() {
335 return engine;
336 }
337
338
339
340
341
342
343
/**
 * Starts an SSL handshake, or returns the future of the handshake that is already in progress.
 * The whole operation runs while holding {@code handshakeLock}.
 *
 * The handler must already be attached to a pipeline: {@code ctx} is dereferenced here and is
 * only assigned in {@code beforeAdd()}.
 *
 * @return the future notified when the handshake finishes; an already-failed future when
 *         beginning the handshake (or scheduling the timeout) throws
 * @throws IllegalStateException if a handshake has already completed and renegotiation is disabled
 */
344 public ChannelFuture handshake() {
345 synchronized (handshakeLock) {
346 if (handshaken && !isEnableRenegotiation()) {
347 throw new IllegalStateException("renegotiation disabled");
348 }
349
350 final ChannelHandlerContext ctx = this.ctx;
351 final Channel channel = ctx.getChannel();
352 ChannelFuture handshakeFuture;
353 Exception exception = null;
354 // A handshake is already running: return its future instead of starting a second one.
355 if (handshaking) {
356 return this.handshakeFuture;
357 }
358
359 handshaking = true;
360 try {
361 engine.beginHandshake();
362 runDelegatedTasks();
363 handshakeFuture = this.handshakeFuture = future(channel);
364 if (handshakeTimeoutInMillis > 0) {
365 handshakeTimeout = timer.newTimeout(new TimerTask() {
366 public void run(Timeout timeout) throws Exception {
367 ChannelFuture future = SslHandler.this.handshakeFuture;
368 if (future != null && future.isDone()) {
369 return;
370 }
371 // The timer fired before the handshake future completed: fail the handshake.
372 setHandshakeFailure(channel, new SSLException("Handshake did not complete within " +
373 handshakeTimeoutInMillis + "ms"));
374 }
375 }, handshakeTimeoutInMillis, TimeUnit.MILLISECONDS);
376 }
377 } catch (Exception e) {
378 handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
379 exception = e;
380 }
381
382 if (exception == null) {
383 try {
384 final ChannelFuture hsFuture = handshakeFuture;
// Write the initial handshake data; a failed write fails the handshake future as well.
385 wrapNonAppData(ctx, channel).addListener(new ChannelFutureListener() {
386 public void operationComplete(ChannelFuture future) throws Exception {
387 if (!future.isSuccess()) {
388 Throwable cause = future.getCause();
389 hsFuture.setFailure(cause);
390
391 fireExceptionCaught(ctx, cause);
392 if (closeOnSslException) {
393 Channels.close(ctx, future(channel));
394 }
395 }
396 }
397 });
398 } catch (SSLException e) {
399 handshakeFuture.setFailure(e);
400
401 fireExceptionCaught(ctx, e);
402 if (closeOnSslException) {
403 Channels.close(ctx, future(channel));
404 }
405 }
406 } else {
// Beginning the handshake failed: report the exception upstream and optionally close the channel.
407 fireExceptionCaught(ctx, exception);
408 if (closeOnSslException) {
409 Channels.close(ctx, future(channel));
410 }
411 }
412 return handshakeFuture;
413 }
414 }
415
416
417
418
419
420 public ChannelFuture close() {
421 ChannelHandlerContext ctx = this.ctx;
422 Channel channel = ctx.getChannel();
423 try {
424 engine.closeOutbound();
425 return wrapNonAppData(ctx, channel);
426 } catch (SSLException e) {
427 fireExceptionCaught(ctx, e);
428 if (closeOnSslException) {
429 Channels.close(ctx, future(channel));
430 }
431 return failedFuture(channel, e);
432 }
433 }
434
435
436
437
438 public boolean isEnableRenegotiation() {
439 return enableRenegotiation;
440 }
441
442
443
444
445 public void setEnableRenegotiation(boolean enableRenegotiation) {
446 this.enableRenegotiation = enableRenegotiation;
447 }
448
449
450
451
452
453
454 public void setIssueHandshake(boolean issueHandshake) {
455 this.issueHandshake = issueHandshake;
456 }
457
458
459
460
461 public boolean isIssueHandshake() {
462 return issueHandshake;
463 }
464
465
466
467
468
469
470
471
472
473 public ChannelFuture getSSLEngineInboundCloseFuture() {
474 return sslEngineCloseFuture;
475 }
476
477
478
479
480
481 public long getHandshakeTimeout() {
482 return handshakeTimeoutInMillis;
483 }
484
485
486
487
488
489
490
491
492
493
494 public void setCloseOnSSLException(boolean closeOnSslException) {
495 if (ctx != null) {
496 throw new IllegalStateException("Can only get changed before attached to ChannelPipeline");
497 }
498 this.closeOnSslException = closeOnSslException;
499 }
500
501 public boolean getCloseOnSSLException() {
502 return closeOnSslException;
503 }
504
505 public void handleDownstream(
506 final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
507 if (evt instanceof ChannelStateEvent) {
508 ChannelStateEvent e = (ChannelStateEvent) evt;
509 switch (e.getState()) {
510 case OPEN:
511 case CONNECTED:
512 case BOUND:
513 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
514 closeOutboundAndChannel(context, e);
515 return;
516 }
517 }
518 }
519 if (!(evt instanceof MessageEvent)) {
520 context.sendDownstream(evt);
521 return;
522 }
523
524 MessageEvent e = (MessageEvent) evt;
525 if (!(e.getMessage() instanceof ChannelBuffer)) {
526 context.sendDownstream(evt);
527 return;
528 }
529
530
531
532 if (startTls && SENT_FIRST_MESSAGE_UPDATER.compareAndSet(this, 0, 1)) {
533 context.sendDownstream(evt);
534 return;
535 }
536
537
538 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
539 PendingWrite pendingWrite;
540
541 if (msg.readable()) {
542 pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
543 } else {
544 pendingWrite = new PendingWrite(evt.getFuture(), null);
545 }
546
547 pendingUnencryptedWritesLock.lock();
548 try {
549 pendingUnencryptedWrites.add(pendingWrite);
550 } finally {
551 pendingUnencryptedWritesLock.unlock();
552 }
553
554 if (handshakeFuture == null || !handshakeFuture.isDone()) {
555 writeBeforeHandshakeDone = true;
556 }
557 wrap(context, evt.getChannel());
558 }
559
560 private void cancelHandshakeTimeout() {
561 if (handshakeTimeout != null) {
562
563 handshakeTimeout.cancel();
564 }
565 }
566
567 @Override
568 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
569
570
571
572 synchronized (handshakeLock) {
573 if (handshaking) {
574 cancelHandshakeTimeout();
575 handshakeFuture.setFailure(new ClosedChannelException());
576 }
577 }
578
579 try {
580 super.channelDisconnected(ctx, e);
581 } finally {
582 unwrapNonAppData(ctx, e.getChannel(), false);
583 closeEngine();
584 }
585 }
586
587 private void closeEngine() {
588 engine.closeOutbound();
589 if (sentCloseNotify == 0 && handshaken) {
590 try {
591 engine.closeInbound();
592 } catch (SSLException ex) {
593 if (logger.isDebugEnabled()) {
594 logger.debug("Failed to clean up SSLEngine.", ex);
595 }
596 }
597 }
598 }
599
600 @Override
601 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
602 throws Exception {
603
604 Throwable cause = e.getCause();
605 if (cause instanceof IOException) {
606 if (cause instanceof ClosedChannelException) {
607 synchronized (ignoreClosedChannelExceptionLock) {
608 if (ignoreClosedChannelException > 0) {
609 ignoreClosedChannelException --;
610 if (logger.isDebugEnabled()) {
611 logger.debug(
612 "Swallowing an exception raised while " +
613 "writing non-app data", cause);
614 }
615
616 return;
617 }
618 }
619 } else {
620 if (ignoreException(cause)) {
621 return;
622 }
623 }
624 }
625
626 ctx.sendUpstream(e);
627 }
628
629
630
631
632
633
634
635
636
637
638 private boolean ignoreException(Throwable t) {
639 if (!(t instanceof SSLException) && t instanceof IOException && engine.isOutboundDone()) {
640 String message = String.valueOf(t.getMessage()).toLowerCase();
641
642
643
644 if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
645 return true;
646 }
647
648
649 StackTraceElement[] elements = t.getStackTrace();
650 for (StackTraceElement element: elements) {
651 String classname = element.getClassName();
652 String methodname = element.getMethodName();
653
654
655 if (classname.startsWith("org.jboss.netty.")) {
656 continue;
657 }
658
659
660 if (!"read".equals(methodname)) {
661 continue;
662 }
663
664
665
666 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
667 return true;
668 }
669
670 try {
671
672
673
674 Class<?> clazz = getClass().getClassLoader().loadClass(classname);
675
676 if (SocketChannel.class.isAssignableFrom(clazz)
677 || DatagramChannel.class.isAssignableFrom(clazz)) {
678 return true;
679 }
680
681
682 if (DetectionUtil.javaVersion() >= 7
683 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
684 return true;
685 }
686 } catch (ClassNotFoundException e) {
687
688 }
689 }
690 }
691
692 return false;
693 }
694
695
696
697
698
699
700
701
702
703
704
705
706
707 public static boolean isEncrypted(ChannelBuffer buffer) {
708 return getEncryptedPacketLength(buffer, buffer.readerIndex()) != -1;
709 }
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724 private static int getEncryptedPacketLength(ChannelBuffer buffer, int offset) {
725 int packetLength = 0;
726
727
728 boolean tls;
729 switch (buffer.getUnsignedByte(offset)) {
730 case 20:
731 case 21:
732 case 22:
733 case 23:
734 tls = true;
735 break;
736 default:
737
738 tls = false;
739 }
740
741 if (tls) {
742
743 int majorVersion = buffer.getUnsignedByte(offset + 1);
744 if (majorVersion == 3) {
745
746 packetLength = (getShort(buffer, offset + 3) & 0xFFFF) + 5;
747 if (packetLength <= 5) {
748
749 tls = false;
750 }
751 } else {
752
753 tls = false;
754 }
755 }
756
757 if (!tls) {
758
759 boolean sslv2 = true;
760 int headerLength = (buffer.getUnsignedByte(offset) & 0x80) != 0 ? 2 : 3;
761 int majorVersion = buffer.getUnsignedByte(offset + headerLength + 1);
762 if (majorVersion == 2 || majorVersion == 3) {
763
764 if (headerLength == 2) {
765 packetLength = (getShort(buffer, offset) & 0x7FFF) + 2;
766 } else {
767 packetLength = (getShort(buffer, offset) & 0x3FFF) + 3;
768 }
769 if (packetLength <= headerLength) {
770 sslv2 = false;
771 }
772 } else {
773 sslv2 = false;
774 }
775
776 if (!sslv2) {
777 return -1;
778 }
779 }
780 return packetLength;
781 }
782
/**
 * Gathers as many complete SSL/TLS records as are available in {@code in} (bounded by
 * {@code OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH}) and unwraps them in one call.
 *
 * When a record header has been seen but its body is incomplete, the record length is kept in
 * the {@code packetLength} field so the next call can wait for the full record.
 * Data that is not an SSL/TLS record is discarded and reported with a
 * {@code NotSslRecordException}: fired upstream followed by a channel close when
 * {@code closeOnSslException} is set, thrown otherwise.
 *
 * @param ctx     the context of this handler
 * @param channel the channel the data was read from
 * @param in      the cumulative buffer of received bytes
 * @return the decrypted application data, or {@code null} if none was produced
 * @throws Exception if unwrapping fails or the data is not an SSL/TLS record
 */
783 @Override
784 protected Object decode(
785 final ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception {
786
787 final int startOffset = in.readerIndex();
788 final int endOffset = in.writerIndex();
789 int offset = startOffset;
790 int totalLength = 0;
791
792 // A previous call found a record that had not fully arrived; wait until it is complete.
793 if (packetLength > 0) {
794 if (endOffset - startOffset < packetLength) {
795 return null;
796 } else {
797 offset += packetLength;
798 totalLength = packetLength;
799 packetLength = 0;
800 }
801 }
802
803 boolean nonSslRecord = false;
804 // Collect further complete records as long as the total stays within the maximum.
805 while (totalLength < OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH) {
806 final int readableBytes = endOffset - offset;
// Fewer than 5 bytes: not enough to inspect a record header.
807 if (readableBytes < 5) {
808 break;
809 }
810
811 final int packetLength = getEncryptedPacketLength(in, offset);
812 if (packetLength == -1) {
813 nonSslRecord = true;
814 break;
815 }
816
817 assert packetLength > 0;
818
819 if (packetLength > readableBytes) {
820 // The record is incomplete; remember its length (in the field) for the next call.
821 this.packetLength = packetLength;
822 break;
823 }
824
825 int newTotalLength = totalLength + packetLength;
826 if (newTotalLength > OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH) {
827 // Adding this record would exceed the maximum; unwrap what has been gathered so far.
828 break;
829 }
830
831
832 // The whole record is available and fits: include it.
833 offset += packetLength;
834 totalLength = newTotalLength;
835 }
836
837 ChannelBuffer unwrapped = null;
838 if (totalLength > 0) {
839 // The bytes are consumed from 'in' before unwrap() runs; the NIO buffer handed to
840 // unwrap() is created from the absolute startOffset, so it still covers them.
841 // NOTE(review): presumably done so that re-entrant processing triggered by unwrap()
842 // does not see the same bytes again -- confirm.
843
844
845
846
847
848
849
850 in.skipBytes(totalLength);
851 final ByteBuffer inNetBuf = in.toByteBuffer(startOffset, totalLength);
852 unwrapped = unwrap(ctx, channel, inNetBuf, totalLength, true);
853 }
854
855 if (nonSslRecord) {
856 // Not an SSL/TLS record: dump and discard everything that is left in the buffer.
857 NotSslRecordException e = new NotSslRecordException(
858 "not an SSL/TLS record: " + ChannelBuffers.hexDump(in));
859 in.skipBytes(in.readableBytes());
860 if (closeOnSslException) {
861 // Report the exception first, then close the channel.
862 fireExceptionCaught(ctx, e);
863 Channels.close(ctx, future(channel));
864
865 // The channel is being closed, so null is returned here even if 'unwrapped'
866 // holds data.
867 return null;
868 } else {
869 throw e;
870 }
871 }
872
873 return unwrapped;
874 }
875
876
877
878
879
880 private static short getShort(ChannelBuffer buf, int offset) {
881 return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
882 }
883
/**
 * Encrypts the queued plaintext writes with {@code engine.wrap()} and queues the encrypted
 * output, which is flushed downstream once the loop ends.
 *
 * Each iteration runs under {@code pendingUnencryptedWritesLock}; the engine is called under
 * {@code handshakeLock}. If the engine turns out to be closed, or an {@link SSLException} is
 * raised, every write still pending is failed. If the engine asks for an unwrap,
 * {@code unwrapNonAppData()} is invoked after all locks have been released.
 *
 * @param context the context used to send the encrypted writes downstream
 * @param channel the channel the writes belong to
 * @throws SSLException if the engine fails to wrap; the handshake is marked failed first
 */
884 private void wrap(ChannelHandlerContext context, Channel channel) throws SSLException {
885 ChannelBuffer msg;
886 ByteBuffer outNetBuf = bufferPool.acquireBuffer();
887 boolean success = true;
888 boolean offered = false;
889 boolean needsUnwrap = false;
890 PendingWrite pendingWrite = null;
891
892 try {
893 loop:
894 for (;;) {
895 // The queue is only touched while holding pendingUnencryptedWritesLock. The head is
896 // peeked rather than polled because one write may need several wrap() calls.
897
898 pendingUnencryptedWritesLock.lock();
899 try {
900 pendingWrite = pendingUnencryptedWrites.peek();
901 if (pendingWrite == null) {
902 break;
903 }
904
905 ByteBuffer outAppBuf = pendingWrite.outAppBuf;
906 if (outAppBuf == null) {
907 // Empty write: nothing to encrypt, forward an empty buffer with the original future.
908 pendingUnencryptedWrites.remove();
909 offerEncryptedWriteRequest(
910 new DownstreamMessageEvent(
911 channel, pendingWrite.future,
912 ChannelBuffers.EMPTY_BUFFER,
913 channel.getRemoteAddress()));
914 offered = true;
915 } else {
916 synchronized (handshakeLock) {
917 SSLEngineResult result = null;
918 try {
919 result = engine.wrap(outAppBuf, outNetBuf);
920 } finally {
// Drop the write from the queue once all of its plaintext has been consumed,
// even if wrap() threw.
921 if (!outAppBuf.hasRemaining()) {
922 pendingUnencryptedWrites.remove();
923 }
924 }
925
926 if (result.bytesProduced() > 0) {
927 outNetBuf.flip();
928 int remaining = outNetBuf.remaining();
929 msg = ctx.getChannel().getConfig().getBufferFactory().getBuffer(remaining);
930
931 // Copy the encrypted bytes out of the pooled buffer; the pooled buffer is
932 // cleared right away and reused for the next wrap() call.
933
934
935 msg.writeBytes(outNetBuf);
936 outNetBuf.clear();
937
938 ChannelFuture future;
939 if (pendingWrite.outAppBuf.hasRemaining()) {
940 // More plaintext of this write remains: this chunk gets an already-succeeded
941 // future, the caller's future travels with the final chunk.
942 future = succeededFuture(channel);
943 } else {
944 future = pendingWrite.future;
945 }
946
947 MessageEvent encryptedWrite = new DownstreamMessageEvent(
948 channel, future, msg, channel.getRemoteAddress());
949 offerEncryptedWriteRequest(encryptedWrite);
950 offered = true;
951 } else if (result.getStatus() == Status.CLOSED) {
952 // The engine is closed and produced nothing: stop and fail the pending writes
953 // in the finally block below.
954 success = false;
955 break;
956 } else {
// Nothing was produced: act on the handshake status reported by the engine.
957 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
958 handleRenegotiation(handshakeStatus);
959 switch (handshakeStatus) {
960 case NEED_WRAP:
961 if (outAppBuf.hasRemaining()) {
962 break;
963 } else {
964 break loop;
965 }
966 case NEED_UNWRAP:
967 needsUnwrap = true;
968 break loop;
969 case NEED_TASK:
970 runDelegatedTasks();
971 break;
972 case FINISHED:
973 setHandshakeSuccess(channel);
974 if (result.getStatus() == Status.CLOSED) {
975 success = false;
976 }
977 break loop;
978 case NOT_HANDSHAKING:
979 setHandshakeSuccessIfStillHandshaking(channel);
980 if (result.getStatus() == Status.CLOSED) {
981 success = false;
982 }
983 break loop;
984 default:
985 throw new IllegalStateException(
986 "Unknown handshake status: " +
987 handshakeStatus);
988 }
989 }
990 }
991 }
992 } finally {
993 pendingUnencryptedWritesLock.unlock();
994 }
995 }
996 } catch (SSLException e) {
997 success = false;
998 setHandshakeFailure(channel, e);
999 throw e;
1000 } finally {
1001 bufferPool.releaseBuffer(outNetBuf);
1002
1003 if (offered) {
1004 flushPendingEncryptedWrites(context);
1005 }
1006
1007 if (!success) {
1008 Exception cause = channel.isOpen()
1009 ? new SSLException("SSLEngine already closed")
1010 : new ClosedChannelException();
1011
1012 // Fail the write that was being processed when wrapping stopped (it may already
1013 // have been removed from the queue).
1014 if (pendingWrite != null) {
1015 pendingWrite.future.setFailure(cause);
1016 }
1017
1018 // Fail everything still queued. The lock is held only while polling; each future
1019 // is failed after the lock has been released.
1020
1021
1022 for (;;) {
1023 pendingUnencryptedWritesLock.lock();
1024 try {
1025 pendingWrite = pendingUnencryptedWrites.poll();
1026 if (pendingWrite == null) {
1027 break;
1028 }
1029 } finally {
1030 pendingUnencryptedWritesLock.unlock();
1031 }
1032
1033 pendingWrite.future.setFailure(cause);
1034 }
1035 }
1036 }
1037
// Reached only when no exception was thrown; uses the ctx field rather than the 'context' parameter.
1038 if (needsUnwrap) {
1039 unwrapNonAppData(ctx, channel, true);
1040 }
1041 }
1042
1043 private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
1044 final boolean locked = pendingEncryptedWritesLock.tryLock();
1045 try {
1046 pendingEncryptedWrites.add(encryptedWrite);
1047 } finally {
1048 if (locked) {
1049 pendingEncryptedWritesLock.unlock();
1050 }
1051 }
1052 }
1053
1054 private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
1055 while (!pendingEncryptedWrites.isEmpty()) {
1056
1057
1058
1059 if (!pendingEncryptedWritesLock.tryLock()) {
1060 return;
1061 }
1062
1063 try {
1064 MessageEvent e;
1065 while ((e = pendingEncryptedWrites.poll()) != null) {
1066 ctx.sendDownstream(e);
1067 }
1068 } finally {
1069 pendingEncryptedWritesLock.unlock();
1070 }
1071
1072
1073 }
1074 }
1075
/**
 * Lets the engine produce non-application data (it wraps {@code EMPTY_BUFFER}) and writes
 * each produced chunk downstream, until the engine produces nothing more.
 *
 * @param ctx     the context used for the writes
 * @param channel the channel the data is written to
 * @return the future of the last write, or an already-succeeded future if nothing was written
 * @throws SSLException if the engine fails to wrap; the handshake is marked failed first
 */
1076 private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
1077 ChannelFuture future = null;
1078 ByteBuffer outNetBuf = bufferPool.acquireBuffer();
1079
1080 SSLEngineResult result;
1081 try {
1082 for (;;) {
1083 synchronized (handshakeLock) {
1084 result = engine.wrap(EMPTY_BUFFER, outNetBuf);
1085 }
1086
1087 if (result.bytesProduced() > 0) {
1088 outNetBuf.flip();
1089 ChannelBuffer msg =
1090 ctx.getChannel().getConfig().getBufferFactory().getBuffer(outNetBuf.remaining());
1091
1092 // Copy the produced bytes out of the pooled buffer; the pooled buffer is cleared
1093 // right away and reused for the next wrap() call.
1094
1095
1096 msg.writeBytes(outNetBuf);
1097 outNetBuf.clear();
1098
1099 future = future(channel);
// Count writes that failed with ClosedChannelException so that exceptionCaught()
// swallows the same number of them.
1100 future.addListener(new ChannelFutureListener() {
1101 public void operationComplete(ChannelFuture future)
1102 throws Exception {
1103 if (future.getCause() instanceof ClosedChannelException) {
1104 synchronized (ignoreClosedChannelExceptionLock) {
1105 ignoreClosedChannelException ++;
1106 }
1107 }
1108 }
1109 });
1110
1111 write(ctx, future, msg);
1112 }
1113
1114 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1115 handleRenegotiation(handshakeStatus);
1116 switch (handshakeStatus) {
1117 case FINISHED:
1118 setHandshakeSuccess(channel);
1119 runDelegatedTasks();
1120 break;
1121 case NEED_TASK:
1122 runDelegatedTasks();
1123 break;
1124 case NEED_UNWRAP:
1125 if (!Thread.holdsLock(handshakeLock)) {
1126 // Unwrap only when this thread does not already hold handshakeLock.
1127 // NOTE(review): presumably because the caller holding the lock is unwrap()
1128 // itself, which continues unwrapping on its own -- confirm.
1129 unwrapNonAppData(ctx, channel, true);
1130 }
1131 break;
1132 case NOT_HANDSHAKING:
1133 if (setHandshakeSuccessIfStillHandshaking(channel)) {
1134 runDelegatedTasks();
1135 }
1136 break;
1137 case NEED_WRAP:
1138 break;
1139 default:
1140 throw new IllegalStateException(
1141 "Unexpected handshake status: " + handshakeStatus);
1142 }
1143 // Stop once the engine has nothing more to send.
1144 if (result.bytesProduced() == 0) {
1145 break;
1146 }
1147 }
1148 } catch (SSLException e) {
1149 setHandshakeFailure(channel, e);
1150 throw e;
1151 } finally {
1152 bufferPool.releaseBuffer(outNetBuf);
1153 }
1154 // Nothing was written: return a future that is already complete.
1155 if (future == null) {
1156 future = succeededFuture(channel);
1157 }
1158
1159 return future;
1160 }
1161
1162
1163
1164
1165 private void unwrapNonAppData(
1166 ChannelHandlerContext ctx, Channel channel, boolean mightNeedHandshake) throws SSLException {
1167 unwrap(ctx, channel, EMPTY_BUFFER, -1, mightNeedHandshake);
1168 }
1169
1170
1171
1172
/**
 * Feeds {@code nioInNetBuf} to {@code engine.unwrap()} until the engine stops making progress,
 * collecting decrypted application data and reacting to the handshake status after each call.
 *
 * @param ctx                           the context of this handler
 * @param channel                       the channel being processed
 * @param nioInNetBuf                   the encrypted input (empty when called from unwrapNonAppData())
 * @param initialNettyOutAppBufCapacity capacity requested from the buffer factory when the output
 *                                      buffer is first created
 * @param mightNeedHandshake            when {@code true}, a server-mode engine that has neither
 *                                      completed nor started a handshake starts one first
 * @return the decrypted data, or {@code null} if no readable data was produced
 * @throws SSLException if the engine fails to unwrap; the handshake is marked failed first
 */
1173 private ChannelBuffer unwrap(
1174 ChannelHandlerContext ctx, Channel channel,
1175 ByteBuffer nioInNetBuf,
1176 int initialNettyOutAppBufCapacity, boolean mightNeedHandshake) throws SSLException {
1177 // NOTE(review): nioInNetBufStartOffset is never read in this method.
1178 final int nioInNetBufStartOffset = nioInNetBuf.position();
1179 final ByteBuffer nioOutAppBuf = bufferPool.acquireBuffer();
1180
1181 ChannelBuffer nettyOutAppBuf = null;
1182
1183 try {
1184 boolean needsWrap = false;
1185 for (;;) {
1186 SSLEngineResult result;
1187 boolean needsHandshake = false;
1188 if (mightNeedHandshake) {
1189 synchronized (handshakeLock) {
1190 if (!handshaken && !handshaking &&
1191 !engine.getUseClientMode() &&
1192 !engine.isInboundDone() && !engine.isOutboundDone()) {
1193 needsHandshake = true;
1194 }
1195 }
1196 }
1197
1198 if (needsHandshake) {
1199 handshake();
1200 }
1201
1202 synchronized (handshakeLock) {
1203 // Decrypt while holding handshakeLock. The inner loop repeats the unwrap() call
1204 // for as long as the engine reports BUFFER_OVERFLOW.
1205
1206
1207 for (;;) {
1208 final int outAppBufSize = engine.getSession().getApplicationBufferSize();
1209 final ByteBuffer outAppBuf;
1210 if (nioOutAppBuf.capacity() < outAppBufSize) {
1211 // The pooled buffer is smaller than the session's application buffer size:
1212 // use a temporary buffer of the required size instead.
1213 outAppBuf = ByteBuffer.allocate(outAppBufSize);
1214 } else {
1215 outAppBuf = nioOutAppBuf;
1216 }
1217
1218 try {
1219 result = engine.unwrap(nioInNetBuf, outAppBuf);
1220 switch (result.getStatus()) {
1221 case CLOSED:
1222 // The engine reported CLOSED: notify the inbound-close future.
1223 sslEngineCloseFuture.setClosed();
1224 break;
1225 case BUFFER_OVERFLOW:
1226 // Retry; the finally block below first drains whatever was produced and the
1227 // required buffer size is re-read at the top of the loop.
1228 continue;
1229 }
1230
1231 break;
1232 } finally {
1233 outAppBuf.flip();
1234
1235 // Move any produced plaintext into the Netty buffer, which is created lazily.
1236 if (outAppBuf.hasRemaining()) {
1237 if (nettyOutAppBuf == null) {
1238 ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
1239 nettyOutAppBuf = factory.getBuffer(initialNettyOutAppBufCapacity);
1240 }
1241 nettyOutAppBuf.writeBytes(outAppBuf);
1242 }
1243 outAppBuf.clear();
1244 }
1245 }
1246
1247 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1248 handleRenegotiation(handshakeStatus);
1249 switch (handshakeStatus) {
1250 case NEED_UNWRAP:
1251 break;
1252 case NEED_WRAP:
1253 wrapNonAppData(ctx, channel);
1254 break;
1255 case NEED_TASK:
1256 runDelegatedTasks();
1257 break;
1258 case FINISHED:
1259 setHandshakeSuccess(channel);
1260 needsWrap = true;
1261 continue;
1262 case NOT_HANDSHAKING:
1263 if (setHandshakeSuccessIfStillHandshaking(channel)) {
1264 needsWrap = true;
1265 continue;
1266 }
1267 if (writeBeforeHandshakeDone) {
1268 // Data was written downstream before the handshake was done (flag set in
1269 // handleDownstream()); request a wrap() so those pending writes get encrypted.
1270
1271 writeBeforeHandshakeDone = false;
1272 needsWrap = true;
1273 }
1274 break;
1275 default:
1276 throw new IllegalStateException(
1277 "Unknown handshake status: " + handshakeStatus);
1278 }
1279 // Leave the outer loop when the engine needs more input or made no progress at all.
1280 if (result.getStatus() == Status.BUFFER_UNDERFLOW ||
1281 result.bytesConsumed() == 0 && result.bytesProduced() == 0) {
1282 if (nioInNetBuf.hasRemaining() && !engine.isInboundDone()) {
1283 // Input is left over although the engine stopped consuming: log the details,
1284 // including a hex dump of the remaining bytes.
1285
1286
1287
1288 logger.warn("Unexpected leftover data after SSLEngine.unwrap():"
1289 + " status=" + result.getStatus()
1290 + " handshakeStatus=" + result.getHandshakeStatus()
1291 + " consumed=" + result.bytesConsumed()
1292 + " produced=" + result.bytesProduced()
1293 + " remaining=" + nioInNetBuf.remaining()
1294 + " data=" + ChannelBuffers.hexDump(ChannelBuffers.wrappedBuffer(nioInNetBuf)));
1295 }
1296 break;
1297 }
1298 }
1299 }
1300
1301 if (needsWrap) {
1302 // wrap() is skipped when the current thread already holds handshakeLock or
1303 // pendingEncryptedWritesLock.
1304 // NOTE(review): presumably to avoid deadlock or re-entrant wrapping when unwrap()
1305 // was itself reached from a wrap/flush on this thread -- confirm.
1306
1307
1308
1309
1310 if (!Thread.holdsLock(handshakeLock) && !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
1311 wrap(ctx, channel);
1312 }
1313 }
1314 } catch (SSLException e) {
1315 setHandshakeFailure(channel, e);
1316 throw e;
1317 } finally {
1318 bufferPool.releaseBuffer(nioOutAppBuf);
1319 }
1320
1321 if (nettyOutAppBuf != null && nettyOutAppBuf.readable()) {
1322 return nettyOutAppBuf;
1323 } else {
1324 return null;
1325 }
1326 }
1327
1328 private void handleRenegotiation(HandshakeStatus handshakeStatus) {
1329 synchronized (handshakeLock) {
1330 if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
1331 handshakeStatus == HandshakeStatus.FINISHED) {
1332
1333 return;
1334 }
1335
1336 if (!handshaken) {
1337
1338 return;
1339 }
1340
1341 final boolean renegotiate;
1342 if (handshaking) {
1343
1344
1345 return;
1346 }
1347
1348 if (engine.isInboundDone() || engine.isOutboundDone()) {
1349
1350 return;
1351 }
1352
1353 if (isEnableRenegotiation()) {
1354
1355 renegotiate = true;
1356 } else {
1357
1358 renegotiate = false;
1359
1360 handshaking = true;
1361 }
1362
1363 if (renegotiate) {
1364
1365 handshake();
1366 } else {
1367
1368 fireExceptionCaught(
1369 ctx, new SSLException(
1370 "renegotiation attempted by peer; " +
1371 "closing the connection"));
1372
1373
1374 Channels.close(ctx, succeededFuture(ctx.getChannel()));
1375 }
1376 }
1377 }
1378
1379
1380
1381
1382 private void runDelegatedTasks() {
1383 for (;;) {
1384 final Runnable task;
1385 synchronized (handshakeLock) {
1386 task = engine.getDelegatedTask();
1387 }
1388
1389 if (task == null) {
1390 break;
1391 }
1392
1393 task.run();
1394 }
1395 }
1396
1397
1398
1399
1400
1401
1402
1403
1404 private boolean setHandshakeSuccessIfStillHandshaking(Channel channel) {
1405 if (handshaking && !handshakeFuture.isDone()) {
1406 setHandshakeSuccess(channel);
1407 return true;
1408 }
1409 return false;
1410 }
1411
1412 private void setHandshakeSuccess(Channel channel) {
1413 synchronized (handshakeLock) {
1414 handshaking = false;
1415 handshaken = true;
1416
1417 if (handshakeFuture == null) {
1418 handshakeFuture = future(channel);
1419 }
1420 cancelHandshakeTimeout();
1421 }
1422
1423 if (logger.isDebugEnabled()) {
1424 logger.debug(channel + " HANDSHAKEN: " + engine.getSession().getCipherSuite());
1425 }
1426
1427 handshakeFuture.setSuccess();
1428 }
1429
1430 private void setHandshakeFailure(Channel channel, SSLException cause) {
1431 synchronized (handshakeLock) {
1432 if (!handshaking) {
1433 return;
1434 }
1435 handshaking = false;
1436 handshaken = false;
1437
1438 if (handshakeFuture == null) {
1439 handshakeFuture = future(channel);
1440 }
1441
1442
1443 cancelHandshakeTimeout();
1444
1445
1446
1447
1448 engine.closeOutbound();
1449
1450 try {
1451 engine.closeInbound();
1452 } catch (SSLException e) {
1453 if (logger.isDebugEnabled()) {
1454 logger.debug(
1455 "SSLEngine.closeInbound() raised an exception after " +
1456 "a handshake failure.", e);
1457 }
1458 }
1459 }
1460
1461 handshakeFuture.setFailure(cause);
1462 if (closeOnSslException) {
1463 Channels.close(ctx, future(channel));
1464 }
1465 }
1466
/**
 * Handles a downstream close/disconnect-style state event by first trying to send a
 * close_notify message and only then letting the event continue downstream.
 *
 * <p>The event is forwarded downstream directly ("passthrough") when the channel is
 * not connected, or when no close_notify write could be started. When a close_notify
 * write was started, a {@link ClosingChannelFutureListener} registered on that write
 * takes over the handling of the event instead.
 *
 * @param context the handler context used to forward the event downstream
 * @param e       the state event that triggered the shutdown
 */
private void closeOutboundAndChannel(
        final ChannelHandlerContext context, final ChannelStateEvent e) {
    // Nothing can be written on an unconnected channel; just pass the event on.
    if (!e.getChannel().isConnected()) {
        context.sendDownstream(e);
        return;
    }

    // Only the caller that flips the flag from 0 to 1 runs the tear-down logic below.
    if (!CLOSED_OUTBOUND_AND_CHANNEL_UPDATER.compareAndSet(this, 0, 1)) {
        // Another invocation already won the race above. Defer this event until the
        // channel's close future completes and forward it downstream then.
        e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                context.sendDownstream(e);
            }
        });
        return;
    }

    // Stays true unless a close_notify write was started successfully.
    boolean passthrough = true;
    try {
        try {
            unwrapNonAppData(ctx, e.getChannel(), false);
        } catch (SSLException ex) {
            // Not fatal for the shutdown; log and carry on.
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to unwrap before sending a close_notify message", ex);
            }
        }

        if (!engine.isOutboundDone()) {
            // The second CAS guards against issuing the close_notify more than once.
            if (SENT_CLOSE_NOTIFY_UPDATER.compareAndSet(this, 0, 1)) {
                engine.closeOutbound();
                try {
                    ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
                    // The listener deals with the original event once the write completes.
                    closeNotifyFuture.addListener(
                            new ClosingChannelFutureListener(context, e));
                    passthrough = false;
                } catch (SSLException ex) {
                    // passthrough is still true here, so the event is forwarded in 'finally'.
                    if (logger.isDebugEnabled()) {
                        logger.debug("Failed to encode a close_notify message", ex);
                    }
                }
            }
        }
    } finally {
        if (passthrough) {
            context.sendDownstream(e);
        }
    }
}
1518
1519 private static final class PendingWrite {
1520 final ChannelFuture future;
1521 final ByteBuffer outAppBuf;
1522
1523 PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1524 this.future = future;
1525 this.outAppBuf = outAppBuf;
1526 }
1527 }
1528
1529 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1530
1531 private final ChannelHandlerContext context;
1532 private final ChannelStateEvent e;
1533
1534 ClosingChannelFutureListener(
1535 ChannelHandlerContext context, ChannelStateEvent e) {
1536 this.context = context;
1537 this.e = e;
1538 }
1539
1540 public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1541 if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1542 Channels.close(context, e.getFuture());
1543 } else {
1544 e.getFuture().setSuccess();
1545 }
1546 }
1547 }
1548
/**
 * Delegates to the superclass and then stores the given context in the
 * {@code ctx} field so that other methods of this handler can use it.
 *
 * @param ctx the context this handler is being added with
 */
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
    super.beforeAdd(ctx);
    // Stored only after the superclass call returned normally.
    this.ctx = ctx;
}
1554
1555
1556
1557
1558 @Override
1559 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1560 closeEngine();
1561
1562
1563 Throwable cause = null;
1564 for (;;) {
1565 PendingWrite pw = pendingUnencryptedWrites.poll();
1566 if (pw == null) {
1567 break;
1568 }
1569 if (cause == null) {
1570 cause = new IOException("Unable to write data");
1571 }
1572 pw.future.setFailure(cause);
1573 }
1574
1575 for (;;) {
1576 MessageEvent ev = pendingEncryptedWrites.poll();
1577 if (ev == null) {
1578 break;
1579 }
1580 if (cause == null) {
1581 cause = new IOException("Unable to write data");
1582 }
1583 ev.getFuture().setFailure(cause);
1584 }
1585
1586 if (cause != null) {
1587 fireExceptionCaughtLater(ctx, cause);
1588 }
1589 }
1590
1591
1592
1593
1594 @Override
1595 public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
1596 if (issueHandshake) {
1597
1598
1599 handshake().addListener(new ChannelFutureListener() {
1600
1601 public void operationComplete(ChannelFuture future) throws Exception {
1602 if (future.isSuccess()) {
1603
1604
1605
1606 ctx.sendUpstream(e);
1607 }
1608 }
1609 });
1610 } else {
1611 super.channelConnected(ctx, e);
1612 }
1613 }
1614
1615
1616
1617
1618
1619
1620 @Override
1621 public void channelClosed(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1622
1623
1624 ctx.getPipeline().execute(new Runnable() {
1625 public void run() {
1626 if (!pendingUnencryptedWritesLock.tryLock()) {
1627 return;
1628 }
1629
1630 List<ChannelFuture> futures = null;
1631 try {
1632 for (;;) {
1633 PendingWrite pw = pendingUnencryptedWrites.poll();
1634 if (pw == null) {
1635 break;
1636 }
1637 if (futures == null) {
1638 futures = new ArrayList<ChannelFuture>();
1639 }
1640 futures.add(pw.future);
1641 }
1642
1643 for (;;) {
1644 MessageEvent ev = pendingEncryptedWrites.poll();
1645 if (ev == null) {
1646 break;
1647 }
1648 if (futures == null) {
1649 futures = new ArrayList<ChannelFuture>();
1650 }
1651 futures.add(ev.getFuture());
1652 }
1653 } finally {
1654 pendingUnencryptedWritesLock.unlock();
1655 }
1656
1657 if (futures != null) {
1658 final ClosedChannelException cause = new ClosedChannelException();
1659 final int size = futures.size();
1660 for (int i = 0; i < size; i ++) {
1661 futures.get(i).setFailure(cause);
1662 }
1663 fireExceptionCaught(ctx, cause);
1664 }
1665 }
1666 });
1667
1668 super.channelClosed(ctx, e);
1669 }
1670
1671 private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
1672 SSLEngineInboundCloseFuture() {
1673 super(null, true);
1674 }
1675
1676 void setClosed() {
1677 super.setSuccess();
1678 }
1679
1680 @Override
1681 public Channel getChannel() {
1682 if (ctx == null) {
1683
1684 return null;
1685 } else {
1686 return ctx.getChannel();
1687 }
1688 }
1689
1690 @Override
1691 public boolean setSuccess() {
1692 return false;
1693 }
1694
1695 @Override
1696 public boolean setFailure(Throwable cause) {
1697 return false;
1698 }
1699 }
1700 }