View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufUtil;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelException;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOption;
26  import io.netty.channel.ChannelOutboundBuffer;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.oio.AbstractOioMessageChannel;
30  import io.netty.channel.socket.DatagramChannel;
31  import io.netty.channel.socket.DatagramChannelConfig;
32  import io.netty.channel.socket.DatagramPacket;
33  import io.netty.util.internal.EmptyArrays;
34  import io.netty.util.internal.PlatformDependent;
35  import io.netty.util.internal.StringUtil;
36  import io.netty.util.internal.logging.InternalLogger;
37  import io.netty.util.internal.logging.InternalLoggerFactory;
38  
39  import java.io.IOException;
40  import java.net.InetAddress;
41  import java.net.InetSocketAddress;
42  import java.net.MulticastSocket;
43  import java.net.NetworkInterface;
44  import java.net.SocketAddress;
45  import java.net.SocketException;
46  import java.net.SocketTimeoutException;
47  import java.nio.channels.NotYetConnectedException;
48  import java.nio.channels.UnresolvedAddressException;
49  import java.util.List;
50  import java.util.Locale;
51  
52  /**
53   * An OIO datagram {@link Channel} that sends and receives an
54   * {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
55   *
56   * @see AddressedEnvelope
57   * @see DatagramPacket
58   * @deprecated use NIO / EPOLL / KQUEUE transport.
59   */
60  @Deprecated
61  public class OioDatagramChannel extends AbstractOioMessageChannel
62                                  implements DatagramChannel {
63  
64      private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
65  
66      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
67      private static final String EXPECTED_TYPES =
68              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
69              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
70              StringUtil.simpleClassName(ByteBuf.class) + ", " +
71              StringUtil.simpleClassName(SocketAddress.class) + ">, " +
72              StringUtil.simpleClassName(ByteBuf.class) + ')';
73  
74      private final MulticastSocket socket;
75      private final OioDatagramChannelConfig config;
76      private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0);
77  
78      private static MulticastSocket newSocket() {
79          try {
80              return new MulticastSocket(null);
81          } catch (Exception e) {
82              throw new ChannelException("failed to create a new socket", e);
83          }
84      }
85  
86      /**
87       * Create a new instance with an new {@link MulticastSocket}.
88       */
89      public OioDatagramChannel() {
90          this(newSocket());
91      }
92  
93      /**
94       * Create a new instance from the given {@link MulticastSocket}.
95       *
96       * @param socket    the {@link MulticastSocket} which is used by this instance
97       */
98      public OioDatagramChannel(MulticastSocket socket) {
99          super(null);
100 
101         boolean success = false;
102         try {
103             socket.setSoTimeout(SO_TIMEOUT);
104             socket.setBroadcast(false);
105             success = true;
106         } catch (SocketException e) {
107             throw new ChannelException(
108                     "Failed to configure the datagram socket timeout.", e);
109         } finally {
110             if (!success) {
111                 socket.close();
112             }
113         }
114 
115         this.socket = socket;
116         config = new DefaultOioDatagramChannelConfig(this, socket);
117     }
118 
119     @Override
120     public ChannelMetadata metadata() {
121         return METADATA;
122     }
123 
124     /**
125      * {@inheritDoc}
126      *
127      * This can be safely cast to {@link OioDatagramChannelConfig}.
128      */
129     @Override
130     // TODO: Change return type to OioDatagramChannelConfig in next major release
131     public DatagramChannelConfig config() {
132         return config;
133     }
134 
135     @Override
136     public boolean isOpen() {
137         return !socket.isClosed();
138     }
139 
140     @Override
141     @SuppressWarnings("deprecation")
142     public boolean isActive() {
143         return isOpen()
144             && (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
145                  || socket.isBound());
146     }
147 
148     @Override
149     public boolean isConnected() {
150         return socket.isConnected();
151     }
152 
153     @Override
154     protected SocketAddress localAddress0() {
155         return socket.getLocalSocketAddress();
156     }
157 
158     @Override
159     protected SocketAddress remoteAddress0() {
160         return socket.getRemoteSocketAddress();
161     }
162 
163     @Override
164     protected void doBind(SocketAddress localAddress) throws Exception {
165         socket.bind(localAddress);
166     }
167 
168     @Override
169     public InetSocketAddress localAddress() {
170         return (InetSocketAddress) super.localAddress();
171     }
172 
173     @Override
174     public InetSocketAddress remoteAddress() {
175         return (InetSocketAddress) super.remoteAddress();
176     }
177 
178     @Override
179     protected void doConnect(SocketAddress remoteAddress,
180             SocketAddress localAddress) throws Exception {
181         if (localAddress != null) {
182             socket.bind(localAddress);
183         }
184 
185         boolean success = false;
186         try {
187             socket.connect(remoteAddress);
188             success = true;
189         } finally {
190             if (!success) {
191                 try {
192                     socket.close();
193                 } catch (Throwable t) {
194                     logger.warn("Failed to close a socket.", t);
195                 }
196             }
197         }
198     }
199 
200     @Override
201     protected void doDisconnect() throws Exception {
202         socket.disconnect();
203     }
204 
205     @Override
206     protected void doClose() throws Exception {
207         socket.close();
208     }
209 
210     @Override
211     protected int doReadMessages(List<Object> buf) throws Exception {
212         DatagramChannelConfig config = config();
213         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
214 
215         ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess());
216         boolean free = true;
217         try {
218             // Ensure we null out the address which may have been set before.
219             tmpPacket.setAddress(null);
220             tmpPacket.setData(data.array(), data.arrayOffset(), data.capacity());
221             socket.receive(tmpPacket);
222 
223             InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress();
224 
225             allocHandle.lastBytesRead(tmpPacket.getLength());
226             buf.add(new DatagramPacket(data.writerIndex(allocHandle.lastBytesRead()), localAddress(), remoteAddr));
227             free = false;
228             return 1;
229         } catch (SocketTimeoutException e) {
230             // Expected
231             return 0;
232         } catch (SocketException e) {
233             if (!e.getMessage().toLowerCase(Locale.US).contains("socket closed")) {
234                 throw e;
235             }
236             return -1;
237         } catch (Throwable cause) {
238             PlatformDependent.throwException(cause);
239             return -1;
240         } finally {
241             if (free) {
242                 data.release();
243             }
244         }
245     }
246 
247     @Override
248     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
249         for (;;) {
250             final Object o = in.current();
251             if (o == null) {
252                 break;
253             }
254 
255             final ByteBuf data;
256             final SocketAddress remoteAddress;
257             if (o instanceof AddressedEnvelope) {
258                 @SuppressWarnings("unchecked")
259                 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o;
260                 remoteAddress = envelope.recipient();
261                 data = envelope.content();
262             } else {
263                 data = (ByteBuf) o;
264                 remoteAddress = null;
265             }
266 
267             final int length = data.readableBytes();
268             try {
269                 if (remoteAddress != null) {
270                     tmpPacket.setSocketAddress(remoteAddress);
271                 } else {
272                     if (!isConnected()) {
273                         // If not connected we should throw a NotYetConnectedException() to be consistent with
274                         // NioDatagramChannel
275                         throw new NotYetConnectedException();
276                     }
277                     // Ensure we null out the address which may have been set before.
278                     tmpPacket.setAddress(null);
279                 }
280                 if (data.hasArray()) {
281                     tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
282                 } else {
283                     tmpPacket.setData(ByteBufUtil.getBytes(data, data.readerIndex(), length));
284                 }
285                 socket.send(tmpPacket);
286                 in.remove();
287             } catch (Exception e) {
288                 // Continue on write error as a DatagramChannel can write to multiple remote peers
289                 //
290                 // See https://github.com/netty/netty/issues/2665
291                 in.remove(e);
292             }
293         }
294     }
295 
296     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
297         if (envelope.recipient() instanceof InetSocketAddress
298                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
299             throw new UnresolvedAddressException();
300         }
301     }
302 
303     @Override
304     protected Object filterOutboundMessage(Object msg) {
305         if (msg instanceof DatagramPacket) {
306             checkUnresolved((DatagramPacket) msg);
307             return msg;
308         }
309 
310         if (msg instanceof ByteBuf) {
311             return msg;
312         }
313 
314         if (msg instanceof AddressedEnvelope) {
315             @SuppressWarnings("unchecked")
316             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
317             checkUnresolved(e);
318             if (e.content() instanceof ByteBuf) {
319                 return msg;
320             }
321         }
322 
323         throw new UnsupportedOperationException(
324                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
325     }
326 
327     @Override
328     public ChannelFuture joinGroup(InetAddress multicastAddress) {
329         return joinGroup(multicastAddress, newPromise());
330     }
331 
332     @Override
333     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
334         ensureBound();
335         try {
336             socket.joinGroup(multicastAddress);
337             promise.setSuccess();
338         } catch (IOException e) {
339             promise.setFailure(e);
340         }
341         return promise;
342     }
343 
344     @Override
345     public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
346         return joinGroup(multicastAddress, networkInterface, newPromise());
347     }
348 
349     @Override
350     public ChannelFuture joinGroup(
351             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
352             ChannelPromise promise) {
353         ensureBound();
354         try {
355             socket.joinGroup(multicastAddress, networkInterface);
356             promise.setSuccess();
357         } catch (IOException e) {
358             promise.setFailure(e);
359         }
360         return promise;
361     }
362 
363     @Override
364     public ChannelFuture joinGroup(
365             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
366         return newFailedFuture(new UnsupportedOperationException());
367     }
368 
369     @Override
370     public ChannelFuture joinGroup(
371             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
372             ChannelPromise promise) {
373         promise.setFailure(new UnsupportedOperationException());
374         return promise;
375     }
376 
377     private void ensureBound() {
378         if (!isActive()) {
379             throw new IllegalStateException(
380                     DatagramChannel.class.getName() +
381                     " must be bound to join a group.");
382         }
383     }
384 
385     @Override
386     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
387         return leaveGroup(multicastAddress, newPromise());
388     }
389 
390     @Override
391     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
392         try {
393             socket.leaveGroup(multicastAddress);
394             promise.setSuccess();
395         } catch (IOException e) {
396             promise.setFailure(e);
397         }
398         return promise;
399     }
400 
401     @Override
402     public ChannelFuture leaveGroup(
403             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
404         return leaveGroup(multicastAddress, networkInterface, newPromise());
405     }
406 
407     @Override
408     public ChannelFuture leaveGroup(
409             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
410             ChannelPromise promise) {
411         try {
412             socket.leaveGroup(multicastAddress, networkInterface);
413             promise.setSuccess();
414         } catch (IOException e) {
415             promise.setFailure(e);
416         }
417         return promise;
418     }
419 
420     @Override
421     public ChannelFuture leaveGroup(
422             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
423         return newFailedFuture(new UnsupportedOperationException());
424     }
425 
426     @Override
427     public ChannelFuture leaveGroup(
428             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
429             ChannelPromise promise) {
430         promise.setFailure(new UnsupportedOperationException());
431         return promise;
432     }
433 
434     @Override
435     public ChannelFuture block(InetAddress multicastAddress,
436             NetworkInterface networkInterface, InetAddress sourceToBlock) {
437         return newFailedFuture(new UnsupportedOperationException());
438     }
439 
440     @Override
441     public ChannelFuture block(InetAddress multicastAddress,
442             NetworkInterface networkInterface, InetAddress sourceToBlock,
443             ChannelPromise promise) {
444         promise.setFailure(new UnsupportedOperationException());
445         return promise;
446     }
447 
448     @Override
449     public ChannelFuture block(InetAddress multicastAddress,
450             InetAddress sourceToBlock) {
451         return newFailedFuture(new UnsupportedOperationException());
452     }
453 
454     @Override
455     public ChannelFuture block(InetAddress multicastAddress,
456             InetAddress sourceToBlock, ChannelPromise promise) {
457         promise.setFailure(new UnsupportedOperationException());
458         return promise;
459     }
460 }