/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.socket.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.ConnectTimeoutException;
25  import io.netty.channel.EventLoop;
26  import io.netty.channel.oio.OioByteStreamChannel;
27  import io.netty.channel.socket.ServerSocketChannel;
28  import io.netty.channel.socket.SocketChannel;
29  import io.netty.util.internal.SocketUtils;
30  import io.netty.util.internal.UnstableApi;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.net.InetSocketAddress;
36  import java.net.Socket;
37  import java.net.SocketAddress;
38  import java.net.SocketTimeoutException;
39  
40  /**
41   * A {@link SocketChannel} which is using Old-Blocking-IO
42   */
43  public class OioSocketChannel extends OioByteStreamChannel implements SocketChannel {
44  
45      private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioSocketChannel.class);
46  
47      private final Socket socket;
48      private final OioSocketChannelConfig config;
49  
50      /**
51       * Create a new instance with an new {@link Socket}
52       */
53      public OioSocketChannel() {
54          this(new Socket());
55      }
56  
57      /**
58       * Create a new instance from the given {@link Socket}
59       *
60       * @param socket    the {@link Socket} which is used by this instance
61       */
62      public OioSocketChannel(Socket socket) {
63          this(null, socket);
64      }
65  
66      /**
67       * Create a new instance from the given {@link Socket}
68       *
69       * @param parent    the parent {@link Channel} which was used to create this instance. This can be null if the
70       *                  {@link} has no parent as it was created by your self.
71       * @param socket    the {@link Socket} which is used by this instance
72       */
73      public OioSocketChannel(Channel parent, Socket socket) {
74          super(parent);
75          this.socket = socket;
76          config = new DefaultOioSocketChannelConfig(this, socket);
77  
78          boolean success = false;
79          try {
80              if (socket.isConnected()) {
81                  activate(socket.getInputStream(), socket.getOutputStream());
82              }
83              socket.setSoTimeout(SO_TIMEOUT);
84              success = true;
85          } catch (Exception e) {
86              throw new ChannelException("failed to initialize a socket", e);
87          } finally {
88              if (!success) {
89                  try {
90                      socket.close();
91                  } catch (IOException e) {
92                      logger.warn("Failed to close a socket.", e);
93                  }
94              }
95          }
96      }
97  
98      @Override
99      public ServerSocketChannel parent() {
100         return (ServerSocketChannel) super.parent();
101     }
102 
103     @Override
104     public OioSocketChannelConfig config() {
105         return config;
106     }
107 
108     @Override
109     public boolean isOpen() {
110         return !socket.isClosed();
111     }
112 
113     @Override
114     public boolean isActive() {
115         return !socket.isClosed() && socket.isConnected();
116     }
117 
118     @Override
119     public boolean isOutputShutdown() {
120         return socket.isOutputShutdown() || !isActive();
121     }
122 
123     @Override
124     public boolean isInputShutdown() {
125         return socket.isInputShutdown() || !isActive();
126     }
127 
128     @Override
129     public boolean isShutdown() {
130         return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive();
131     }
132 
133     @UnstableApi
134     @Override
135     protected final void doShutdownOutput() throws Exception {
136         shutdownOutput0();
137     }
138 
139     @Override
140     public ChannelFuture shutdownOutput() {
141         return shutdownOutput(newPromise());
142     }
143 
144     @Override
145     public ChannelFuture shutdownInput() {
146         return shutdownInput(newPromise());
147     }
148 
149     @Override
150     public ChannelFuture shutdown() {
151         return shutdown(newPromise());
152     }
153 
154     @Override
155     protected int doReadBytes(ByteBuf buf) throws Exception {
156         if (socket.isClosed()) {
157             return -1;
158         }
159         try {
160             return super.doReadBytes(buf);
161         } catch (SocketTimeoutException ignored) {
162             return 0;
163         }
164     }
165 
166     @Override
167     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
168         EventLoop loop = eventLoop();
169         if (loop.inEventLoop()) {
170             shutdownOutput0(promise);
171         } else {
172             loop.execute(new Runnable() {
173                 @Override
174                 public void run() {
175                     shutdownOutput0(promise);
176                 }
177             });
178         }
179         return promise;
180     }
181 
182     private void shutdownOutput0(ChannelPromise promise) {
183         try {
184             shutdownOutput0();
185             promise.setSuccess();
186         } catch (Throwable t) {
187             promise.setFailure(t);
188         }
189     }
190 
191     private void shutdownOutput0() throws IOException {
192         socket.shutdownOutput();
193     }
194 
195     @Override
196     public ChannelFuture shutdownInput(final ChannelPromise promise) {
197         EventLoop loop = eventLoop();
198         if (loop.inEventLoop()) {
199             shutdownInput0(promise);
200         } else {
201             loop.execute(new Runnable() {
202                 @Override
203                 public void run() {
204                     shutdownInput0(promise);
205                 }
206             });
207         }
208         return promise;
209     }
210 
211     private void shutdownInput0(ChannelPromise promise) {
212         try {
213             socket.shutdownInput();
214             promise.setSuccess();
215         } catch (Throwable t) {
216             promise.setFailure(t);
217         }
218     }
219 
220     @Override
221     public ChannelFuture shutdown(final ChannelPromise promise) {
222         ChannelFuture shutdownOutputFuture = shutdownOutput();
223         if (shutdownOutputFuture.isDone()) {
224             shutdownOutputDone(shutdownOutputFuture, promise);
225         } else {
226             shutdownOutputFuture.addListener(new ChannelFutureListener() {
227                 @Override
228                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
229                     shutdownOutputDone(shutdownOutputFuture, promise);
230                 }
231             });
232         }
233         return promise;
234     }
235 
236     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
237         ChannelFuture shutdownInputFuture = shutdownInput();
238         if (shutdownInputFuture.isDone()) {
239             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
240         } else {
241             shutdownInputFuture.addListener(new ChannelFutureListener() {
242                 @Override
243                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
244                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
245                 }
246             });
247         }
248     }
249 
250     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
251                                      ChannelFuture shutdownInputFuture,
252                                      ChannelPromise promise) {
253         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
254         Throwable shutdownInputCause = shutdownInputFuture.cause();
255         if (shutdownOutputCause != null) {
256             if (shutdownInputCause != null) {
257                 logger.debug("Exception suppressed because a previous exception occurred.",
258                         shutdownInputCause);
259             }
260             promise.setFailure(shutdownOutputCause);
261         } else if (shutdownInputCause != null) {
262             promise.setFailure(shutdownInputCause);
263         } else {
264             promise.setSuccess();
265         }
266     }
267 
268     @Override
269     public InetSocketAddress localAddress() {
270         return (InetSocketAddress) super.localAddress();
271     }
272 
273     @Override
274     public InetSocketAddress remoteAddress() {
275         return (InetSocketAddress) super.remoteAddress();
276     }
277 
278     @Override
279     protected SocketAddress localAddress0() {
280         return socket.getLocalSocketAddress();
281     }
282 
283     @Override
284     protected SocketAddress remoteAddress0() {
285         return socket.getRemoteSocketAddress();
286     }
287 
288     @Override
289     protected void doBind(SocketAddress localAddress) throws Exception {
290         SocketUtils.bind(socket, localAddress);
291     }
292 
293     @Override
294     protected void doConnect(SocketAddress remoteAddress,
295             SocketAddress localAddress) throws Exception {
296         if (localAddress != null) {
297             SocketUtils.bind(socket, localAddress);
298         }
299 
300         boolean success = false;
301         try {
302             SocketUtils.connect(socket, remoteAddress, config().getConnectTimeoutMillis());
303             activate(socket.getInputStream(), socket.getOutputStream());
304             success = true;
305         } catch (SocketTimeoutException e) {
306             ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
307             cause.setStackTrace(e.getStackTrace());
308             throw cause;
309         } finally {
310             if (!success) {
311                 doClose();
312             }
313         }
314     }
315 
316     @Override
317     protected void doDisconnect() throws Exception {
318         doClose();
319     }
320 
321     @Override
322     protected void doClose() throws Exception {
323         socket.close();
324     }
325 
326     protected boolean checkInputShutdown() {
327         if (isInputShutdown()) {
328             try {
329                 Thread.sleep(config().getSoTimeout());
330             } catch (Throwable e) {
331                 // ignore
332             }
333             return true;
334         }
335         return false;
336     }
337 
338     @Deprecated
339     @Override
340     protected void setReadPending(boolean readPending) {
341         super.setReadPending(readPending);
342     }
343 
344     final void clearReadPending0() {
345         clearReadPending();
346     }
347 }