View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelPromise;
21  import io.netty.channel.EventLoop;
22  import io.netty.channel.ThreadPerChannelEventLoop;
23  
24  import java.net.SocketAddress;
25  
26  /**
27   * Abstract base class for {@link Channel} implementations that use Old-Blocking-IO
28   */
29  public abstract class AbstractOioChannel extends AbstractChannel {
30  
31      protected static final int SO_TIMEOUT = 1000;
32  
33      boolean readPending;
34      private final Runnable readTask = new Runnable() {
35          @Override
36          public void run() {
37              doRead();
38          }
39      };
40      private final Runnable clearReadPendingRunnable = new Runnable() {
41          @Override
42          public void run() {
43              readPending = false;
44          }
45      };
46  
47      /**
48       * @see AbstractChannel#AbstractChannel(Channel)
49       */
50      protected AbstractOioChannel(Channel parent) {
51          super(parent);
52      }
53  
54      @Override
55      protected AbstractUnsafe newUnsafe() {
56          return new DefaultOioUnsafe();
57      }
58  
59      private final class DefaultOioUnsafe extends AbstractUnsafe {
60          @Override
61          public void connect(
62                  final SocketAddress remoteAddress,
63                  final SocketAddress localAddress, final ChannelPromise promise) {
64              if (!promise.setUncancellable() || !ensureOpen(promise)) {
65                  return;
66              }
67  
68              try {
69                  boolean wasActive = isActive();
70                  doConnect(remoteAddress, localAddress);
71  
72                  // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
73                  // We still need to ensure we call fireChannelActive() in this case.
74                  boolean active = isActive();
75  
76                  safeSetSuccess(promise);
77                  if (!wasActive && active) {
78                      pipeline().fireChannelActive();
79                  }
80              } catch (Throwable t) {
81                  safeSetFailure(promise, annotateConnectException(t, remoteAddress));
82                  closeIfClosed();
83              }
84          }
85      }
86  
87      @Override
88      protected boolean isCompatible(EventLoop loop) {
89          return loop instanceof ThreadPerChannelEventLoop;
90      }
91  
92      /**
93       * Connect to the remote peer using the given localAddress if one is specified or {@code null} otherwise.
94       */
95      protected abstract void doConnect(
96              SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
97  
98      @Override
99      protected void doBeginRead() throws Exception {
100         if (readPending) {
101             return;
102         }
103 
104         readPending = true;
105         eventLoop().execute(readTask);
106     }
107 
108     protected abstract void doRead();
109 
110     /**
111      * @deprecated No longer supported.
112      * No longer supported.
113      */
114     @Deprecated
115     protected boolean isReadPending() {
116         return readPending;
117     }
118 
119     /**
120      * @deprecated Use {@link #clearReadPending()} if appropriate instead.
121      * No longer supported.
122      */
123     @Deprecated
124     protected void setReadPending(final boolean readPending) {
125         if (isRegistered()) {
126             EventLoop eventLoop = eventLoop();
127             if (eventLoop.inEventLoop()) {
128                 this.readPending = readPending;
129             } else {
130                 eventLoop.execute(new Runnable() {
131                     @Override
132                     public void run() {
133                         AbstractOioChannel.this.readPending = readPending;
134                     }
135                 });
136             }
137         } else {
138             this.readPending = readPending;
139         }
140     }
141 
142     /**
143      * Set read pending to {@code false}.
144      */
145     protected final void clearReadPending() {
146         if (isRegistered()) {
147             EventLoop eventLoop = eventLoop();
148             if (eventLoop.inEventLoop()) {
149                 readPending = false;
150             } else {
151                 eventLoop.execute(clearReadPendingRunnable);
152             }
153         } else {
154             // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
155             readPending = false;
156         }
157     }
158 }