异步SocketChannel 并发写入





0/5 (0投票)
异步SocketChannel 并发写入
引言
Java 7 异步套接字 API 简单易用,但看起来并非一个完整的解决方案。主要问题在于完成处理程序应该如何实现。另一个问题是,在上次读/写操作完成之前,无法启动新的读/写操作,而并发管理会更好。
Using the Code
这里提供了一个类,解决了上述关于 write
操作的问题,并且可以并发使用。它包含一个队列,用于所有待处理的 write
请求。我们不能使用任何 concurrent.* 容器,因为它们都不具备此处所需的重要功能。但仍然可以实现无锁实现。如果大家感兴趣,请告诉我。:)
public static class AsynchronousSocketChannelWrapper
{
private final AsynchronousSocketChannel asynchronousSocketChannel;
private final Handler handler;
private final ReentrantLock lock;
private final Queue<ByteBuffer> queue;
private final ByteBuffer [] iov;
private boolean closed;
private class Handler implements CompletionHandler<Long, Integer>
{
public void completed( Long result, Integer attachment )
{
/* Called when the write operation completed. */
int iovc = 0;
lock.lock();
try {
/* Remove all sent buffers from the queue. */
int idx = 0;
for (;;) {
final ByteBuffer byteBuffer = queue.peek();
assert( byteBuffer == iov[idx] );
if (byteBuffer.remaining() > 0) {
/* Nobody knows will it happen or not,
* let's assume will not.
*/
assert( false );
}
iov[idx] = null;
queue.poll();
if ((++idx == iov.length) || (iov[idx] == null)) {
break;
}
}
if (queue.isEmpty())
return;
/* Queue is not empty, let's schedule new write requests.
* Would be stupid to schedule them one by one if more than one,
* let's join them though by 16.
*/
final Iterator<ByteBuffer> it = queue.iterator();
do {
iov[iovc] = it.next();
if (++iovc == iov.length)
break;
}
while (it.hasNext());
}
finally {
lock.unlock();
}
assert( iovc > 0 );
asynchronousSocketChannel.write(
iov, 0, iovc, 0, TimeUnit.SECONDS, null, this );
}
public void failed( Throwable exc, Integer attachment )
{
/* Called when the write operation failed,
* most probably the underlying socket is being closed.
*/
lock.lock();
try {
closed = true;
queue.clear();
}
finally {
lock.unlock();
}
}
}
public AsynchronousSocketChannelWrapper(
AsynchronousSocketChannel asynchronousSocketChannel )
{
this.asynchronousSocketChannel = asynchronousSocketChannel;
this.handler = new Handler();
this.lock = new ReentrantLock();
this.queue = new LinkedList<ByteBuffer>();
this.iov = new ByteBuffer[16];
}
public boolean write( ByteBuffer byteBuffer )
{
lock.lock();
try {
if (closed)
return false;
final boolean wasEmpty = queue.isEmpty();
queue.add( byteBuffer );
if (!wasEmpty)
return true;
}
finally {
lock.unlock();
}
iov[0] = byteBuffer;
asynchronousSocketChannel.write(
iov, 0, 1, 0, TimeUnit.SECONDS, null, handler );
return true;
}
}
如何使用?
只需将 AsynchronousSocketChannel
对象包装到该类的实例中,并使用其 write()
函数进行 write
操作。write()
函数如果操作已排队进行写入,则返回 true
;如果底层套接字通道已关闭,则返回 false
。