View Javadoc
1   /*
2    * Copyright 2020 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.pcap;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelDuplexHandler;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandlerAdapter;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.ServerChannel;
26  import io.netty.channel.socket.DatagramChannel;
27  import io.netty.channel.socket.DatagramPacket;
28  import io.netty.channel.socket.ServerSocketChannel;
29  import io.netty.channel.socket.SocketChannel;
30  import io.netty.util.NetUtil;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.Closeable;
35  import java.io.IOException;
36  import java.io.OutputStream;
37  import java.net.Inet4Address;
38  import java.net.Inet6Address;
39  import java.net.InetAddress;
40  import java.net.InetSocketAddress;
41  import java.net.UnknownHostException;
42  import java.util.concurrent.atomic.AtomicReference;
43  
44  import static io.netty.util.internal.ObjectUtil.checkNotNull;
45  
46  /**
47   * <p> {@link PcapWriteHandler} captures {@link ByteBuf} from {@link SocketChannel} / {@link ServerChannel}
48   * or {@link DatagramPacket} and writes it into Pcap {@link OutputStream}. </p>
49   *
50   * <p>
51   * Things to keep in mind when using {@link PcapWriteHandler} with TCP:
52   *
53   *    <ul>
54   *        <li> Whenever {@link ChannelInboundHandlerAdapter#channelActive(ChannelHandlerContext)} is called,
55   *        a fake TCP 3-way handshake (SYN, SYN+ACK, ACK) is simulated as new connection in Pcap. </li>
56   *
57   *        <li> Whenever {@link ChannelInboundHandlerAdapter#handlerRemoved(ChannelHandlerContext)} is called,
58   *        a fake TCP 3-way handshake (FIN+ACK, FIN+ACK, ACK) is simulated as connection shutdown in Pcap.  </li>
59   *
60   *        <li> Whenever {@link ChannelInboundHandlerAdapter#exceptionCaught(ChannelHandlerContext, Throwable)}
61   *        is called, a fake TCP RST is sent to simulate connection Reset in Pcap. </li>
62   *
63   *        <li> ACK is sent each time data is sent / received. </li>
64   *
65   *        <li> Zero Length Data Packets can cause TCP Double ACK error in Wireshark. To tackle this,
66   *        set {@code captureZeroByte} to {@code false}. </li>
67   *    </ul>
68   * </p>
69   */
70  public final class PcapWriteHandler extends ChannelDuplexHandler implements Closeable {
71  
72      /**
73       * Logger for logging events
74       */
75      private final InternalLogger logger = InternalLoggerFactory.getInstance(PcapWriteHandler.class);
76  
77      /**
78       * {@link PcapWriter} Instance
79       */
80      private PcapWriter pCapWriter;
81  
82      /**
83       * {@link OutputStream} where we'll write Pcap data.
84       */
85      private final OutputStream outputStream;
86  
87      /**
88       * {@code true} if we want to capture packets with zero bytes else {@code false}.
89       */
90      private final boolean captureZeroByte;
91  
92      /**
93       * {@code true} if we want to write Pcap Global Header on initialization of
94       * {@link PcapWriter} else {@code false}.
95       */
96      private final boolean writePcapGlobalHeader;
97  
98      /**
99       * {@code true} if we want to synchronize on the {@link OutputStream} while writing
100      * else {@code false}.
101      */
102     private final boolean sharedOutputStream;
103 
104     /**
105      * TCP Sender Segment Number.
106      * It'll start with 1 and keep incrementing with number of bytes read/sent and wrap at the uint32 max.
107      */
108     private long sendSegmentNumber = 1;
109 
110     /**
111      * TCP Receiver Segment Number.
112      * It'll start with 1 and keep incrementing with number of bytes read/sent and wrap at the uint32 max
113      */
114     private long receiveSegmentNumber = 1;
115 
116     /**
117      * Type of the channel this handler is registered on
118      */
119     private ChannelType channelType;
120 
121     /**
122      * Address of the initiator of the connection
123      */
124     private InetSocketAddress initiatorAddr;
125 
126     /**
127      * Address of the receiver of the connection
128      */
129     private InetSocketAddress handlerAddr;
130 
131     /**
132      * Set to {@code true} if this handler is registered on a server pipeline
133      */
134     private boolean isServerPipeline;
135 
136     /**
137      * Current of this {@link PcapWriteHandler}
138      */
139     private final AtomicReference<State> state = new AtomicReference<State>(State.INIT);
140 
141     /**
142      * Create new {@link PcapWriteHandler} Instance.
143      * {@code captureZeroByte} is set to {@code false} and
144      * {@code writePcapGlobalHeader} is set to {@code true}.
145      *
146      * @param outputStream OutputStream where Pcap data will be written. Call {@link #close()} to close this
147      *                     OutputStream.
148      * @throws NullPointerException If {@link OutputStream} is {@code null} then we'll throw an
149      *                              {@link NullPointerException}
150      * @deprecated Use {@link Builder} instead.
151      */
152     @Deprecated
153     public PcapWriteHandler(OutputStream outputStream) {
154         this(outputStream, false, true);
155     }
156 
157     /**
158      * Create new {@link PcapWriteHandler} Instance
159      *
160      * @param outputStream          OutputStream where Pcap data will be written. Call {@link #close()} to close this
161      *                              OutputStream.
162      * @param captureZeroByte       Set to {@code true} to enable capturing packets with empty (0 bytes) payload.
163      *                              Otherwise, if set to {@code false}, empty packets will be filtered out.
164      * @param writePcapGlobalHeader Set to {@code true} to write Pcap Global Header on initialization.
165      *                              Otherwise, if set to {@code false}, Pcap Global Header will not be written
166      *                              on initialization. This could when writing Pcap data on a existing file where
167      *                              Pcap Global Header is already present.
168      * @throws NullPointerException If {@link OutputStream} is {@code null} then we'll throw an
169      *                              {@link NullPointerException}
170      * @deprecated Use {@link Builder} instead.
171      */
172     @Deprecated
173     public PcapWriteHandler(OutputStream outputStream, boolean captureZeroByte, boolean writePcapGlobalHeader) {
174         this.outputStream = checkNotNull(outputStream, "OutputStream");
175         this.captureZeroByte = captureZeroByte;
176         this.writePcapGlobalHeader = writePcapGlobalHeader;
177         sharedOutputStream = false;
178     }
179 
180     private PcapWriteHandler(Builder builder, OutputStream outputStream) {
181         this.outputStream = outputStream;
182         captureZeroByte = builder.captureZeroByte;
183         sharedOutputStream = builder.sharedOutputStream;
184         writePcapGlobalHeader = builder.writePcapGlobalHeader;
185         channelType = builder.channelType;
186         handlerAddr = builder.handlerAddr;
187         initiatorAddr = builder.initiatorAddr;
188         isServerPipeline = builder.isServerPipeline;
189     }
190 
191     /**
192      * Writes the Pcap Global Header to the provided {@code OutputStream}
193      *
194      * @param outputStream OutputStream where Pcap data will be written.
195      * @throws IOException if there is an error writing to the {@code OutputStream}
196      */
197     public static void writeGlobalHeader(OutputStream outputStream) throws IOException {
198         PcapHeaders.writeGlobalHeader(outputStream);
199     }
200 
201     private void initializeIfNecessary(ChannelHandlerContext ctx) throws Exception {
202         // If State is not 'INIT' then it means we're already initialized so then no need to initiaize again.
203         if (state.get() != State.INIT) {
204             return;
205         }
206 
207         pCapWriter = new PcapWriter(this);
208 
209         if (channelType == null) {
210             // infer channel type
211             if (ctx.channel() instanceof SocketChannel) {
212                 channelType = ChannelType.TCP;
213 
214                 // If Channel belongs to `SocketChannel` then we're handling TCP.
215                 // Capture correct `localAddress` and `remoteAddress`
216                 if (ctx.channel().parent() instanceof ServerSocketChannel) {
217                     isServerPipeline = true;
218                     initiatorAddr = (InetSocketAddress) ctx.channel().remoteAddress();
219                     handlerAddr = getLocalAddress(ctx.channel(), initiatorAddr);
220                 } else {
221                     isServerPipeline = false;
222                     handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
223                     initiatorAddr = getLocalAddress(ctx.channel(), handlerAddr);
224                 }
225             } else if (ctx.channel() instanceof DatagramChannel) {
226                 channelType = ChannelType.UDP;
227 
228                 DatagramChannel datagramChannel = (DatagramChannel) ctx.channel();
229 
230                 // If `DatagramChannel` is connected then we can get
231                 // `localAddress` and `remoteAddress` from Channel.
232                 if (datagramChannel.isConnected()) {
233                     handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
234                     initiatorAddr = getLocalAddress(ctx.channel(), handlerAddr);
235                 }
236             }
237         }
238 
239         if (channelType == ChannelType.TCP) {
240             logger.debug("Initiating Fake TCP 3-Way Handshake");
241 
242             ByteBuf tcpBuf = ctx.alloc().buffer();
243 
244             try {
245                 // Write SYN with Normal Source and Destination Address
246                 TCPPacket.writePacket(tcpBuf, null, 0, 0,
247                                       initiatorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.SYN);
248                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
249 
250                 // Write SYN+ACK with Reversed Source and Destination Address
251                 TCPPacket.writePacket(tcpBuf, null, 0, 1,
252                                       handlerAddr.getPort(), initiatorAddr.getPort(), TCPPacket.TCPFlag.SYN,
253                                       TCPPacket.TCPFlag.ACK);
254                 completeTCPWrite(handlerAddr, initiatorAddr, tcpBuf, ctx.alloc(), ctx);
255 
256                 // Write ACK with Normal Source and Destination Address
257                 TCPPacket.writePacket(tcpBuf, null, 1, 1, initiatorAddr.getPort(),
258                                       handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
259                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
260             } finally {
261                 tcpBuf.release();
262             }
263 
264             logger.debug("Finished Fake TCP 3-Way Handshake");
265         }
266 
267         state.set(State.WRITING);
268     }
269 
270     @Override
271     public void channelActive(ChannelHandlerContext ctx) throws Exception {
272         initializeIfNecessary(ctx);
273         super.channelActive(ctx);
274     }
275 
276     @Override
277     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
278         // Initialize if needed
279         if (state.get() == State.INIT) {
280             initializeIfNecessary(ctx);
281         }
282 
283         // Only write if State is STARTED
284         if (state.get() == State.WRITING) {
285             if (channelType == ChannelType.TCP) {
286                 handleTCP(ctx, msg, false);
287             } else if (channelType == ChannelType.UDP) {
288                 handleUDP(ctx, msg, false);
289             } else {
290                 logDiscard();
291             }
292         }
293         super.channelRead(ctx, msg);
294     }
295 
296     @Override
297     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
298         // Initialize if needed
299         if (state.get() == State.INIT) {
300             initializeIfNecessary(ctx);
301         }
302 
303         // Only write if State is STARTED
304         if (state.get() == State.WRITING) {
305             if (channelType == ChannelType.TCP) {
306                 handleTCP(ctx, msg, true);
307             } else if (channelType == ChannelType.UDP) {
308                 handleUDP(ctx, msg, true);
309             } else {
310                 logDiscard();
311             }
312         }
313         super.write(ctx, msg, promise);
314     }
315 
316     /**
317      * Handle TCP L4
318      *
319      * @param ctx              {@link ChannelHandlerContext} for {@link ByteBuf} allocation and
320      *                         {@code fireExceptionCaught}
321      * @param msg              {@link Object} must be {@link ByteBuf} else it'll be discarded
322      * @param isWriteOperation Set {@code true} if we have to process packet when packets are being sent out
323      *                         else set {@code false}
324      */
325     private void handleTCP(ChannelHandlerContext ctx, Object msg, boolean isWriteOperation) {
326         if (msg instanceof ByteBuf) {
327 
328             // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
329             int totalBytes = ((ByteBuf) msg).readableBytes();
330             if (totalBytes == 0 && !captureZeroByte) {
331                 logger.debug("Discarding Zero Byte TCP Packet. isWriteOperation {}", isWriteOperation);
332                 return;
333             }
334 
335             ByteBufAllocator byteBufAllocator = ctx.alloc();
336             if (totalBytes == 0) {
337                 handleTcpPacket(ctx, (ByteBuf) msg, isWriteOperation, byteBufAllocator);
338                 return;
339             }
340 
341             // If the payload exceeds the max size of that can fit in a single TCP IPv4 packet, fragment the payload
342             int maxTcpPayload = 65495;
343 
344             for (int i = 0; i < totalBytes; i += maxTcpPayload) {
345                 ByteBuf packet = ((ByteBuf) msg).slice(i, Math.min(maxTcpPayload, totalBytes - i));
346                 handleTcpPacket(ctx, packet, isWriteOperation, byteBufAllocator);
347             }
348         } else {
349             logger.debug("Discarding Pcap Write for TCP Object: {}", msg);
350         }
351     }
352 
353     private void handleTcpPacket(ChannelHandlerContext ctx, ByteBuf packet, boolean isWriteOperation,
354                                  ByteBufAllocator byteBufAllocator) {
355         ByteBuf tcpBuf = byteBufAllocator.buffer();
356         int bytes = packet.readableBytes();
357 
358         try {
359             if (isWriteOperation) {
360                 final InetSocketAddress srcAddr;
361                 final InetSocketAddress dstAddr;
362                 if (isServerPipeline) {
363                     srcAddr = handlerAddr;
364                     dstAddr = initiatorAddr;
365                 } else {
366                     srcAddr = initiatorAddr;
367                     dstAddr = handlerAddr;
368                 }
369 
370                 TCPPacket.writePacket(tcpBuf, packet, sendSegmentNumber, receiveSegmentNumber,
371                                       srcAddr.getPort(),
372                                       dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
373                 completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
374                 logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, srcAddr, dstAddr, false);
375 
376                 sendSegmentNumber = incrementUintSegmentNumber(sendSegmentNumber, bytes);
377 
378                 TCPPacket.writePacket(tcpBuf, null, receiveSegmentNumber, sendSegmentNumber, dstAddr.getPort(),
379                                       srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
380                 completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
381                 logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
382             } else {
383                 final InetSocketAddress srcAddr;
384                 final InetSocketAddress dstAddr;
385                 if (isServerPipeline) {
386                     srcAddr = initiatorAddr;
387                     dstAddr = handlerAddr;
388                 } else {
389                     srcAddr = handlerAddr;
390                     dstAddr = initiatorAddr;
391                 }
392 
393                 TCPPacket.writePacket(tcpBuf, packet, receiveSegmentNumber, sendSegmentNumber,
394                                       srcAddr.getPort(),
395                                       dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
396                 completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
397                 logTCP(false, bytes, receiveSegmentNumber, sendSegmentNumber, srcAddr, dstAddr, false);
398 
399                 receiveSegmentNumber = incrementUintSegmentNumber(receiveSegmentNumber, bytes);
400 
401                 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, dstAddr.getPort(),
402                                       srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
403                 completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
404                 logTCP(false, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
405             }
406         } finally {
407             tcpBuf.release();
408         }
409     }
410 
411     /**
412      * Write TCP/IP L3 and L2 here.
413      *
414      * @param srcAddr          {@link InetSocketAddress} Source Address of this Packet
415      * @param dstAddr          {@link InetSocketAddress} Destination Address of this Packet
416      * @param tcpBuf           {@link ByteBuf} containing TCP L4 Data
417      * @param byteBufAllocator {@link ByteBufAllocator} for allocating bytes for TCP/IP L3 and L2 data.
418      * @param ctx              {@link ChannelHandlerContext} for {@code fireExceptionCaught}
419      */
420     private void completeTCPWrite(InetSocketAddress srcAddr, InetSocketAddress dstAddr, ByteBuf tcpBuf,
421                                   ByteBufAllocator byteBufAllocator, ChannelHandlerContext ctx) {
422 
423         ByteBuf ipBuf = byteBufAllocator.buffer();
424         ByteBuf ethernetBuf = byteBufAllocator.buffer();
425         ByteBuf pcap = byteBufAllocator.buffer();
426 
427         try {
428             if (srcAddr.getAddress() instanceof Inet4Address && dstAddr.getAddress() instanceof Inet4Address) {
429                 IPPacket.writeTCPv4(ipBuf, tcpBuf,
430                                     NetUtil.ipv4AddressToInt((Inet4Address) srcAddr.getAddress()),
431                                     NetUtil.ipv4AddressToInt((Inet4Address) dstAddr.getAddress()));
432 
433                 EthernetPacket.writeIPv4(ethernetBuf, ipBuf);
434             } else if (srcAddr.getAddress() instanceof Inet6Address && dstAddr.getAddress() instanceof Inet6Address) {
435                 IPPacket.writeTCPv6(ipBuf, tcpBuf,
436                                     srcAddr.getAddress().getAddress(),
437                                     dstAddr.getAddress().getAddress());
438 
439                 EthernetPacket.writeIPv6(ethernetBuf, ipBuf);
440             } else {
441                 logger.error("Source and Destination IP Address versions are not same. Source Address: {}, " +
442                              "Destination Address: {}", srcAddr.getAddress(), dstAddr.getAddress());
443                 return;
444             }
445 
446             // Write Packet into Pcap
447             pCapWriter.writePacket(pcap, ethernetBuf);
448         } catch (IOException ex) {
449             logger.error("Caught Exception While Writing Packet into Pcap", ex);
450             ctx.fireExceptionCaught(ex);
451         } finally {
452             ipBuf.release();
453             ethernetBuf.release();
454             pcap.release();
455         }
456     }
457 
458     private static long incrementUintSegmentNumber(long sequenceNumber, int value) {
459         // If the sequence number would go above the max for uint32, wrap around
460         return (sequenceNumber + value) % (0xFFFFFFFFL + 1);
461     }
462 
463     /**
464      * Handle UDP l4
465      *
466      * @param ctx {@link ChannelHandlerContext} for {@code localAddress} / {@code remoteAddress},
467      *            {@link ByteBuf} allocation and {@code fireExceptionCaught}
468      * @param msg {@link DatagramPacket} or {@link ByteBuf}
469      */
470     private void handleUDP(ChannelHandlerContext ctx, Object msg, boolean isWriteOperation) {
471         ByteBuf udpBuf = ctx.alloc().buffer();
472 
473         try {
474             if (msg instanceof DatagramPacket) {
475 
476                 // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
477                 if (((DatagramPacket) msg).content().readableBytes() == 0 && !captureZeroByte) {
478                     logger.debug("Discarding Zero Byte UDP Packet");
479                     return;
480                 }
481 
482                 if (((DatagramPacket) msg).content().readableBytes() > 65507) {
483                     logger.warn("Unable to write UDP packet to PCAP. Payload of size {} exceeds max size of 65507");
484                     return;
485                 }
486 
487                 DatagramPacket datagramPacket = ((DatagramPacket) msg).duplicate();
488                 InetSocketAddress srcAddr = datagramPacket.sender();
489                 InetSocketAddress dstAddr = datagramPacket.recipient();
490 
491                 // If `datagramPacket.sender()` is `null` then DatagramPacket is initialized
492                 // `sender` (local) address. In this case, we'll get source address from Channel.
493                 if (srcAddr == null) {
494                     srcAddr = getLocalAddress(ctx.channel(), dstAddr);
495                 }
496 
497                 logger.debug("Writing UDP Data of {} Bytes, isWriteOperation {}, Src Addr {}, Dst Addr {}",
498                              datagramPacket.content().readableBytes(), isWriteOperation, srcAddr, dstAddr);
499 
500                 UDPPacket.writePacket(udpBuf, datagramPacket.content(), srcAddr.getPort(), dstAddr.getPort());
501                 completeUDPWrite(srcAddr, dstAddr, udpBuf, ctx.alloc(), ctx);
502             } else if (msg instanceof ByteBuf &&
503                        (!(ctx.channel() instanceof DatagramChannel) ||
504                         ((DatagramChannel) ctx.channel()).isConnected())) {
505 
506                 // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
507                 if (((ByteBuf) msg).readableBytes() == 0 && !captureZeroByte) {
508                     logger.debug("Discarding Zero Byte UDP Packet");
509                     return;
510                 }
511 
512                 if (((ByteBuf) msg).readableBytes() > 65507) {
513                     logger.warn("Unable to write UDP packet to PCAP. Payload of size {} exceeds max size of 65507");
514                     return;
515                 }
516 
517                 ByteBuf byteBuf = ((ByteBuf) msg).duplicate();
518 
519                 InetSocketAddress sourceAddr = isWriteOperation? initiatorAddr : handlerAddr;
520                 InetSocketAddress destinationAddr = isWriteOperation? handlerAddr : initiatorAddr;
521 
522                 logger.debug("Writing UDP Data of {} Bytes, Src Addr {}, Dst Addr {}",
523                              byteBuf.readableBytes(), sourceAddr, destinationAddr);
524 
525                 UDPPacket.writePacket(udpBuf, byteBuf, sourceAddr.getPort(), destinationAddr.getPort());
526                 completeUDPWrite(sourceAddr, destinationAddr, udpBuf, ctx.alloc(), ctx);
527             } else {
528                 logger.debug("Discarding Pcap Write for UDP Object: {}", msg);
529             }
530         } finally {
531             udpBuf.release();
532         }
533     }
534 
535     /**
536      * Write UDP/IP L3 and L2 here.
537      *
538      * @param srcAddr          {@link InetSocketAddress} Source Address of this Packet
539      * @param dstAddr          {@link InetSocketAddress} Destination Address of this Packet
540      * @param udpBuf           {@link ByteBuf} containing UDP L4 Data
541      * @param byteBufAllocator {@link ByteBufAllocator} for allocating bytes for UDP/IP L3 and L2 data.
542      * @param ctx              {@link ChannelHandlerContext} for {@code fireExceptionCaught}
543      */
544     private void completeUDPWrite(InetSocketAddress srcAddr, InetSocketAddress dstAddr, ByteBuf udpBuf,
545                                   ByteBufAllocator byteBufAllocator, ChannelHandlerContext ctx) {
546 
547         ByteBuf ipBuf = byteBufAllocator.buffer();
548         ByteBuf ethernetBuf = byteBufAllocator.buffer();
549         ByteBuf pcap = byteBufAllocator.buffer();
550 
551         try {
552             if (srcAddr.getAddress() instanceof Inet4Address && dstAddr.getAddress() instanceof Inet4Address) {
553                 IPPacket.writeUDPv4(ipBuf, udpBuf,
554                                     NetUtil.ipv4AddressToInt((Inet4Address) srcAddr.getAddress()),
555                                     NetUtil.ipv4AddressToInt((Inet4Address) dstAddr.getAddress()));
556 
557                 EthernetPacket.writeIPv4(ethernetBuf, ipBuf);
558             } else if (srcAddr.getAddress() instanceof Inet6Address && dstAddr.getAddress() instanceof Inet6Address) {
559                 IPPacket.writeUDPv6(ipBuf, udpBuf,
560                                     srcAddr.getAddress().getAddress(),
561                                     dstAddr.getAddress().getAddress());
562 
563                 EthernetPacket.writeIPv6(ethernetBuf, ipBuf);
564             } else {
565                 logger.error("Source and Destination IP Address versions are not same. Source Address: {}, " +
566                              "Destination Address: {}", srcAddr.getAddress(), dstAddr.getAddress());
567                 return;
568             }
569 
570             // Write Packet into Pcap
571             pCapWriter.writePacket(pcap, ethernetBuf);
572         } catch (IOException ex) {
573             logger.error("Caught Exception While Writing Packet into Pcap", ex);
574             ctx.fireExceptionCaught(ex);
575         } finally {
576             ipBuf.release();
577             ethernetBuf.release();
578             pcap.release();
579         }
580     }
581 
582     /**
583      * Get the local address of a channel. If the address is a wildcard address ({@code 0.0.0.0} or {@code ::}), and
584      * the address family does not match that of the {@code remote}, return the wildcard address of the {@code remote}'s
585      * family instead.
586      *
587      * @param ch     The channel to get the local address from
588      * @param remote The remote address
589      * @return The fixed local address
590      */
591     private static InetSocketAddress getLocalAddress(Channel ch, InetSocketAddress remote) {
592         InetSocketAddress local = (InetSocketAddress) ch.localAddress();
593         if (remote != null && local.getAddress().isAnyLocalAddress()) {
594             if (local.getAddress() instanceof Inet4Address && remote.getAddress() instanceof Inet6Address) {
595                 return new InetSocketAddress(WildcardAddressHolder.wildcard6, local.getPort());
596             }
597             if (local.getAddress() instanceof Inet6Address && remote.getAddress() instanceof Inet4Address) {
598                 return new InetSocketAddress(WildcardAddressHolder.wildcard4, local.getPort());
599             }
600         }
601         return local;
602     }
603 
604     @Override
605     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
606 
607         // If `isTCP` is true and state is WRITING, then we'll simulate a `FIN` flow.
608         if (channelType == ChannelType.TCP && state.get() == State.WRITING) {
609             logger.debug("Starting Fake TCP FIN+ACK Flow to close connection");
610 
611             ByteBufAllocator byteBufAllocator = ctx.alloc();
612             ByteBuf tcpBuf = byteBufAllocator.buffer();
613 
614             try {
615                 long initiatorSegmentNumber = isServerPipeline? receiveSegmentNumber : sendSegmentNumber;
616                 long initiatorAckNumber = isServerPipeline? sendSegmentNumber : receiveSegmentNumber;
617                 // Write FIN+ACK with Normal Source and Destination Address
618                 TCPPacket.writePacket(tcpBuf, null, initiatorSegmentNumber, initiatorAckNumber, initiatorAddr.getPort(),
619                                       handlerAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
620                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
621 
622                 // Write FIN+ACK with Reversed Source and Destination Address
623                 TCPPacket.writePacket(tcpBuf, null, initiatorAckNumber, initiatorSegmentNumber, handlerAddr.getPort(),
624                                       initiatorAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
625                 completeTCPWrite(handlerAddr, initiatorAddr, tcpBuf, byteBufAllocator, ctx);
626 
627                 // Increment by 1 when responding to FIN
628                 sendSegmentNumber = incrementUintSegmentNumber(sendSegmentNumber, 1);
629                 receiveSegmentNumber = incrementUintSegmentNumber(receiveSegmentNumber, 1);
630                 initiatorSegmentNumber = isServerPipeline? receiveSegmentNumber : sendSegmentNumber;
631                 initiatorAckNumber = isServerPipeline? sendSegmentNumber : receiveSegmentNumber;
632 
633                 // Write ACK with Normal Source and Destination Address
634                 TCPPacket.writePacket(tcpBuf, null, initiatorSegmentNumber, initiatorAckNumber,
635                                       initiatorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
636                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
637             } finally {
638                 tcpBuf.release();
639             }
640 
641             logger.debug("Finished Fake TCP FIN+ACK Flow to close connection");
642         }
643 
644         close();
645         super.handlerRemoved(ctx);
646     }
647 
648     @Override
649     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
650         // Only write RST if this is an initialized TCP stream
651         if (channelType == ChannelType.TCP && state.get() == State.WRITING) {
652             ByteBuf tcpBuf = ctx.alloc().buffer();
653 
654             try {
655                 // Write RST with Normal Source and Destination Address
656                 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, initiatorAddr.getPort(),
657                                       handlerAddr.getPort(), TCPPacket.TCPFlag.RST, TCPPacket.TCPFlag.ACK);
658                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
659             } finally {
660                 tcpBuf.release();
661             }
662 
663             logger.debug("Sent Fake TCP RST to close connection");
664         }
665 
666         close();
667         ctx.fireExceptionCaught(cause);
668     }
669 
670     /**
671      * Logger for TCP
672      */
673     private void logTCP(boolean isWriteOperation, int bytes, long sendSegmentNumber, long receiveSegmentNumber,
674                         InetSocketAddress srcAddr, InetSocketAddress dstAddr, boolean ackOnly) {
675         // If `ackOnly` is `true` when we don't need to write any data so we'll not
676         // log number of bytes being written and mark the operation as "TCP ACK".
677         if (logger.isDebugEnabled()) {
678             if (ackOnly) {
679                 logger.debug("Writing TCP ACK, isWriteOperation {}, Segment Number {}, Ack Number {}, Src Addr {}, "
680                              + "Dst Addr {}", isWriteOperation, sendSegmentNumber, receiveSegmentNumber, dstAddr,
681                              srcAddr);
682             } else {
683                 logger.debug("Writing TCP Data of {} Bytes, isWriteOperation {}, Segment Number {}, Ack Number {}, " +
684                              "Src Addr {}, Dst Addr {}", bytes, isWriteOperation, sendSegmentNumber,
685                              receiveSegmentNumber, srcAddr, dstAddr);
686             }
687         }
688     }
689 
690     OutputStream outputStream() {
691         return outputStream;
692     }
693 
694     boolean sharedOutputStream() {
695         return sharedOutputStream;
696     }
697 
698     boolean writePcapGlobalHeader() {
699         return writePcapGlobalHeader;
700     }
701 
702     /**
703      * Returns {@code true} if the {@link PcapWriteHandler} is currently
704      * writing packets to the {@link OutputStream} else returns {@code false}.
705      */
706     public boolean isWriting() {
707         return state.get() == State.WRITING;
708     }
709 
710     State state() {
711         return state.get();
712     }
713 
714     /**
715      * Pause the {@link PcapWriteHandler} from writing packets to the {@link OutputStream}.
716      */
717     public void pause() {
718         if (!state.compareAndSet(State.WRITING, State.PAUSED)) {
719             throw new IllegalStateException("State must be 'STARTED' to pause but current state is: " + state);
720         }
721     }
722 
723     /**
724      * Resume the {@link PcapWriteHandler} to writing packets to the {@link OutputStream}.
725      */
726     public void resume() {
727         if (!state.compareAndSet(State.PAUSED, State.WRITING)) {
728             throw new IllegalStateException("State must be 'PAUSED' to resume but current state is: " + state);
729         }
730     }
731 
732     void markClosed() {
733         if (state.get() != State.CLOSED) {
734             state.set(State.CLOSED);
735         }
736     }
737 
738     // Visible for testing only.
739     PcapWriter pCapWriter() {
740         return pCapWriter;
741     }
742 
743     private void logDiscard() {
744         logger.warn("Discarding pcap write because channel type is unknown. The channel this handler is registered " +
745                     "on is not a SocketChannel or DatagramChannel, so the inference does not work. Please call " +
746                     "forceTcpChannel or forceUdpChannel before registering the handler.");
747     }
748 
749     @Override
750     public String toString() {
751         return "PcapWriteHandler{" +
752                "captureZeroByte=" + captureZeroByte +
753                ", writePcapGlobalHeader=" + writePcapGlobalHeader +
754                ", sharedOutputStream=" + sharedOutputStream +
755                ", sendSegmentNumber=" + sendSegmentNumber +
756                ", receiveSegmentNumber=" + receiveSegmentNumber +
757                ", channelType=" + channelType +
758                ", initiatorAddr=" + initiatorAddr +
759                ", handlerAddr=" + handlerAddr +
760                ", isServerPipeline=" + isServerPipeline +
761                ", state=" + state +
762                '}';
763     }
764 
765     /**
766      * <p> Close {@code PcapWriter} and {@link OutputStream}. </p>
767      * <p> Note: Calling this method does not close {@link PcapWriteHandler}.
768      * Only Pcap Writes are closed. </p>
769      *
770      * @throws IOException If {@link OutputStream#close()} throws an exception
771      */
772     @Override
773     public void close() throws IOException {
774         if (state.get() == State.CLOSED) {
775             logger.debug("PcapWriterHandler is already closed");
776         } else {
777             // If close is called prematurely, create writer to close output stream
778             if (pCapWriter == null) {
779                 pCapWriter = new PcapWriter(this);
780             }
781             pCapWriter.close();
782             markClosed();
783             logger.debug("PcapWriterHandler is now closed");
784         }
785     }
786 
787     private enum ChannelType {
788         TCP, UDP
789     }
790 
791     public static Builder builder() {
792         return new Builder();
793     }
794 
795     /**
796      * Builder for {@link PcapWriteHandler}
797      */
798     public static final class Builder {
799         private boolean captureZeroByte;
800         private boolean sharedOutputStream;
801         private boolean writePcapGlobalHeader = true;
802 
803         private ChannelType channelType;
804         private InetSocketAddress initiatorAddr;
805         private InetSocketAddress handlerAddr;
806         private boolean isServerPipeline;
807 
808         private Builder() {
809         }
810 
811         /**
812          * Set to {@code true} to enable capturing packets with empty (0 bytes) payload. Otherwise, if set to
813          * {@code false}, empty packets will be filtered out.
814          *
815          * @param captureZeroByte Whether to filter out empty packets.
816          * @return this builder
817          */
818         public Builder captureZeroByte(boolean captureZeroByte) {
819             this.captureZeroByte = captureZeroByte;
820             return this;
821         }
822 
823         /**
824          * Set to {@code true} if multiple {@link PcapWriteHandler} instances will be
825          * writing to the same {@link OutputStream} concurrently, and write locking is
826          * required. Otherwise, if set to {@code false}, no locking will be done.
827          * Additionally, {@link #close} will not close the underlying {@code OutputStream}.
828          * Note: it is probably an error to have both {@code writePcapGlobalHeader} and
829          * {@code sharedOutputStream} set to {@code true} at the same time.
830          *
831          * @param sharedOutputStream Whether {@link OutputStream} is shared or not
832          * @return this builder
833          */
834         public Builder sharedOutputStream(boolean sharedOutputStream) {
835             this.sharedOutputStream = sharedOutputStream;
836             return this;
837         }
838 
839         /**
840          * Set to {@code true} to write Pcap Global Header on initialization. Otherwise, if set to {@code false}, Pcap
841          * Global Header will not be written on initialization. This could when writing Pcap data on a existing file
842          * where Pcap Global Header is already present.
843          *
844          * @param writePcapGlobalHeader Whether to write the pcap global header.
845          * @return this builder
846          */
847         public Builder writePcapGlobalHeader(boolean writePcapGlobalHeader) {
848             this.writePcapGlobalHeader = writePcapGlobalHeader;
849             return this;
850         }
851 
852         /**
853          * Force this handler to write data as if they were TCP packets, with the given connection metadata. If this
854          * method isn't called, we determine the metadata from the channel.
855          *
856          * @param serverAddress    The address of the TCP server (handler)
857          * @param clientAddress    The address of the TCP client (initiator)
858          * @param isServerPipeline Whether the handler is part of the server channel
859          * @return this builder
860          */
861         public Builder forceTcpChannel(InetSocketAddress serverAddress, InetSocketAddress clientAddress,
862                                        boolean isServerPipeline) {
863             channelType = ChannelType.TCP;
864             handlerAddr = checkNotNull(serverAddress, "serverAddress");
865             initiatorAddr = checkNotNull(clientAddress, "clientAddress");
866             this.isServerPipeline = isServerPipeline;
867             return this;
868         }
869 
870         /**
871          * Force this handler to write data as if they were UDP packets, with the given connection metadata. If this
872          * method isn't called, we determine the metadata from the channel.
873          * <br>
874          * Note that even if this method is called, the address information on {@link DatagramPacket} takes precedence
875          * if it is present.
876          *
877          * @param localAddress  The address of the UDP local
878          * @param remoteAddress The address of the UDP remote
879          * @return this builder
880          */
881         public Builder forceUdpChannel(InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
882             channelType = ChannelType.UDP;
883             handlerAddr = checkNotNull(remoteAddress, "remoteAddress");
884             initiatorAddr = checkNotNull(localAddress, "localAddress");
885             return this;
886         }
887 
888         /**
889          * Build the {@link PcapWriteHandler}.
890          *
891          * @param outputStream The output stream to write the pcap data to.
892          * @return The handler.
893          */
894         public PcapWriteHandler build(OutputStream outputStream) {
895             checkNotNull(outputStream, "outputStream");
896             return new PcapWriteHandler(this, outputStream);
897         }
898     }
899 
900     private static final class WildcardAddressHolder {
901         static final InetAddress wildcard4; // 0.0.0.0
902         static final InetAddress wildcard6; // ::
903 
904         static {
905             try {
906                 wildcard4 = InetAddress.getByAddress(new byte[4]);
907                 wildcard6 = InetAddress.getByAddress(new byte[16]);
908             } catch (UnknownHostException e) {
909                 // would only happen if the byte array was of incorrect size
910                 throw new AssertionError(e);
911             }
912         }
913     }
914 }