View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.FileRegion;
21  import io.netty.channel.RecvByteBufAllocator;
22  
23  import java.io.EOFException;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.OutputStream;
27  import java.nio.channels.Channels;
28  import java.nio.channels.ClosedChannelException;
29  import java.nio.channels.NotYetConnectedException;
30  import java.nio.channels.WritableByteChannel;
31  
32  /**
33   * Abstract base class for OIO Channels that are based on streams.
34   */
35  public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
36  
37      private static final InputStream CLOSED_IN = new InputStream() {
38          @Override
39          public int read() {
40              return -1;
41          }
42      };
43  
44      private static final OutputStream CLOSED_OUT = new OutputStream() {
45          @Override
46          public void write(int b) throws IOException {
47              throw new ClosedChannelException();
48          }
49      };
50  
51      private InputStream is;
52      private OutputStream os;
53      private WritableByteChannel outChannel;
54  
55      /**
56       * Create a new instance
57       *
58       * @param parent    the parent {@link Channel} which was used to create this instance. This can be null if the
59       *                  {@link} has no parent as it was created by your self.
60       */
61      protected OioByteStreamChannel(Channel parent) {
62          super(parent);
63      }
64  
65      /**
66       * Activate this instance. After this call {@link #isActive()} will return {@code true}.
67       */
68      protected final void activate(InputStream is, OutputStream os) {
69          if (this.is != null) {
70              throw new IllegalStateException("input was set already");
71          }
72          if (this.os != null) {
73              throw new IllegalStateException("output was set already");
74          }
75          if (is == null) {
76              throw new NullPointerException("is");
77          }
78          if (os == null) {
79              throw new NullPointerException("os");
80          }
81          this.is = is;
82          this.os = os;
83      }
84  
85      @Override
86      public boolean isActive() {
87          InputStream is = this.is;
88          if (is == null || is == CLOSED_IN) {
89              return false;
90          }
91  
92          OutputStream os = this.os;
93          return !(os == null || os == CLOSED_OUT);
94      }
95  
96      @Override
97      protected int available() {
98          try {
99              return is.available();
100         } catch (IOException ignored) {
101             return 0;
102         }
103     }
104 
105     @Override
106     protected int doReadBytes(ByteBuf buf) throws Exception {
107         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
108         allocHandle.attemptedBytesRead(Math.max(1, Math.min(available(), buf.maxWritableBytes())));
109         return buf.writeBytes(is, allocHandle.attemptedBytesRead());
110     }
111 
112     @Override
113     protected void doWriteBytes(ByteBuf buf) throws Exception {
114         OutputStream os = this.os;
115         if (os == null) {
116             throw new NotYetConnectedException();
117         }
118         buf.readBytes(os, buf.readableBytes());
119     }
120 
121     @Override
122     protected void doWriteFileRegion(FileRegion region) throws Exception {
123         OutputStream os = this.os;
124         if (os == null) {
125             throw new NotYetConnectedException();
126         }
127         if (outChannel == null) {
128             outChannel = Channels.newChannel(os);
129         }
130 
131         long written = 0;
132         for (;;) {
133             long localWritten = region.transferTo(outChannel, written);
134             if (localWritten == -1) {
135                 checkEOF(region);
136                 return;
137             }
138             written += localWritten;
139 
140             if (written >= region.count()) {
141                 return;
142             }
143         }
144     }
145 
146     private static void checkEOF(FileRegion region) throws IOException {
147         if (region.transferred() < region.count()) {
148             throw new EOFException("Expected to be able to write " + region.count() + " bytes, " +
149                                    "but only wrote " + region.transferred());
150         }
151     }
152 
153     @Override
154     protected void doClose() throws Exception {
155         InputStream is = this.is;
156         OutputStream os = this.os;
157         this.is = CLOSED_IN;
158         this.os = CLOSED_OUT;
159 
160         try {
161             if (is != null) {
162                 is.close();
163             }
164         } finally {
165             if (os != null) {
166                 os.close();
167             }
168         }
169     }
170 }