View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.io.OutputStream;
22  import java.io.PushbackInputStream;
23  import java.net.SocketException;
24  import java.nio.channels.Channels;
25  import java.nio.channels.ClosedChannelException;
26  import java.nio.channels.WritableByteChannel;
27  import java.util.regex.Pattern;
28  
29  import org.jboss.netty.buffer.ChannelBuffer;
30  import org.jboss.netty.channel.ChannelFuture;
31  import org.jboss.netty.channel.DefaultFileRegion;
32  import org.jboss.netty.channel.FileRegion;
33  
34  class OioWorker extends AbstractOioWorker<OioSocketChannel> {
35  
36      private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile(
37              "^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE);
38  
39      OioWorker(OioSocketChannel channel) {
40          super(channel);
41      }
42  
43      @Override
44      public void run() {
45          boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
46          if (fireConnected && channel.isOpen()) {
47               // Fire the channelConnected event for OioAcceptedSocketChannel.
48              // See #287
49              fireChannelConnected(channel, channel.getRemoteAddress());
50  
51          }
52          super.run();
53      }
54  
55      @Override
56      boolean process() throws IOException {
57          byte[] buf;
58          int readBytes;
59          PushbackInputStream in = channel.getInputStream();
60          int bytesToRead = in.available();
61          if (bytesToRead > 0) {
62              buf = new byte[bytesToRead];
63              readBytes = in.read(buf);
64          } else {
65              int b = in.read();
66              if (b < 0) {
67                  return false;
68              }
69              in.unread(b);
70              return true;
71          }
72          fireMessageReceived(channel, channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
73  
74          return true;
75      }
76  
77      static void write(
78              OioSocketChannel channel, ChannelFuture future,
79              Object message) {
80  
81          boolean iothread = isIoThread(channel);
82          OutputStream out = channel.getOutputStream();
83          if (out == null) {
84              Exception e = new ClosedChannelException();
85              future.setFailure(e);
86              if (iothread) {
87                  fireExceptionCaught(channel, e);
88              } else {
89                  fireExceptionCaughtLater(channel, e);
90              }
91              return;
92          }
93  
94          try {
95              int length = 0;
96  
97              // Add support to write a FileRegion. This in fact will not give any performance gain
98              // but at least it not fail and we did the best to emulate it
99              if (message instanceof FileRegion) {
100                 FileRegion fr = (FileRegion) message;
101                 try {
102                     synchronized (out) {
103                         WritableByteChannel  bchannel = Channels.newChannel(out);
104 
105                         long i = 0;
106                         while ((i = fr.transferTo(bchannel, length)) > 0) {
107                             length += i;
108                             if (length >= fr.getCount()) {
109                                 break;
110                             }
111                         }
112                     }
113                 } finally {
114                     if (fr instanceof DefaultFileRegion) {
115                         DefaultFileRegion dfr = (DefaultFileRegion) fr;
116                         if (dfr.releaseAfterTransfer()) {
117                             fr.releaseExternalResources();
118                         }
119                     }
120 
121                 }
122             } else {
123                 ChannelBuffer a = (ChannelBuffer) message;
124                 length = a.readableBytes();
125                 synchronized (out) {
126                     a.getBytes(a.readerIndex(), out, length);
127                 }
128             }
129 
130             future.setSuccess();
131             if (iothread) {
132                 fireWriteComplete(channel, length);
133             } else {
134                 fireWriteCompleteLater(channel, length);
135             }
136 
137         } catch (Throwable t) {
138             // Convert 'SocketException: Socket closed' to
139             // ClosedChannelException.
140             if (t instanceof SocketException &&
141                     SOCKET_CLOSED_MESSAGE.matcher(
142                             String.valueOf(t.getMessage())).matches()) {
143                 t = new ClosedChannelException();
144             }
145             future.setFailure(t);
146             if (iothread) {
147                 fireExceptionCaught(channel, t);
148             } else {
149                 fireExceptionCaughtLater(channel, t);
150             }
151         }
152     }
153 
154 
155 }