65.9K
CodeProject 正在变化。 阅读更多。
Home

异步SocketChannel 并发写入

emptyStarIconemptyStarIconemptyStarIconemptyStarIconemptyStarIcon

0/5 (0投票)

2014年4月28日

CPOL
viewsIcon

10741

异步SocketChannel 并发写入

引言

Java 7 异步套接字 API 简单易用,但看起来并非一个完整的解决方案。主要问题在于完成处理程序应该如何实现。另一个问题是,在上次读/写操作完成之前,无法启动新的读/写操作,而并发管理会更好。

Using the Code

这里提供了一个类,解决了上述关于 write 操作的问题,并且可以并发使用。它包含一个队列,用于所有待处理的 write 请求。我们不能使用任何 concurrent.* 容器,因为它们都不具备此处所需的重要功能。但仍然可以实现无锁实现。如果大家感兴趣,请告诉我。:)

    public static class AsynchronousSocketChannelWrapper
    {
        private final AsynchronousSocketChannel asynchronousSocketChannel;
        private final Handler handler;
        private final ReentrantLock lock;
        private final Queue<ByteBuffer> queue;
        private final ByteBuffer [] iov;
        private boolean closed;

        private class Handler implements CompletionHandler<Long, Integer>
        {
            public void completed( Long result, Integer attachment )
            {
                /* Called when the write operation completed. */
                int iovc = 0;
                lock.lock();
                try {
                    /* Remove all sent buffers from the queue. */
                    int idx = 0;
                    for (;;) {
                        final ByteBuffer byteBuffer = queue.peek();
                        assert( byteBuffer == iov[idx] );
                        if (byteBuffer.remaining() > 0) {
                            /* Nobody knows will it happen or not,
                             * let's assume will not.
                             */
                            assert( false );
                        }

                        iov[idx] = null;
                        queue.poll();

                        if ((++idx == iov.length) || (iov[idx] == null)) {
                            break;
                        }
                    }

                    if (queue.isEmpty())
                        return;

                    /* Queue is not empty, let's schedule new write requests.
                     * Would be stupid to schedule them one by one if more than one,
                     * let's join them though by 16.
                     */
                    final Iterator<ByteBuffer> it = queue.iterator();
                    do {
                        iov[iovc] = it.next();
                        if (++iovc == iov.length)
                            break;
                    }
                    while (it.hasNext());
                }
                finally {
                    lock.unlock();
                }

                assert( iovc > 0 );
                asynchronousSocketChannel.write(
                        iov, 0, iovc, 0, TimeUnit.SECONDS, null, this );
            }

            public void failed( Throwable exc, Integer attachment )
            {
                /* Called when the write operation failed,
                 * most probably the underlying socket is being closed.
                 */
                lock.lock();
                try {
                    closed = true;
                    queue.clear();
                }
                finally {
                    lock.unlock();
                }
            }
        }

        public AsynchronousSocketChannelWrapper(
                AsynchronousSocketChannel asynchronousSocketChannel )
        {
            this.asynchronousSocketChannel = asynchronousSocketChannel;
            this.handler = new Handler();
            this.lock = new ReentrantLock();
            this.queue = new LinkedList<ByteBuffer>();
            this.iov = new ByteBuffer[16];
        }

        public boolean write( ByteBuffer byteBuffer )
        {
            lock.lock();
            try {
                if (closed)
                    return false;

                final boolean wasEmpty = queue.isEmpty();
                queue.add( byteBuffer );

                if (!wasEmpty)
                    return true;
            }
            finally {
                lock.unlock();
            }

            iov[0] = byteBuffer;
            asynchronousSocketChannel.write(
                    iov, 0, 1, 0, TimeUnit.SECONDS, null, handler );

            return true;
        }
    } 

如何使用?

只需将 AsynchronousSocketChannel 对象包装到该类的实例中,并使用其 write() 函数进行 write 操作。write() 函数如果操作已排队进行写入,则返回 true ;如果底层套接字通道已关闭,则返回 false

© . All rights reserved.