65.9K
CodeProject 正在变化。 阅读更多。
Home

Sprinkler - 高级同步对象

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2013年7月30日

CPOL

2分钟阅读

viewsIcon

20152

downloadIcon

101

演示了一种线程同步和内存共享技术。

引言

Sprinkler 是一个高级同步对象,类似于信号量,但具有附加的功能。Sprinkler 允许您将数据从内部线程外部化。在高并发环境中,有很多情况下主线程需要等待一个或多个线程,并且需要在所有线程之间共享数据。例如,您可能希望将内部线程抛出的异常外部化到主线程。

背景

在过去,我们设计了一个异步执行队列(一种用于大规模执行并发任务的架构)。我们的工作方法是 TDD,所以我们从第一天起就开始考虑测试我们的框架。我们很快了解到,我们需要某种屏障,帮助我们的主线程等待(带超时)我们的框架的内部线程。即使这样还不够,因为有些情况下测试期望一个异常,但由于异常是由内部线程抛出的,无法通过简单地使用 Java throw 将其外部化到主线程。此外,有时测试需要来自内部线程的额外数据,我们希望找到一种好的方法来传输它。为了满足我们的需求,我们需要某种信号量和共享内存。因此,Sprinkler 诞生了。

使用代码

Sprinkler 基于单例设计模式。它包含两个主要方法:await 和 release。Sprinkler 支持多个信号量(上下文)。每次调用 await 和 release 时,您必须提供一个唯一的标识符(上下文)。

以下是 Sprinkler 功能的一个示例 - 数据和异常外部化

public class TestSprinkler {

    @Test
    public void testAwait_ReleaserDeliversData() {
        
        final int CONTEXT = 1;
        final String DATA = "bla bla";

        // release will occur sometime in the future
        ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
            
            @Override
            public Void call() throws Exception {
                
                Sprinkler.getInstance().release(CONTEXT, DATA);
                
                return null;
            }
            
        });

        // waiting for the releaser thread
        String data = (String) Sprinkler.getInstance().await(CONTEXT, 10000);
        Assert.assertEquals(DATA, data);
    }

    @Test
    public void testAwait_InnerThreadExternalizeException() {
        
        final int CONTEXT = 1;
        final String EXCEPTION_MESSAGE = "test inner thread exception message";
        
        // release will occur sometime in the future - simulate exception in the releaser thread
        ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
            
            @Override
            public Void call() throws Exception {
                
                Sprinkler.getInstance().release(CONTEXT, new RuntimeException(EXCEPTION_MESSAGE));
                
                return null;
            }
            
        });
        
        Throwable thrown = null;
        try {
            Sprinkler.getInstance().await(CONTEXT, 10000);
        } catch (Throwable t) {
            // if the releaser thread delivers exception it will be externelized to this thread
            thrown = t;
        }
        Assert.assertTrue(thrown instanceof SprinklerException);
        Assert.assertEquals(EXCEPTION_MESSAGE, thrown.getCause().getMessage());
    }

    @Test
    public void testAwait_Timeout() {
        
        final int CONTEXT = 1;
        
        ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
            
            @Override
            public Void call() throws Exception {
                
                Thread.sleep(10000);
                
                return null;
            }
            
        });
        
        Throwable thrown = null;
        try {
            Sprinkler.getInstance().await(CONTEXT, 1);
        } catch (Throwable t) {
            thrown = t;
        }
        Assert.assertTrue(thrown.getCause() instanceof TimeoutException);
    }
}

Sprinkler 源代码

public class Sprinkler {
    
    private static Sprinkler _instance = new Sprinkler();
    
    private final ConcurrentMap<Integer, SprinklerData> _data =
            new ConcurrentHashMap<Integer, SprinklerData>();
    
    private Sprinkler() {}
    
    public static Sprinkler getInstance() {
        
        return _instance;
    }
    
    public void reset() {
        
        _data.clear();
    }
    
    /**
     * Locks the calling thread until someone will release it, or timeout will occur. 
     * 
     * @return data sent by releaser
     */
    public Object await(int key, long timeout) {
        
        SprinklerData data = null;
        try {
            data = getData(key);
            doAwait(data.getLatch(), timeout);
            externalizeException(data);
        } finally {
            _data.remove(key);
        }
        
        return data != null ? data.getInternal() : null;
    }
    
    public void release(int key) {
        
        release(key, null, null);
    }
    
    public synchronized void release(int key, Object internalData) {
        
        release(key, internalData, null);
    }
    
    public synchronized void release(int key, Throwable ex) {
        
        release(key, null, ex);
    }
    
    /**
     * Releases the lock on the waiting thread(s) for the given key, notifies them about
     * the given exception.
     */
    public synchronized void release(int key, Object internalData, Throwable ex) {
        
        SprinklerData data = getData(key);
        data.setInternal(internalData);
        data.setAlreadyReleased(true);
        if (ex != null) {
            data.setException(ex);
        }
        notify(data.getLatch());
    }
    
    private synchronized SprinklerData getData(int key) {
        
        SprinklerData data = _data.get(key);
        if (data == null) {
            data = new SprinklerData();
            _data.put(key, data);
        }
        
        return data;
    }
    
    private void externalizeException(SprinklerData data) {
        
        if (!isAlreadyReleased(data)) {
            throw new SprinklerException(new TimeoutException());
        }
        Throwable thrown = data.getException();
        if (thrown != null) {
            throw new SprinklerException(thrown);
        }
    }
    
    private void doAwait(CountDownLatch latch, long timeout) {
        
        try {
            latch.await(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            throw new SprinklerException(ex);
        }
    }
    
    private synchronized boolean isAlreadyReleased(SprinklerData data) {
        
        return data.isAlreadyReleased();
    }
    
    private void notify(CountDownLatch lock) {
        
        lock.countDown();
    }
    
    private static class SprinklerData {
        
        private final CountDownLatch _latch;
        private boolean _isAlreadyReleased = false;
        private Throwable _thrown;
        private Object _internal;
        
        public SprinklerData() {
            
            _latch = new CountDownLatch(1);
        }
        
        public Object getInternal() {
            
            return _internal;
        }
        
        public void setInternal(Object data) {
            
            _internal = data;
        }
        
        public CountDownLatch getLatch() {
            
            return _latch;
        }
        
        public boolean isAlreadyReleased() {
            
            return _isAlreadyReleased;
        }
        
        public void setAlreadyReleased(boolean isAlreadyReleased) {
            
            _isAlreadyReleased = isAlreadyReleased;
        }
        
        public Throwable getException() {
            
            return _thrown;
        }
        
        public void setException(Throwable thrown) {
            
            _thrown = thrown;
        }
        
        @Override
        public String toString() {
            
            return String.format(
                    "SprinklerData [latch.count=%s, isAlreadyReleased=%s, internal=%s, thrown.message=%s]",
                    _latch == null ? "null latch" : _latch.getCount(),
                    _isAlreadyReleased,
                    _internal,
                    _thrown == null ? "null" : _thrown.getMessage());
        }
    }
}

关注点

实现此目的的另一种方法是使用线程池。线程池为您提供了一种集中管理所有当前运行线程的方式。由于它控制着池中运行的所有线程,因此您可以使用它来对线程调用等待。

示例

public class TestThreadPoolJoiner {
    
    @Test
    public void testJoin() throws InterruptedException, ExecutionException {
        
        final int TASKS = 10;
        final AtomicInteger executedTasks = new AtomicInteger(0);
        ThreadPoolJoiner joiner = new ThreadPoolJoiner();
        
        for (int i = 0; i < TASKS; i++) {
            joiner.submit(new Callable<Integer>() {
                
                @Override
                public Integer call() {
                    
                    return executedTasks.incrementAndGet();
                }
            });
        }
        
        joiner.join();
        Assert.assertEquals(TASKS, executedTasks.get());
    }
}

public class ThreadPoolJoiner extends ThreadPoolExecutor {
    
    Collection<Future<?>> _tasks = new CopyOnWriteArrayList<Future<?>>();
    
    public ThreadPoolJoiner() {
        
        super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }
    
    public void join() throws InterruptedException, ExecutionException {
        
        for (Future<?> currTask : _tasks) {
            currTask.get();
        }
    }
    
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        
        Future<T> ret = super.submit(task);
        _tasks.add(ret);
        
        return ret;
    }
} 
© . All rights reserved.