View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.kqueue;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.channel.AddressedEnvelope;
21  import io.netty5.channel.ChannelException;
22  import io.netty5.channel.ChannelMetadata;
23  import io.netty5.channel.ChannelOption;
24  import io.netty5.channel.ChannelOutboundBuffer;
25  import io.netty5.channel.ChannelPipeline;
26  import io.netty5.channel.ChannelShutdownDirection;
27  import io.netty5.channel.DefaultBufferAddressedEnvelope;
28  import io.netty5.channel.EventLoop;
29  import io.netty5.channel.FixedRecvBufferAllocator;
30  import io.netty5.channel.RecvBufferAllocator;
31  import io.netty5.channel.socket.DatagramPacket;
32  import io.netty5.channel.socket.DatagramChannel;
33  import io.netty5.channel.socket.DomainSocketAddress;
34  import io.netty5.channel.socket.SocketProtocolFamily;
35  import io.netty5.channel.unix.DatagramSocketAddress;
36  import io.netty5.channel.unix.DomainDatagramSocketAddress;
37  import io.netty5.channel.unix.Errors;
38  import io.netty5.channel.unix.IntegerUnixChannelOption;
39  import io.netty5.channel.unix.IovArray;
40  import io.netty5.channel.unix.RawUnixChannelOption;
41  import io.netty5.channel.unix.RecvFromAddressDomainSocket;
42  import io.netty5.channel.unix.UnixChannel;
43  import io.netty5.channel.unix.UnixChannelOption;
44  import io.netty5.channel.unix.UnixChannelUtil;
45  import io.netty5.util.concurrent.Future;
46  import io.netty5.util.internal.SilentDispose;
47  import io.netty5.util.internal.StringUtil;
48  import io.netty5.util.internal.UnstableApi;
49  import io.netty5.util.internal.logging.InternalLogger;
50  import io.netty5.util.internal.logging.InternalLoggerFactory;
51  
52  import java.io.IOException;
53  import java.net.InetAddress;
54  import java.net.InetSocketAddress;
55  import java.net.NetworkInterface;
56  import java.net.PortUnreachableException;
57  import java.net.ProtocolFamily;
58  import java.net.SocketAddress;
59  import java.nio.ByteBuffer;
60  import java.util.Set;
61  import java.util.function.Predicate;
62  
63  import static io.netty5.channel.ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION;
64  import static io.netty5.channel.ChannelOption.IP_TOS;
65  import static io.netty5.channel.ChannelOption.SO_BROADCAST;
66  import static io.netty5.channel.ChannelOption.SO_RCVBUF;
67  import static io.netty5.channel.ChannelOption.SO_REUSEADDR;
68  import static io.netty5.channel.ChannelOption.SO_SNDBUF;
69  import static io.netty5.channel.unix.UnixChannelOption.SO_REUSEPORT;
70  import static io.netty5.util.CharsetUtil.UTF_8;
71  
72  import static java.util.Objects.requireNonNull;
73  
74  /**
75   * {@link DatagramChannel} implementation that uses KQueue.
76   *
77   * <h3>Available options</h3>
78   *
79   * In addition to the options provided by {@link DatagramChannel} and {@link UnixChannel},
80   * {@link KQueueDatagramChannel} allows the following options in the option map:
81   *
82   * <table border="1" cellspacing="0" cellpadding="6">
83   * <tr>
84   * <th>{@link ChannelOption}</th>
85   * <th>{@code INET}</th>
86   * <th>{@code INET6}</th>
87   * <th>{@code UNIX}</th>
88   * </tr><tr>
89   * <td>{@link IntegerUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
90   * </tr><tr>
91   * <td>{@link RawUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
92   * </tr><tr>
93   * <td>{@link UnixChannelOption#SO_REUSEPORT}</td><td>X</td><td>X</td><td>-</td>
94   * </tr>
95   * </table>
96   */
97  @UnstableApi
98  public final class KQueueDatagramChannel
99          extends AbstractKQueueChannel<UnixChannel> implements DatagramChannel {
100     private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueDatagramChannel.class);
101     private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
102 
103     private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN_SOCKET = supportedOptionsDomainSocket();
104 
105     private static final ChannelMetadata METADATA = new ChannelMetadata(true);
106 
107     private static final String EXPECTED_TYPES =
108             " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
109                     StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
110                     StringUtil.simpleClassName(Buffer.class) + ", " +
111                     StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
112                     StringUtil.simpleClassName(Buffer.class) + ')';
113 
114     private static final String EXPECTED_TYPES_DOMAIN =
115             " (expected: " +
116                     StringUtil.simpleClassName(DatagramPacket.class) + ", " +
117                     StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
118                     StringUtil.simpleClassName(Buffer.class) + ", " +
119                     StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
120                     StringUtil.simpleClassName(Buffer.class) + ')';
121 
122     private static final Predicate<RecvBufferAllocator.Handle> TRUE_SUPPLIER = h -> true;
123 
124     private volatile boolean connected;
125     private volatile boolean inputShutdown;
126     private volatile boolean outputShutdown;
127 
128     private boolean activeOnOpen;
129 
130     public KQueueDatagramChannel(EventLoop eventLoop) {
131         this(eventLoop, null);
132     }
133 
134     public KQueueDatagramChannel(EventLoop eventLoop, ProtocolFamily protocolFamily) {
135         super(null, eventLoop, METADATA, new FixedRecvBufferAllocator(2048),
136                 BsdSocket.newDatagramSocket(protocolFamily), false);
137     }
138 
139     public KQueueDatagramChannel(EventLoop eventLoop, int fd, ProtocolFamily protocolFamily) {
140         this(eventLoop, new BsdSocket(fd, SocketProtocolFamily.of(protocolFamily)), true);
141     }
142 
143     KQueueDatagramChannel(EventLoop eventLoop, BsdSocket socket, boolean active) {
144         super(null, eventLoop, METADATA, new FixedRecvBufferAllocator(2048), socket, active);
145     }
146 
147     @SuppressWarnings("unchecked")
148     @Override
149     protected <T> T getExtendedOption(ChannelOption<T> option) {
150         if (isSupported(socket.protocolFamily(), option)) {
151             if (option == SO_BROADCAST) {
152                 return (T) Boolean.valueOf(isBroadcast());
153             }
154             if (option == SO_RCVBUF) {
155                 return (T) Integer.valueOf(getReceiveBufferSize());
156             }
157             if (option == SO_SNDBUF) {
158                 return (T) Integer.valueOf(getSendBufferSize());
159             }
160             if (option == SO_REUSEADDR) {
161                 return (T) Boolean.valueOf(isReuseAddress());
162             }
163             if (option == IP_TOS) {
164                 return (T) Integer.valueOf(getTrafficClass());
165             }
166             if (option == DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
167                 return (T) Boolean.valueOf(activeOnOpen);
168             }
169             if (option == SO_REUSEPORT) {
170                 return (T) Boolean.valueOf(isReusePort());
171             }
172         }
173         return super.getExtendedOption(option);
174     }
175 
176     @Override
177     protected  <T> void setExtendedOption(ChannelOption<T> option, T value) {
178         if (isSupported(socket.protocolFamily(), option)) {
179             if (option == SO_BROADCAST) {
180                 setBroadcast((Boolean) value);
181             } else if (option == SO_RCVBUF) {
182                 setReceiveBufferSize((Integer) value);
183             } else if (option == SO_SNDBUF) {
184                 setSendBufferSize((Integer) value);
185             } else if (option == SO_REUSEADDR) {
186                 setReuseAddress((Boolean) value);
187             } else if (option == IP_TOS) {
188                 setTrafficClass((Integer) value);
189             } else if (option == DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
190                 setActiveOnOpen((Boolean) value);
191             } else if (option == SO_REUSEPORT) {
192                 setReusePort((Boolean) value);
193             }
194         } else {
195             super.setExtendedOption(option, value);
196         }
197     }
198 
199     private boolean isSupported(SocketProtocolFamily protocolFamily, ChannelOption<?> option) {
200         if (protocolFamily == SocketProtocolFamily.UNIX) {
201             return SUPPORTED_OPTIONS_DOMAIN_SOCKET.contains(option);
202         }
203         return SUPPORTED_OPTIONS.contains(option);
204     }
205 
206     @Override
207     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
208         return isSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
209     }
210 
211     private static Set<ChannelOption<?>> supportedOptions() {
212         return newSupportedIdentityOptionsSet(SO_BROADCAST, SO_RCVBUF, SO_SNDBUF, SO_REUSEADDR, IP_TOS,
213                 DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, SO_REUSEPORT);
214     }
215 
216     private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
217         return newSupportedIdentityOptionsSet(SO_SNDBUF, SO_RCVBUF, DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION);
218     }
219 
220     private void setActiveOnOpen(boolean activeOnOpen) {
221         if (isRegistered()) {
222             throw new IllegalStateException("Can only changed before channel was registered");
223         }
224         this.activeOnOpen = activeOnOpen;
225     }
226 
227     private boolean getActiveOnOpen() {
228         return activeOnOpen;
229     }
230 
231     /**
232      * Returns {@code true} if the SO_REUSEPORT option is set.
233      */
234     private boolean isReusePort() {
235         try {
236             return socket.isReusePort();
237         } catch (IOException e) {
238             throw new ChannelException(e);
239         }
240     }
241 
242     /**
243      * Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple
244      * {@link KQueueSocketChannel}s to the same port and so accept connections with multiple threads.
245      *
246      * Be aware this method needs be called before {@link KQueueDatagramChannel#bind(java.net.SocketAddress)} to have
247      * any affect.
248      */
249     private void setReusePort(boolean reusePort) {
250         try {
251             socket.setReusePort(reusePort);
252         } catch (IOException e) {
253             throw new ChannelException(e);
254         }
255     }
256 
257     private int getSendBufferSize() {
258         try {
259             return socket.getSendBufferSize();
260         } catch (IOException e) {
261             throw new ChannelException(e);
262         }
263     }
264 
265     public void setSendBufferSize(int sendBufferSize) {
266         try {
267             socket.setSendBufferSize(sendBufferSize);
268         } catch (IOException e) {
269             throw new ChannelException(e);
270         }
271     }
272 
273     private int getReceiveBufferSize() {
274         try {
275             return socket.getReceiveBufferSize();
276         } catch (IOException e) {
277             throw new ChannelException(e);
278         }
279     }
280 
281     private void setReceiveBufferSize(int receiveBufferSize) {
282         try {
283             socket.setReceiveBufferSize(receiveBufferSize);
284         } catch (IOException e) {
285             throw new ChannelException(e);
286         }
287     }
288 
289     private int getTrafficClass() {
290         try {
291             return socket.getTrafficClass();
292         } catch (IOException e) {
293             throw new ChannelException(e);
294         }
295     }
296 
297     private void setTrafficClass(int trafficClass) {
298         try {
299             socket.setTrafficClass(trafficClass);
300         } catch (IOException e) {
301             throw new ChannelException(e);
302         }
303     }
304 
305     private boolean isReuseAddress() {
306         try {
307             return socket.isReuseAddress();
308         } catch (IOException e) {
309             throw new ChannelException(e);
310         }
311     }
312 
313     private void setReuseAddress(boolean reuseAddress) {
314         try {
315             socket.setReuseAddress(reuseAddress);
316         } catch (IOException e) {
317             throw new ChannelException(e);
318         }
319     }
320 
321     private boolean isBroadcast() {
322         try {
323             return socket.isBroadcast();
324         } catch (IOException e) {
325             throw new ChannelException(e);
326         }
327     }
328 
329     private void setBroadcast(boolean broadcast) {
330         try {
331             socket.setBroadcast(broadcast);
332         } catch (IOException e) {
333             throw new ChannelException(e);
334         }
335     }
336 
337     @Override
338     public boolean isActive() {
339         return socket.isOpen() && (getActiveOnOpen() && isRegistered() || active);
340     }
341 
342     @Override
343     public boolean isConnected() {
344         return connected;
345     }
346 
347     @Override
348     protected void doBind(SocketAddress localAddress) throws Exception {
349         super.doBind(localAddress);
350         active = true;
351     }
352 
353     private boolean doWriteMessage(Object msg) throws Exception {
354         final Object data;
355         final SocketAddress remoteAddress;
356         if (msg instanceof AddressedEnvelope) {
357             @SuppressWarnings("unchecked")
358             AddressedEnvelope<?, SocketAddress> envelope = (AddressedEnvelope<?, SocketAddress>) msg;
359             data = envelope.content();
360             remoteAddress = envelope.recipient();
361         } else {
362             data = msg;
363             remoteAddress = null;
364         }
365 
366         return doWriteBufferMessage((Buffer) data, remoteAddress);
367     }
368 
369     private boolean doWriteBufferMessage(Buffer data, SocketAddress remoteAddress) throws IOException {
370         final int initialReadableBytes = data.readableBytes();
371         if (initialReadableBytes == 0) {
372             return true;
373         }
374 
375         if (data.countReadableComponents() > 1) {
376             IovArray array = registration().cleanArray();
377             data.forEachReadable(0, array);
378             int count = array.count();
379             assert count != 0;
380 
381             final long writtenBytes;
382             if (remoteAddress == null) {
383                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), count);
384             } else {
385                 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
386                     writtenBytes = socket.sendToAddressesDomainSocket(
387                             array.memoryAddress(0), count,
388                             ((DomainSocketAddress) remoteAddress).path().getBytes(UTF_8));
389                 } else {
390                     InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
391                     writtenBytes = socket.sendToAddresses(array.memoryAddress(0), count,
392                             inetSocketAddress.getAddress(), inetSocketAddress.getPort());
393                 }
394             }
395             return writtenBytes > 0;
396         } else {
397             if (remoteAddress == null) {
398                 data.forEachReadable(0, (index, component) -> {
399                     int written = socket.writeAddress(component.readableNativeAddress(), 0, component.readableBytes());
400                     component.skipReadableBytes(written);
401                     return false;
402                 });
403             } else {
404                 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
405                     byte[] path = ((DomainSocketAddress) remoteAddress).path().getBytes(UTF_8);
406                     data.forEachReadable(0, (index, component) -> {
407                         int written = socket.sendToAddressDomainSocket(
408                                 component.readableNativeAddress(), 0, component.readableBytes(), path);
409                         component.skipReadableBytes(written);
410                         return false;
411                     });
412                 } else {
413                     InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
414                     data.forEachReadable(0, (index, component) -> {
415                         int written = socket.sendToAddress(component.readableNativeAddress(), 0,
416                                 component.readableBytes(), inetSocketAddress.getAddress(), inetSocketAddress.getPort());
417                         component.skipReadableBytes(written);
418                         return false;
419                     });
420                 }
421             }
422             return data.readableBytes() < initialReadableBytes;
423         }
424     }
425 
426     @Override
427     protected Object filterOutboundMessage(Object msg) {
428         if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
429             return filterOutboundMessage0(msg, DomainSocketAddress.class, EXPECTED_TYPES_DOMAIN);
430         } else {
431             return filterOutboundMessage0(msg, InetSocketAddress.class, EXPECTED_TYPES);
432         }
433     }
434 
435     private Object filterOutboundMessage0(Object msg, Class<? extends SocketAddress> recipientClass,
436                                             String expectedTypes) {
437         if (msg instanceof DatagramPacket) {
438             DatagramPacket packet = (DatagramPacket) msg;
439             if (recipientClass.isInstance(packet.recipient())) {
440                 Buffer content = packet.content();
441                 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
442                         new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
443             }
444         } else if (msg instanceof Buffer) {
445             Buffer buf = (Buffer) msg;
446             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
447         } else if (msg instanceof AddressedEnvelope) {
448             @SuppressWarnings("unchecked")
449             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
450             SocketAddress recipient = e.recipient();
451             if (recipient == null || recipientClass.isInstance(recipient)) {
452                 if (e.content() instanceof Buffer) {
453                     Buffer buf = (Buffer) e.content();
454                     if (UnixChannelUtil.isBufferCopyNeededForWrite(buf)) {
455                         try {
456                             return new DefaultBufferAddressedEnvelope<>(newDirectBuffer(null, buf), recipient);
457                         } finally {
458                             SilentDispose.dispose(e, logger); // Don't fail here, because we allocated a buffer.
459                         }
460                     }
461                     return e;
462                 }
463             }
464         }
465         throw new UnsupportedOperationException(
466                 "unsupported message type: " + StringUtil.simpleClassName(msg) + expectedTypes);
467     }
468 
469     @Override
470     protected void doDisconnect() throws Exception {
471         socket.disconnect();
472         connected = active = false;
473         resetCachedAddresses();
474     }
475 
476     @Override
477     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
478         if (super.doConnect(remoteAddress, localAddress)) {
479             connected = true;
480             return true;
481         }
482         return false;
483     }
484 
485     @Override
486     protected void doClose() throws Exception {
487         super.doClose();
488         connected = false;
489     }
490 
491     @Override
492     void readReady(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
493                    Predicate<RecvBufferAllocator.Handle> maybeMoreData) {
494         final ChannelPipeline pipeline = pipeline();
495 
496         Throwable exception = null;
497         Buffer buffer = null;
498         try {
499             boolean connected = isConnected();
500             do {
501                 buffer = allocHandle.allocate(recvBufferAllocator);
502                 allocHandle.attemptedBytesRead(buffer.writableBytes());
503 
504                 final DatagramPacket packet;
505                 if (connected) {
506                     try {
507                         allocHandle.lastBytesRead(doReadBytes(buffer));
508                     } catch (Errors.NativeIoException e) {
509                         // We need to correctly translate connect errors to match NIO behaviour.
510                         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
511                             PortUnreachableException error = new PortUnreachableException(e.getMessage());
512                             error.initCause(e);
513                             throw error;
514                         }
515                         throw e;
516                     }
517                     if (allocHandle.lastBytesRead() <= 0) {
518                         // nothing was read, release the buffer.
519                         buffer.close();
520                         buffer = null;
521                         break;
522                     }
523                     packet = new DatagramPacket(buffer, localAddress(), remoteAddress());
524                 } else {
525                     SocketAddress localAddress = null;
526                     SocketAddress remoteAddress = null;
527                     int bytesRead = 0;
528                     if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
529                         final RecvFromAddressDomainSocket recvFrom = new RecvFromAddressDomainSocket(socket);
530                         buffer.forEachWritable(0, recvFrom);
531                         DomainDatagramSocketAddress recvAddress = recvFrom.remoteAddress();
532                         if (recvAddress != null) {
533                             remoteAddress = recvAddress;
534                             bytesRead = recvAddress.receivedAmount();
535                             localAddress = recvAddress.localAddress();
536                         }
537                     } else {
538                         try (var iterator = buffer.forEachWritable()) {
539                             var component = iterator.first();
540                             long addr = component.writableNativeAddress();
541                             DatagramSocketAddress datagramSocketAddress;
542                             if (addr != 0) {
543                                 // has a memory address so use optimized call
544                                 datagramSocketAddress = socket.recvFromAddress(addr, 0, component.writableBytes());
545                             } else {
546                                 ByteBuffer nioData = component.writableBuffer();
547                                 datagramSocketAddress = socket.recvFrom(
548                                         nioData, nioData.position(), nioData.limit());
549                             }
550                             if (datagramSocketAddress != null) {
551                                 remoteAddress = datagramSocketAddress;
552                                 localAddress = datagramSocketAddress.localAddress();
553                                 bytesRead = datagramSocketAddress.receivedAmount();
554                             }
555                         }
556                     }
557 
558                     if (remoteAddress == null) {
559                         allocHandle.lastBytesRead(-1);
560                         buffer.close();
561                         break;
562                     }
563                     if (localAddress == null) {
564                         localAddress = localAddress();
565                     }
566                     allocHandle.lastBytesRead(bytesRead);
567                     buffer.skipWritableBytes(allocHandle.lastBytesRead());
568 
569                     packet = new DatagramPacket(buffer, localAddress, remoteAddress);
570                 }
571 
572                 allocHandle.incMessagesRead(1);
573 
574                 readPending = false;
575                 pipeline.fireChannelRead(packet);
576 
577                 buffer = null;
578 
579             // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
580             // as we read anything).
581             } while (allocHandle.continueReading(isAutoRead(), TRUE_SUPPLIER));
582         } catch (Throwable t) {
583             if (buffer != null) {
584                 buffer.close();
585             }
586             exception = t;
587         }
588 
589         allocHandle.readComplete();
590         pipeline.fireChannelReadComplete();
591 
592         if (exception != null) {
593             pipeline.fireChannelExceptionCaught(exception);
594         } else {
595             readIfIsAutoRead();
596         }
597     }
598 
599     private <V> Future<V> newMulticastNotSupportedFuture() {
600         return newFailedFuture(new UnsupportedOperationException("Multicast not supported"));
601     }
602 
603     @Override
604     public Future<Void> joinGroup(InetAddress multicastAddress) {
605         requireNonNull(multicastAddress, "multicast");
606         return newMulticastNotSupportedFuture();
607     }
608 
609     @Override
610     public Future<Void> joinGroup(
611             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
612         requireNonNull(multicastAddress, "multicastAddress");
613         requireNonNull(networkInterface, "networkInterface");
614 
615         return newMulticastNotSupportedFuture();
616     }
617 
618     @Override
619     public Future<Void> leaveGroup(InetAddress multicastAddress) {
620         requireNonNull(multicastAddress, "multicast");
621         return newMulticastNotSupportedFuture();
622     }
623 
624     @Override
625     public Future<Void> leaveGroup(
626             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
627         requireNonNull(multicastAddress, "multicastAddress");
628         requireNonNull(networkInterface, "networkInterface");
629 
630         return newMulticastNotSupportedFuture();
631     }
632 
633     @Override
634     public Future<Void> block(
635             InetAddress multicastAddress, NetworkInterface networkInterface,
636             InetAddress sourceToBlock) {
637         requireNonNull(multicastAddress, "multicastAddress");
638         requireNonNull(sourceToBlock, "sourceToBlock");
639         requireNonNull(networkInterface, "networkInterface");
640 
641         return newMulticastNotSupportedFuture();
642     }
643 
644     @Override
645     public Future<Void> block(InetAddress multicastAddress, InetAddress sourceToBlock) {
646         requireNonNull(multicastAddress, "multicastAddress");
647         requireNonNull(sourceToBlock, "sourceToBlock");
648 
649         return newMulticastNotSupportedFuture();
650     }
651 
652     @Override
653     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
654         int maxMessagesPerWrite = getMaxMessagesPerWrite();
655         while (maxMessagesPerWrite > 0) {
656             Object msg = in.current();
657             if (msg == null) {
658                 break;
659             }
660 
661             try {
662                 boolean done = false;
663                 for (int i = getWriteSpinCount(); i > 0; --i) {
664                     if (doWriteMessage(msg)) {
665                         done = true;
666                         break;
667                     }
668                 }
669 
670                 if (done) {
671                     in.remove();
672                     maxMessagesPerWrite--;
673                 } else {
674                     break;
675                 }
676             } catch (IOException e) {
677                 maxMessagesPerWrite--;
678 
679                 // Continue on write error as a DatagramChannel can write to multiple remote peers
680                 //
681                 // See https://github.com/netty/netty/issues/2665
682                 in.remove(e);
683             }
684         }
685 
686         // Whether all messages were written or not.
687         writeFilter(!in.isEmpty());
688     }
689 
690     @Override
691     protected void doShutdown(ChannelShutdownDirection direction) {
692         switch (direction) {
693             case Inbound:
694                 inputShutdown = true;
695                 break;
696             case Outbound:
697                 outputShutdown = true;
698                 break;
699             default:
700                 throw new AssertionError();
701         }
702     }
703 
704     @Override
705     public boolean isShutdown(ChannelShutdownDirection direction) {
706         if (!isActive()) {
707             return true;
708         }
709         switch (direction) {
710             case Inbound:
711                 return inputShutdown;
712             case Outbound:
713                 return outputShutdown;
714             default:
715                 throw new AssertionError();
716         }
717     }
718 }