Java NIO Programming - Channel

Published on 2017-05-04

Channels move blocks of data into and out of buffers to and from various I/O sources such as files, sockets, datagrams, and so forth. The channel class hierarchy is rather convoluted, with multiple interfaces and many optional operations. However, for purposes of network programming there are only three really important channel classes: SocketChannel, ServerSocketChannel, and DatagramChannel. For the TCP connections we’ve talked about so far, you only need the first two.

SocketChannel

The SocketChannel class reads from and writes to TCP sockets. The data must be encoded in ByteBuffer objects for reading and writing. Each SocketChannel is associated with a peer Socket object that can be used for advanced configuration, but this requirement can be ignored for applications where the default options are fine.

Connecting

The SocketChannel class does not have any public constructors. Instead, you create a new SocketChannel object using one of the two static open() methods:

public static SocketChannel open(SocketAddress remote) throws IOException
public static SocketChannel open() throws IOException

The first variant makes the connection. This method blocks (i.e., the method will not return until the connection is made or an exception is thrown). For example:

SocketAddress address = new InetSocketAddress("www.cafeaulait.org",  80);
SocketChannel channel = SocketChannel.open(address);

The no-argument version does not immediately connect. It creates an initially unconnected socket that must be connected later using the connect() method. For example:

SocketChannel channel = SocketChannel.open();
SocketAddress address = new InetSocketAddress("www.cafeaulait.org",  80);
channel.connect(address);

You might choose this more roundabout approach in order to configure various options on the channel and/or the socket before connecting. Specifically, use this approach if you want to open the channel without blocking:

SocketChannel channel = SocketChannel.open();
SocketAddress address = new InetSocketAddress("www.cafeaulait.org",  80);
channel.configureBlocking(false);
channel.connect(address);

With a nonblocking channel, the connect() method returns immediately, even before the connection is established. The program can do other things while it waits for the operating system to finish the connection. However, before it can actually use the connection, the program must call finishConnect():

public abstract boolean finishConnect() throws IOException

(This is only necessary in nonblocking mode. For a blocking channel, this method returns true immediately.) If the connection is now ready for use, finishConnect() returns true. If the connection has not been established yet, finishConnect() returns false. Finally, if the connection could not be established, for instance because the network is down, this method throws an exception.

If the program wants to check whether the connection is complete, it can call these two methods:

public abstract boolean isConnected()
public abstract boolean isConnectionPending()

The isConnected() method returns true if the connection is open. The isConnectionPending() method returns true if the connection is still being set up but is not yet open.
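The connect, poll, and check sequence described above can be sketched as follows. This is only an illustration: the class name NonblockingConnectDemo, the throwaway loopback server, and the use of port 0 (let the operating system pick a free port) are assumptions of the sketch, and Thread.yield() stands in for whatever useful work a real program would do while it waits.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonblockingConnectDemo {

  // Connects to a throwaway loopback server without blocking in connect(),
  // then polls finishConnect() until the connection is usable.
  static boolean connectWithoutBlocking() throws IOException {
    try (ServerSocketChannel server = ServerSocketChannel.open();
         SocketChannel channel = SocketChannel.open()) {
      // Hypothetical target: any free port on the loopback interface.
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

      channel.configureBlocking(false);
      // May return true at once on loopback; otherwise the connection is pending.
      boolean connected = channel.connect(server.getLocalAddress());

      while (!connected) {
        Thread.yield();                       // placeholder for "do other things"
        connected = channel.finishConnect();  // false while still pending
      }
      return channel.isConnected() && !channel.isConnectionPending();
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println("connected: " + connectWithoutBlocking());
  }
}
```

In a real program you would more likely register the channel for OP_CONNECT with a Selector than poll in a loop.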

Reading

To read from a SocketChannel, first create a ByteBuffer the channel can store data in. Then pass it to the read() method:

public abstract int read(ByteBuffer dst) throws IOException

The channel fills the buffer with as much data as it can, then returns the number of bytes it put there. When it encounters the end of stream, the channel fills the buffer with any remaining bytes and then returns -1 on the next call to read(). If the channel is blocking, this method will read at least one byte or return -1 or throw an exception. If the channel is nonblocking, however, this method may return 0.

Because the data is stored into the buffer at the current position, which is updated automatically as more data is added, you can keep passing the same buffer to the read() method until the buffer is filled. For example, this loop will read until the buffer is filled or the end of stream is detected:

while (buffer.hasRemaining() && channel.read(buffer) != -1) ;
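To try that loop without a network, here is a small sketch that wraps it in a helper and feeds it from an in-memory stream. The names ReadLoopDemo and readUntilFullOrEof are made up for this example, and the ByteArrayInputStream source is an assumption; the same helper accepts a blocking SocketChannel because SocketChannel is a ReadableByteChannel.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

final class ReadLoopDemo {

  // Reads until the buffer is full or the end of stream is reached.
  // Returns the number of bytes this call stored in the buffer.
  static int readUntilFullOrEof(ReadableByteChannel channel, ByteBuffer buffer)
      throws IOException {
    int start = buffer.position();
    while (buffer.hasRemaining() && channel.read(buffer) != -1) {
      // Nothing to do here: read() advances the buffer's position itself.
    }
    return buffer.position() - start;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "hello, channel".getBytes(StandardCharsets.US_ASCII);
    ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(data));

    ByteBuffer buffer = ByteBuffer.allocate(5);
    int count = readUntilFullOrEof(channel, buffer);

    buffer.flip(); // switch the buffer from filling to draining
    System.out.println(count + " bytes: " + StandardCharsets.US_ASCII.decode(buffer));
  }
}
```

On a nonblocking channel this loop would spin while read() returns 0, so it is best kept for blocking channels.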

It is sometimes useful to be able to fill several buffers from one source. This is called a scatter. These two methods accept an array of ByteBuffer objects as arguments and fill each one in turn:

public final long read(ByteBuffer[] dsts) throws IOException
public final long read(ByteBuffer[] dsts, int offset, int length)
    throws IOException

The first variant fills all the buffers. The second method fills length buffers, starting with the one at offset.

To fill an array of buffers, just loop while the last buffer in the list has space remaining. For example:

ByteBuffer[] buffers = new ByteBuffer[2];
buffers[0] = ByteBuffer.allocate(1000);
buffers[1] = ByteBuffer.allocate(1000);
while (buffers[1].hasRemaining() && channel.read(buffers) != -1) ;
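A scatter is easier to see when the two buffers have different jobs. The sketch below is an assumption-laden illustration, not code from the text above: it uses a java.nio.channels.Pipe in place of a socket (a Pipe.SourceChannel supports the same read(ByteBuffer[]) call), and the class name ScatterReadDemo and the fixed-length "header" are invented for the example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

final class ScatterReadDemo {

  // Sends message through an in-process pipe, then scatters it across a
  // header buffer of headerLength bytes and a larger body buffer.
  static String[] scatter(String message, int headerLength) throws IOException {
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
      ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.US_ASCII));
      while (out.hasRemaining()) {
        sink.write(out);
      }
      sink.close(); // lets the reader see the end of stream

      ByteBuffer[] buffers = {
        ByteBuffer.allocate(headerLength),
        ByteBuffer.allocate(1000)
      };
      while (buffers[1].hasRemaining() && source.read(buffers) != -1) {
        // read(ByteBuffer[]) fills buffers[0] completely before touching buffers[1].
      }

      String[] parts = new String[buffers.length];
      for (int i = 0; i < buffers.length; i++) {
        buffers[i].flip();
        parts[i] = StandardCharsets.US_ASCII.decode(buffers[i]).toString();
      }
      return parts;
    }
  }

  public static void main(String[] args) throws IOException {
    String[] parts = scatter("HEADbody text", 4);
    System.out.println(parts[0] + " | " + parts[1]);
  }
}
```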

Writing

Socket channels have both read and write methods. In general, they are full duplex. In order to write, simply fill a ByteBuffer, flip it, and pass it to one of the write methods, which drains it while copying the data onto the output—pretty much the reverse of the reading process.

The basic write() method takes a single buffer as an argument:

public abstract int write(ByteBuffer src) throws IOException

As with reads (and unlike OutputStreams), this method is not guaranteed to write the complete contents of the buffer if the channel is nonblocking. Again, however, the cursor-based nature of buffers enables you to easily call this method again and again until the buffer is fully drained and the data has been completely written:

while (buffer.hasRemaining() && channel.write(buffer) != -1) ;

It is often useful to be able to write data from several buffers onto one socket. This is called a gather. For example, you might want to store the HTTP header in one buffer and the HTTP body in another buffer. The implementation might even fill the two buffers simultaneously using two threads or overlapped I/O. These two methods accept an array of ByteBuffer objects as arguments, and drain each one in turn:

public final long write(ByteBuffer[] srcs) throws IOException
public final long write(ByteBuffer[] srcs, int offset, int length)
    throws IOException

The first variant drains all the buffers. The second method drains length buffers, starting with the one at offset.
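Here is one way the header-plus-body gather might look. As a sketch it makes several assumptions: a java.nio.channels.Pipe stands in for the socket (Pipe.SinkChannel offers the same write(ByteBuffer[]) call), and the names GatherWriteDemo, gather, and anyRemaining are invented for the example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

final class GatherWriteDemo {

  // True while at least one buffer still has bytes to drain.
  private static boolean anyRemaining(ByteBuffer[] buffers) {
    for (ByteBuffer buffer : buffers) {
      if (buffer.hasRemaining()) {
        return true;
      }
    }
    return false;
  }

  // Writes header and body with gathering writes, then reads the bytes back
  // from the other end of the pipe to show they arrived in order.
  static String gather(String header, String body) throws IOException {
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
      ByteBuffer[] parts = {
        ByteBuffer.wrap(header.getBytes(StandardCharsets.US_ASCII)),
        ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII))
      };
      while (anyRemaining(parts)) {
        sink.write(parts); // drains parts[0] first, then parts[1]
      }
      sink.close();

      ByteBuffer in = ByteBuffer.allocate(header.length() + body.length());
      while (in.hasRemaining() && source.read(in) != -1) {
        // keep reading until everything written has been collected
      }
      in.flip();
      return StandardCharsets.US_ASCII.decode(in).toString();
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.print(gather("HTTP/1.1 200 OK\r\n\r\n", "hello\n"));
  }
}
```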

Closing

Just as with regular sockets, you should close a channel when you’re done with it to free up the port and any other resources it may be using:

public void close() throws IOException

Closing an already closed channel has no effect. Attempting to write data to or read data from a closed channel throws an exception. If you’re uncertain whether a channel has been closed, check with isOpen():

public boolean isOpen()

Naturally, this returns false if the channel is closed, true if it’s open (close() and isOpen() are the only two methods declared in the Channel interface and shared by all channel classes).

Starting in Java 7, SocketChannel implements AutoCloseable, so you can use it in try-with-resources.
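A minimal sketch of that pattern follows. The class name CloseDemo is made up, and the channel is deliberately left unconnected so the example needs no network; the extra reference kept outside the try block exists only so the closed state can be inspected afterward.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;

final class CloseDemo {

  // Returns true if the channel was closed by try-with-resources and a
  // later read on it was rejected.
  static boolean closedAfterTry() throws IOException {
    SocketChannel escaped;
    try (SocketChannel channel = SocketChannel.open()) {
      escaped = channel; // kept only to look at the channel after the block
    }

    boolean readRejected = false;
    try {
      escaped.read(ByteBuffer.allocate(1));
    } catch (ClosedChannelException ex) {
      readRejected = true; // reading from a closed channel throws
    }

    escaped.close(); // closing again has no effect
    return !escaped.isOpen() && readRejected;
  }

  public static void main(String[] args) throws IOException {
    System.out.println("closed and unusable: " + closedAfterTry());
  }
}
```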

ServerSocketChannel

The ServerSocketChannel class has one purpose: to accept incoming connections. You cannot read from, write to, or connect a ServerSocketChannel. The only operation it supports is accepting a new incoming connection. The class itself only declares four methods, of which accept() is the most important. ServerSocketChannel also inherits several methods from its superclasses, mostly related to registering with a Selector for notification of incoming connections. And finally, like all channels, it has a close() method that shuts down the server socket.

Creating server socket channels

The static factory method ServerSocketChannel.open() creates a new ServerSocketChannel object. However, the name is a little deceptive. This method does not actually open a new server socket. Instead, it just creates the object. Before you can use it, you need to call the socket() method to get the corresponding peer ServerSocket. At this point, you can configure any server options you like, such as the receive buffer size or the socket timeout, using the various setter methods in ServerSocket. Then bind this ServerSocket to a SocketAddress for the port you want to listen on. For example, this code fragment binds a ServerSocketChannel to port 80:

try {
  ServerSocketChannel server = ServerSocketChannel.open();
  ServerSocket socket = server.socket();
  SocketAddress address = new InetSocketAddress(80);
  socket.bind(address);
} catch (IOException ex) {
  System.err.println("Could not bind to port 80 because " + ex.getMessage());
}

In Java 7, this gets a little simpler because ServerSocketChannel now has a bind() method of its own:

try {
  ServerSocketChannel server = ServerSocketChannel.open();
  SocketAddress address = new InetSocketAddress(80);
  server.bind(address);
} catch (IOException ex) {
  System.err.println("Could not bind to port 80 because " + ex.getMessage());
}

A factory method is used here rather than a constructor so that different virtual machines can provide different implementations of this class, more closely tuned to the local hardware and OS. However, this factory is not user configurable. The open() method always returns an instance of the same class when running in the same virtual machine.

Accepting connections

Once you’ve opened and bound a ServerSocketChannel object, the accept() method can listen for incoming connections:

public abstract SocketChannel accept() throws IOException

accept() can operate in either blocking or nonblocking mode. In blocking mode, the accept() method waits for an incoming connection. It then accepts that connection and returns a SocketChannel object connected to the remote client. The thread cannot do anything until a connection is made. This strategy might be appropriate for simple servers that can respond to each request immediately. Blocking mode is the default.

A ServerSocketChannel can also operate in nonblocking mode. In this case, the accept() method returns null if there are no incoming connections. Nonblocking mode is more appropriate for servers that need to do a lot of work for each connection and thus may want to process multiple requests in parallel. Nonblocking mode is normally used in conjunction with a Selector. To make a ServerSocketChannel nonblocking, pass false to its configureBlocking() method.
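The null return is easy to miss, so here is a short sketch of it. Everything specific in it is an assumption made for illustration: the class name NonblockingAcceptDemo, the loopback address, and port 0 (any free port). Polling in a loop is used only to keep the example small; a Selector is the usual tool.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonblockingAcceptDemo {

  // Returns true if accept() gave null while idle and a connected channel
  // once a client had actually connected.
  static boolean acceptWithoutBlocking() throws IOException {
    try (ServerSocketChannel server = ServerSocketChannel.open()) {
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      server.configureBlocking(false);

      // No client has connected yet, so a nonblocking accept() returns null.
      boolean nullWhenIdle = (server.accept() == null);

      try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
        SocketChannel accepted = server.accept();
        while (accepted == null) { // the connection may take a moment to surface
          Thread.yield();
          accepted = server.accept();
        }
        boolean connected = accepted.isConnected();
        accepted.close();
        return nullWhenIdle && connected;
      }
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println("behaved as described: " + acceptWithoutBlocking());
  }
}
```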

The accept() method is declared to throw an IOException if anything goes wrong. There are several subclasses of IOException that indicate more detailed problems, as well as a couple of runtime exceptions:

ClosedChannelException

You cannot reopen a ServerSocketChannel after closing it.

AsynchronousCloseException

Another thread closed this ServerSocketChannel while accept() was executing.

ClosedByInterruptException

Another thread interrupted this thread while a blocking ServerSocketChannel was waiting.

NotYetBoundException

You called open() but did not bind the ServerSocketChannel’s peer ServerSocket to an address before calling accept(). This is a runtime exception, not an IOException.

SecurityException

The security manager refused to allow this application to bind to the requested port.
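Two of these failures are simple to provoke on purpose, which makes them handy for checking error handling. The sketch below uses made-up names (AcceptErrorsDemo, acceptUnbound, acceptClosed) and binds to an arbitrary free loopback port; it is an illustration rather than a recommended way to structure server code.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetBoundException;
import java.nio.channels.ServerSocketChannel;

final class AcceptErrorsDemo {

  // accept() on a channel that was opened but never bound.
  static String acceptUnbound() throws IOException {
    try (ServerSocketChannel server = ServerSocketChannel.open()) {
      server.accept();
      return "accepted";
    } catch (NotYetBoundException ex) {
      return "not yet bound"; // a runtime exception, not an IOException
    }
  }

  // accept() on a channel that was bound and then closed.
  static String acceptClosed() throws IOException {
    ServerSocketChannel server = ServerSocketChannel.open();
    server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
    server.close();
    try {
      server.accept();
      return "accepted";
    } catch (ClosedChannelException ex) {
      return "closed"; // a subclass of IOException
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(acceptUnbound());
    System.out.println(acceptClosed());
  }
}
```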

The Channels Class

Channels is a simple utility class for wrapping channels around traditional I/O-based streams, readers, and writers, and vice versa. It’s useful when you want to use the new I/O model in one part of a program for performance, but still interoperate with legacy APIs that expect streams. It has methods that convert from streams to channels and methods that convert from channels to streams, readers, and writers:

public static InputStream newInputStream(ReadableByteChannel ch)
public static OutputStream newOutputStream(WritableByteChannel ch)
public static ReadableByteChannel newChannel(InputStream in)
public static WritableByteChannel newChannel(OutputStream out)
public static Reader newReader (ReadableByteChannel channel,
    CharsetDecoder decoder, int minimumBufferCapacity)
public static Reader newReader (ReadableByteChannel ch, String encoding)
public static Writer newWriter (WritableByteChannel ch, String encoding)

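SocketChannel implements both the ReadableByteChannel and WritableByteChannel interfaces that appear in these signatures. ServerSocketChannel implements neither because you can’t read from or write to it.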
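As a rough illustration of these conversions, the sketch below goes in both directions using in-memory streams so it runs without a network. The class and method names (ChannelsBridgeDemo, firstLine, writeThroughChannel) are invented for the example; with a real connection you would pass the SocketChannel where the stream-backed channel appears here.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

final class ChannelsBridgeDemo {

  // stream -> channel -> Reader: lets line-oriented legacy code consume a channel.
  static String firstLine(byte[] utf8) throws IOException {
    ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(utf8));
    try (BufferedReader reader =
             new BufferedReader(Channels.newReader(channel, "UTF-8"))) {
      return reader.readLine();
    }
  }

  // OutputStream -> channel: lets buffer-oriented code write to a legacy stream.
  static byte[] writeThroughChannel(String text) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (WritableByteChannel channel = Channels.newChannel(sink)) {
      ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
      while (buffer.hasRemaining()) {
        channel.write(buffer);
      }
    }
    return sink.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] request = "GET / HTTP/1.1\r\nHost: example\r\n".getBytes(StandardCharsets.UTF_8);
    System.out.println(firstLine(request));
    System.out.println(writeThroughChannel("hello").length + " bytes written");
  }
}
```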

Asynchronous Channels (Java 7)

Java 7 introduces the AsynchronousSocketChannel and AsynchronousServerSocketChannel classes. These behave like and have almost the same interface as SocketChannel and ServerSocketChannel (though they are not subclasses of those classes). However, unlike SocketChannel and ServerSocketChannel, reads from and writes to asynchronous channels return immediately, even before the I/O is complete. The data read or written is further processed by a Future or a CompletionHandler. The connect() and accept() methods also execute asynchronously and return Futures. Selectors are not used.

For example, suppose a program needs to perform a lot of initialization at startup. Some of that involves network connections that are going to take several seconds each. You can start several asynchronous operations in parallel, then perform your local initializations, and then request the results of the network operations:

SocketAddress address = new InetSocketAddress(args[0], port);
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
Future<Void> connected = client.connect(address);

ByteBuffer buffer = ByteBuffer.allocate(74);

// wait for the connection to finish
connected.get();

// read from the connection
Future<Integer> future = client.read(buffer);

// do other things...

// wait for the read to finish...
future.get();

// flip and drain the buffer
buffer.flip();
WritableByteChannel out = Channels.newChannel(System.out);
out.write(buffer);

The advantage of this approach is that the network connections run in parallel while the program does other things. When you’re ready to process the data from the network, but not before, you stop and wait for it by calling Future.get(). You could achieve the same effect with thread pools and callables, but this is perhaps a little simpler, especially if buffers are a natural fit for your application.

This approach fits the situation where you want to get results back in a very particular order. However, if you don’t care about order, if you can process each network read independently of the others, then you may be better off using a CompletionHandler instead. For example, imagine you’re writing a search engine web spider that feeds pages into some backend. Because you don’t care about the order of the responses returned, you can spawn a large number of AsynchronousSocketChannel requests and give each one a CompletionHandler that stores the results in the backend.

The generic CompletionHandler interface declares two methods: completed(), which is invoked if the read finishes successfully; and failed(), which is invoked on an I/O error. For example, here’s a simple CompletionHandler that prints whatever it received on System.out:

class LineHandler implements CompletionHandler<Integer, ByteBuffer> {

  @Override
  public void completed(Integer result, ByteBuffer buffer) {
    buffer.flip();
    WritableByteChannel out = Channels.newChannel(System.out);
    try {
      out.write(buffer);
    } catch (IOException ex) {
      System.err.println(ex);
    }
  }

  @Override
  public void failed(Throwable ex, ByteBuffer attachment) {
    System.err.println(ex.getMessage());
  }
}

When you read from the channel you pass a buffer, an attachment, and a CompletionHandler to the read() method:

ByteBuffer buffer = ByteBuffer.allocate(74);
CompletionHandler<Integer, ByteBuffer> handler = new LineHandler();
channel.read(buffer, buffer, handler);

Here I’ve made the attachment the buffer itself. This is one way to push the data read from the network into the CompletionHandler where it can handle it. Another common pattern is to make the CompletionHandler an anonymous inner class and the buffer a final local variable so it’s in scope inside the completion handler.
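The anonymous-inner-class pattern might look something like the sketch below. It is only an illustration under several assumptions: a loopback AsynchronousServerSocketChannel plays the remote peer, a CompletableFuture carries the result back to the calling thread, and the names CompletionHandlerDemo and readWithHandler are invented. The message is assumed to fit in the 74-byte buffer.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class CompletionHandlerDemo {

  // A loopback peer sends message and closes; the client collects it with a
  // CompletionHandler that re-issues the read until end of stream.
  static String readWithHandler(String message)
      throws IOException, InterruptedException, ExecutionException {
    try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
         AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      Future<AsynchronousSocketChannel> accepting = server.accept();
      client.connect(server.getLocalAddress()).get();

      try (AsynchronousSocketChannel peer = accepting.get()) {
        ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.US_ASCII));
        while (out.hasRemaining()) {
          peer.write(out).get();
        }
      } // closing the peer signals end of stream to the client

      CompletableFuture<String> result = new CompletableFuture<>();
      ByteBuffer buffer = ByteBuffer.allocate(74);
      client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer count, ByteBuffer attachment) {
          if (count != -1 && attachment.hasRemaining()) {
            client.read(attachment, attachment, this); // more may be coming
            return;
          }
          attachment.flip();
          result.complete(StandardCharsets.US_ASCII.decode(attachment).toString());
        }

        @Override
        public void failed(Throwable ex, ByteBuffer attachment) {
          result.completeExceptionally(ex);
        }
      });
      return result.get(); // wait for the handler to finish
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(readWithHandler("hello from the peer"));
  }
}
```

Note that each new read is started only after the previous one has completed, so there is never more than one read pending on the channel.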

Although you can safely share an AsynchronousSocketChannel or AsynchronousServerSocketChannel between multiple threads, no more than one thread can read from this channel at a time and no more than one thread can write to the channel at a time. (One thread can read and another thread can write simultaneously, though.) If a thread attempts to read while another thread has a pending read, the read() method throws a ReadPendingException. Similarly, if a thread attempts to write while another thread has a pending write, the write() method throws a WritePendingException.
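The pending-read rule can be demonstrated from a single thread, because the exception depends on whether a read is outstanding rather than on which thread asks. The following sketch assumes a loopback peer that never sends anything, so the first read stays pending; the class name PendingReadDemo is made up.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ReadPendingException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class PendingReadDemo {

  // Returns true if a second read() issued while the first is still
  // outstanding is rejected with ReadPendingException.
  static boolean secondReadRejected()
      throws IOException, InterruptedException, ExecutionException {
    try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
         AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      Future<AsynchronousSocketChannel> accepting = server.accept();
      client.connect(server.getLocalAddress()).get();

      try (AsynchronousSocketChannel peer = accepting.get()) {
        // The peer stays silent, so this read cannot complete yet.
        Future<Integer> first = client.read(ByteBuffer.allocate(16));
        try {
          client.read(ByteBuffer.allocate(16));
          return false;
        } catch (ReadPendingException ex) {
          return !first.isDone();
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("second read rejected: " + secondReadRejected());
  }
}
```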

Socket Options (Java 7)

Beginning in Java 7, SocketChannel, ServerSocketChannel, AsynchronousServerSocketChannel, AsynchronousSocketChannel, and DatagramChannel all implement the new NetworkChannel interface. The primary purpose of this interface is to support the various TCP options such as TCP_NODELAY, SO_LINGER, SO_SNDBUF, SO_RCVBUF, and SO_KEEPALIVE. The options have the same meaning in the underlying TCP stack whether set on a socket or a channel. However, the interface to these options is a little different. Rather than individual methods for each supported option, the channel classes each have just three methods to get, set, and list the supported options:

<T> T getOption(SocketOption<T> name) throws IOException
<T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException
Set<SocketOption<?>> supportedOptions()

SocketOption is a generic interface that specifies the name and type of each option. The type parameter determines whether the option’s value is a Boolean, an Integer, or a NetworkInterface. The StandardSocketOptions class provides constants for each of the 11 options Java 7 recognizes:

  • SocketOption<NetworkInterface> StandardSocketOptions.IP_MULTICAST_IF
  • SocketOption<Boolean> StandardSocketOptions.IP_MULTICAST_LOOP
  • SocketOption<Integer> StandardSocketOptions.IP_MULTICAST_TTL
  • SocketOption<Integer> StandardSocketOptions.IP_TOS
  • SocketOption<Boolean> StandardSocketOptions.SO_BROADCAST
  • SocketOption<Boolean> StandardSocketOptions.SO_KEEPALIVE
  • SocketOption<Integer> StandardSocketOptions.SO_LINGER
  • SocketOption<Integer> StandardSocketOptions.SO_RCVBUF
  • SocketOption<Boolean> StandardSocketOptions.SO_REUSEADDR
  • SocketOption<Integer> StandardSocketOptions.SO_SNDBUF
  • SocketOption<Boolean> StandardSocketOptions.TCP_NODELAY

For example, this code fragment opens a client network channel and sets SO_LINGER to 240 seconds:

NetworkChannel channel = SocketChannel.open();
channel.setOption(StandardSocketOptions.SO_LINGER, 240);

Different channels and sockets support different options. For instance, ServerSocketChannel supports SO_REUSEADDR and SO_RCVBUF but not SO_SNDBUF. Trying to set an option the channel doesn’t support throws an UnsupportedOperationException.
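If you would rather not let that exception escape, one option is a small helper that reports whether the option could be set. The sketch below is an assumption of how such a helper might look; SocketOptionDemo and trySet are invented names, and which options a given channel accepts varies by platform and Java version, so the program prints what it finds rather than assuming an answer.

```java
import java.io.IOException;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class SocketOptionDemo {

  // Sets the option if the channel supports it. Returns false instead of
  // propagating UnsupportedOperationException when it does not.
  static <T> boolean trySet(NetworkChannel channel, SocketOption<T> option, T value)
      throws IOException {
    try {
      channel.setOption(option, value);
      return true;
    } catch (UnsupportedOperationException ex) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    try (SocketChannel client = SocketChannel.open();
         ServerSocketChannel server = ServerSocketChannel.open()) {
      System.out.println("client accepts TCP_NODELAY: "
          + trySet(client, StandardSocketOptions.TCP_NODELAY, true));
      System.out.println("client TCP_NODELAY is now: "
          + client.getOption(StandardSocketOptions.TCP_NODELAY));
      System.out.println("server accepts TCP_NODELAY: "
          + trySet(server, StandardSocketOptions.TCP_NODELAY, true));
    }
  }
}
```

Checking channel.supportedOptions().contains(option) before calling setOption() is an alternative that avoids the exception altogether.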

The following simple program lists all supported socket options for the different types of network channels.

import java.io.*;
import java.net.*;
import java.nio.channels.*;

public class OptionSupport {

  public static void main(String[] args) throws IOException {
    printOptions(SocketChannel.open());
    printOptions(ServerSocketChannel.open());
    printOptions(AsynchronousSocketChannel.open());
    printOptions(AsynchronousServerSocketChannel.open());
    printOptions(DatagramChannel.open());
  }

  private static void printOptions(NetworkChannel channel) throws IOException {
    System.out.println(channel.getClass().getSimpleName() + " supports:");
    for (SocketOption<?> option : channel.supportedOptions()) {
      System.out.println(option.name() + ": " + channel.getOption(option));
    }
    System.out.println();
    channel.close();
  }

}

Here’s the output showing which options are supported by which types of channels, and what the default values are:

SocketChannelImpl supports:
SO_OOBINLINE: false
SO_REUSEADDR: false
SO_LINGER: -1
SO_KEEPALIVE: false
IP_TOS: 0
SO_SNDBUF: 131072
SO_RCVBUF: 131072
TCP_NODELAY: false

ServerSocketChannelImpl supports:
SO_REUSEADDR: true
SO_RCVBUF: 131072

UnixAsynchronousSocketChannelImpl supports:
SO_KEEPALIVE: false
SO_REUSEADDR: false
SO_SNDBUF: 131072
TCP_NODELAY: false
SO_RCVBUF: 131072

UnixAsynchronousServerSocketChannelImpl supports:
SO_REUSEADDR: true
SO_RCVBUF: 131072

DatagramChannelImpl supports:
IP_MULTICAST_TTL: 1
SO_BROADCAST: false
SO_REUSEADDR: false
IP_MULTICAST_IF: null
IP_TOS: 0
IP_MULTICAST_LOOP: true
SO_SNDBUF: 9216
SO_RCVBUF: 196724

Readiness Selection

For network programming, the second part of the new I/O APIs is readiness selection, the ability to choose a socket that will not block when read or written. This is primarily of interest to servers, although clients running multiple simultaneous connections with several windows open—such as a web spider or a browser—can take advantage of it as well.

In order to perform readiness selection, different channels are registered with a Selector object. Each channel is assigned a SelectionKey. The program can then ask the Selector object for the set of keys to the channels that are ready to perform the operation you want to perform without blocking.

The Selector Class

The only constructor in Selector is protected. Normally, a new selector is created by invoking the static factory method Selector.open():

public static Selector open() throws IOException

The next step is to add channels to the selector. There are no methods in the Selector class to add a channel. The register() method is declared in the SelectableChannel class. Not all channels are selectable—in particular, FileChannels aren’t selectable—but all network channels are. Thus, the channel is registered with a selector by passing the selector to one of the channel’s register methods:

public final SelectionKey register(Selector sel, int ops)
     throws ClosedChannelException
public final SelectionKey register(Selector sel, int ops, Object att)
     throws ClosedChannelException

This approach feels backward to me, but it’s not hard to use. The first argument is the selector the channel is registering with. The second argument is a named constant from the SelectionKey class identifying the operation the channel is registering for. The SelectionKey class defines four named bit constants used to select the type of the operation:

  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

These are bit-flag int constants, each a distinct power of two (OP_READ is 1, OP_WRITE is 4, OP_CONNECT is 8, and OP_ACCEPT is 16). Therefore, if a channel needs to register for multiple operations in the same selector (e.g., for both reading and writing on a socket), combine the constants with the bitwise or operator (|) when registering:

channel.register(selector,  SelectionKey.OP_READ | SelectionKey.OP_WRITE);

The optional third argument is an attachment for the key. This object is often used to store state for the connection. For example, if you were implementing a web server, you might attach a FileInputStream or FileChannel connected to the local file the server streams to the client.
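Two details of registration are worth seeing in code: the channel has to be in nonblocking mode first, and a channel only accepts the operations it can actually perform. The sketch below uses invented names (RegisterDemo and its two methods) and a plain string as a stand-in for real per-connection state.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class RegisterDemo {

  // Registers a socket channel for reading and writing with an attachment,
  // and returns the interest set recorded in the key.
  static int registerForReadAndWrite() throws IOException {
    try (Selector selector = Selector.open();
         SocketChannel channel = SocketChannel.open()) {
      channel.configureBlocking(false); // register() rejects blocking channels
      SelectionKey key = channel.register(
          selector,
          SelectionKey.OP_READ | SelectionKey.OP_WRITE,
          "per-connection state"); // hypothetical attachment
      return key.interestOps();
    }
  }

  // A server socket channel can only accept, so asking for OP_READ fails.
  static boolean serverRejectsRead() throws IOException {
    try (Selector selector = Selector.open();
         ServerSocketChannel server = ServerSocketChannel.open()) {
      server.configureBlocking(false);
      try {
        server.register(selector, SelectionKey.OP_READ);
        return false;
      } catch (IllegalArgumentException ex) {
        return true;
      }
    }
  }

  public static void main(String[] args) throws IOException {
    int ops = registerForReadAndWrite();
    System.out.println("read: " + ((ops & SelectionKey.OP_READ) != 0)
        + ", write: " + ((ops & SelectionKey.OP_WRITE) != 0));
    System.out.println("server rejects OP_READ: " + serverRejectsRead());
  }
}
```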

After the different channels have been registered with the selector, you can query the selector at any time to find out which channels are ready to be processed. Channels may be ready for some operations and not others. For instance, a channel could be ready for reading but not writing.

There are three methods that select the ready channels. They differ in how long they wait to find a ready channel. The first, selectNow(), performs a nonblocking select. It returns immediately if no connections are ready to be processed now:

public abstract int selectNow() throws IOException

The other two select methods are blocking:

public abstract int select() throws IOException
public abstract int select(long timeout) throws IOException

The first method waits until at least one registered channel is ready to be processed before returning. The second waits no longer than timeout milliseconds for a channel to be ready before returning 0. These methods are useful if your program doesn’t have anything to do when no channels are ready to be processed.

When you know the channels are ready to be processed, retrieve the ready channels using selectedKeys():

public abstract Set<SelectionKey> selectedKeys()

You iterate through the returned set, processing each SelectionKey in turn. You’ll also want to remove the key from the iterator to tell the selector that you’ve handled it. Otherwise, the selector will keep telling you about it on future passes through the loop.

Finally, when you’re ready to shut down the server or when you no longer need the selector, you should close it:

public abstract void close() throws IOException

This step releases any resources associated with the selector. More importantly, it cancels all keys registered with the selector and wakes up any threads blocked by one of this selector’s select methods.
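Putting the pieces together, here is a sketch of a complete select loop: open the selector, register a server channel, select, walk the selected keys while removing each one, and close everything at the end. It is an illustration under stated assumptions rather than a production server: SelectorEchoDemo and echoOnce are invented names, the server and its single client share one thread on the loopback interface, and the loop stops as soon as one short message has been echoed.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class SelectorEchoDemo {

  // Echoes one short message back to a loopback client and returns what
  // the client received.
  static String echoOnce(String message) throws IOException {
    byte[] payload = message.getBytes(StandardCharsets.US_ASCII);
    try (Selector selector = Selector.open();
         ServerSocketChannel server = ServerSocketChannel.open()) {
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      server.configureBlocking(false);
      server.register(selector, SelectionKey.OP_ACCEPT);

      try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
        client.write(ByteBuffer.wrap(payload)); // blocking client, short message

        int echoed = 0;
        while (echoed < payload.length) {
          selector.select(); // block until at least one channel is ready
          Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
          while (keys.hasNext()) {
            SelectionKey key = keys.next();
            keys.remove(); // tell the selector this key has been handled
            if (key.isAcceptable()) {
              SocketChannel connection = server.accept();
              if (connection != null) {
                connection.configureBlocking(false);
                connection.register(selector, SelectionKey.OP_READ,
                    ByteBuffer.allocate(256)); // buffer travels as the attachment
              }
            } else if (key.isReadable()) {
              SocketChannel connection = (SocketChannel) key.channel();
              ByteBuffer buffer = (ByteBuffer) key.attachment();
              if (connection.read(buffer) == -1) {
                connection.close(); // also cancels the key
                continue;
              }
              buffer.flip();
              while (buffer.hasRemaining()) { // fine for a short message only
                echoed += connection.write(buffer);
              }
              buffer.clear();
            }
          }
        }

        ByteBuffer reply = ByteBuffer.allocate(payload.length);
        while (reply.hasRemaining() && client.read(reply) != -1) {
          // collect the echo on the client side
        }
        for (SelectionKey key : selector.keys()) {
          key.channel().close(); // close the accepted connection (and the server)
        }
        reply.flip();
        return StandardCharsets.US_ASCII.decode(reply).toString();
      }
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(echoOnce("ping"));
  }
}
```

A real server would register for OP_WRITE when a write cannot finish at once instead of looping on write(), and would keep selecting until told to shut down.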

The SelectionKey Class

SelectionKey objects serve as pointers to channels. They can also hold an object attachment, which is how you normally store the state for the connection on that channel.

SelectionKey objects are returned by the register() method when registering a channel with a selector. However, you don’t usually need to retain this reference. The selectedKeys() method returns the same objects again inside a Set. A single channel can be registered with multiple selectors.

When retrieving a SelectionKey from the set of selected keys, you often first test what that key is ready to do. There are four possibilities:

public final boolean isAcceptable()
public final boolean isConnectable()
public final boolean isReadable()
public final boolean isWritable()

This test isn’t always necessary. In some cases, the selector is only testing for one possibility and will only return keys to do that one thing. But if the selector does test for multiple readiness states, you’ll want to test which one kicked the channel into the ready state before operating on it. It’s also possible that a channel is ready to do more than one thing.

Once you know what the channel associated with the key is ready to do, retrieve the channel with the channel() method:

public abstract SelectableChannel channel()

If you’ve stored an object in the SelectionKey to hold state information, you can retrieve it with the attachment() method:

public final Object attachment()

Finally, when you’re finished with a connection, deregister its SelectionKey object so the selector doesn’t waste any resources querying it for readiness. I don’t know that this is absolutely essential in all cases, but it doesn’t hurt. You do this by invoking the key’s cancel() method:

public abstract void cancel()

However, this step is only necessary if you haven’t closed the channel. Closing a channel automatically deregisters all keys for that channel in all selectors. Similarly, closing a selector invalidates all keys in that selector.
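The key methods from this section can be exercised together in a few lines. In this sketch a java.nio.channels.Pipe stands in for a network connection (its source channel is selectable), and the class name SelectionKeyDemo and the "connection-state" attachment are placeholders invented for the example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

final class SelectionKeyDemo {

  // Registers a pipe's source channel, makes it readable, and records what
  // the selected key reports before and after cancel().
  static String inspectKey() throws IOException {
    Pipe pipe = Pipe.open();
    try (Selector selector = Selector.open();
         Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
      source.configureBlocking(false);
      SelectionKey registered =
          source.register(selector, SelectionKey.OP_READ, "connection-state");

      sink.write(ByteBuffer.wrap(new byte[] {42})); // makes the source readable
      selector.select();

      StringBuilder log = new StringBuilder();
      Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
      while (keys.hasNext()) {
        SelectionKey key = keys.next();
        keys.remove();
        log.append(key == registered).append(' ')       // same object register() returned
           .append(key.isReadable()).append(' ')
           .append(key.channel() == source).append(' ')
           .append(key.attachment()).append(' ');
        key.cancel();                                    // deregister without closing
        log.append(key.isValid());
      }
      return log.toString();
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(inspectKey());
  }
}
```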
