View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelPromise;
21  import io.netty.channel.EventLoop;
22  import io.netty.channel.ThreadPerChannelEventLoop;
23  
24  import java.net.SocketAddress;
25  
26  /**
27   * Abstract base class for {@link Channel} implementations that use Old-Blocking-IO
28   */
29  public abstract class AbstractOioChannel extends AbstractChannel {
30  
31      protected static final int SO_TIMEOUT = 1000;
32  
33      private volatile boolean readPending;
34  
35      private final Runnable readTask = new Runnable() {
36          @Override
37          public void run() {
38              if (!isReadPending() && !config().isAutoRead()) {
39                  // ChannelConfig.setAutoRead(false) was called in the meantime so just return
40                  return;
41              }
42  
43              setReadPending(false);
44              doRead();
45          }
46      };
47  
48      /**
49       * @see AbstractChannel#AbstractChannel(Channel)
50       */
51      protected AbstractOioChannel(Channel parent) {
52          super(parent);
53      }
54  
55      @Override
56      protected AbstractUnsafe newUnsafe() {
57          return new DefaultOioUnsafe();
58      }
59  
60      private final class DefaultOioUnsafe extends AbstractUnsafe {
61          @Override
62          public void connect(
63                  final SocketAddress remoteAddress,
64                  final SocketAddress localAddress, final ChannelPromise promise) {
65              if (!promise.setUncancellable() || !ensureOpen(promise)) {
66                  return;
67              }
68  
69              try {
70                  boolean wasActive = isActive();
71                  doConnect(remoteAddress, localAddress);
72  
73                  // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
74                  // We still need to ensure we call fireChannelActive() in this case.
75                  boolean active = isActive();
76  
77                  safeSetSuccess(promise);
78                  if (!wasActive && active) {
79                      pipeline().fireChannelActive();
80                  }
81              } catch (Throwable t) {
82                  safeSetFailure(promise, annotateConnectException(t, remoteAddress));
83                  closeIfClosed();
84              }
85          }
86      }
87  
88      @Override
89      protected boolean isCompatible(EventLoop loop) {
90          return loop instanceof ThreadPerChannelEventLoop;
91      }
92  
93      /**
94       * Connect to the remote peer using the given localAddress if one is specified or {@code null} otherwise.
95       */
96      protected abstract void doConnect(
97              SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
98  
99      @Override
100     protected void doBeginRead() throws Exception {
101         if (isReadPending()) {
102             return;
103         }
104 
105         setReadPending(true);
106         eventLoop().execute(readTask);
107     }
108 
109     protected abstract void doRead();
110 
111     protected boolean isReadPending() {
112         return readPending;
113     }
114 
115     protected void setReadPending(boolean readPending) {
116         this.readPending = readPending;
117     }
118 }