Sprinkler - 高级同步对象





5.00/5 (1投票)
演示了一种线程同步和内存共享技术。
引言
Sprinkler 是一个高级同步对象,类似于信号量,但具有附加的功能。Sprinkler 允许您将数据从内部线程外部化。在高并发环境中,有很多情况下主线程需要等待一个或多个线程,并且需要在所有线程之间共享数据。例如,您可能希望将内部线程抛出的异常外部化到主线程。
背景
在过去,我们设计了一个异步执行队列(一种用于大规模执行并发任务的架构)。我们的工作方法是 TDD,所以我们从第一天起就开始考虑测试我们的框架。我们很快了解到,我们需要某种屏障,帮助我们的主线程等待(带超时)我们的框架的内部线程。即使这样还不够,因为有些情况下测试期望一个异常,但由于异常是由内部线程抛出的,无法通过简单地使用 Java throw 将其外部化到主线程。此外,有时测试需要来自内部线程的额外数据,我们希望找到一种好的方法来传输它。为了满足我们的需求,我们需要某种信号量和共享内存。因此,Sprinkler 诞生了。
使用代码
Sprinkler 基于单例设计模式。它包含两个主要方法:await 和 release。Sprinkler 支持多个信号量(上下文)。每次调用 await 和 release 时,您必须提供一个唯一的标识符(上下文)。
以下是 Sprinkler 功能的一个示例 - 数据和异常外部化
public class TestSprinkler {
@Test
public void testAwait_ReleaserDeliversData() {
final int CONTEXT = 1;
final String DATA = "bla bla";
// release will occur sometime in the future
ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
@Override
public Void call() throws Exception {
Sprinkler.getInstance().release(CONTEXT, DATA);
return null;
}
});
// waiting for the releaser thread
String data = (String) Sprinkler.getInstance().await(CONTEXT, 10000);
Assert.assertEquals(DATA, data);
}
@Test
public void testAwait_InnerThreadExternalizeException() {
final int CONTEXT = 1;
final String EXCEPTION_MESSAGE = "test inner thread exception message";
// release will occur sometime in the future - simulate exception in the releaser thread
ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
@Override
public Void call() throws Exception {
Sprinkler.getInstance().release(CONTEXT, new RuntimeException(EXCEPTION_MESSAGE));
return null;
}
});
Throwable thrown = null;
try {
Sprinkler.getInstance().await(CONTEXT, 10000);
} catch (Throwable t) {
// if the releaser thread delivers exception it will be externelized to this thread
thrown = t;
}
Assert.assertTrue(thrown instanceof SprinklerException);
Assert.assertEquals(EXCEPTION_MESSAGE, thrown.getCause().getMessage());
}
@Test
public void testAwait_Timeout() {
final int CONTEXT = 1;
ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<void>() {
@Override
public Void call() throws Exception {
Thread.sleep(10000);
return null;
}
});
Throwable thrown = null;
try {
Sprinkler.getInstance().await(CONTEXT, 1);
} catch (Throwable t) {
thrown = t;
}
Assert.assertTrue(thrown.getCause() instanceof TimeoutException);
}
}
Sprinkler 源代码
public class Sprinkler {
private static Sprinkler _instance = new Sprinkler();
private final ConcurrentMap<Integer, SprinklerData> _data =
new ConcurrentHashMap<Integer, SprinklerData>();
private Sprinkler() {}
public static Sprinkler getInstance() {
return _instance;
}
public void reset() {
_data.clear();
}
/**
* Locks the calling thread until someone will release it, or timeout will occur.
*
* @return data sent by releaser
*/
public Object await(int key, long timeout) {
SprinklerData data = null;
try {
data = getData(key);
doAwait(data.getLatch(), timeout);
externalizeException(data);
} finally {
_data.remove(key);
}
return data != null ? data.getInternal() : null;
}
public void release(int key) {
release(key, null, null);
}
public synchronized void release(int key, Object internalData) {
release(key, internalData, null);
}
public synchronized void release(int key, Throwable ex) {
release(key, null, ex);
}
/**
* Releases the lock on the waiting thread(s) for the given key, notifies them about
* the given exception.
*/
public synchronized void release(int key, Object internalData, Throwable ex) {
SprinklerData data = getData(key);
data.setInternal(internalData);
data.setAlreadyReleased(true);
if (ex != null) {
data.setException(ex);
}
notify(data.getLatch());
}
private synchronized SprinklerData getData(int key) {
SprinklerData data = _data.get(key);
if (data == null) {
data = new SprinklerData();
_data.put(key, data);
}
return data;
}
private void externalizeException(SprinklerData data) {
if (!isAlreadyReleased(data)) {
throw new SprinklerException(new TimeoutException());
}
Throwable thrown = data.getException();
if (thrown != null) {
throw new SprinklerException(thrown);
}
}
private void doAwait(CountDownLatch latch, long timeout) {
try {
latch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
throw new SprinklerException(ex);
}
}
private synchronized boolean isAlreadyReleased(SprinklerData data) {
return data.isAlreadyReleased();
}
private void notify(CountDownLatch lock) {
lock.countDown();
}
private static class SprinklerData {
private final CountDownLatch _latch;
private boolean _isAlreadyReleased = false;
private Throwable _thrown;
private Object _internal;
public SprinklerData() {
_latch = new CountDownLatch(1);
}
public Object getInternal() {
return _internal;
}
public void setInternal(Object data) {
_internal = data;
}
public CountDownLatch getLatch() {
return _latch;
}
public boolean isAlreadyReleased() {
return _isAlreadyReleased;
}
public void setAlreadyReleased(boolean isAlreadyReleased) {
_isAlreadyReleased = isAlreadyReleased;
}
public Throwable getException() {
return _thrown;
}
public void setException(Throwable thrown) {
_thrown = thrown;
}
@Override
public String toString() {
return String.format(
"SprinklerData [latch.count=%s, isAlreadyReleased=%s, internal=%s, thrown.message=%s]",
_latch == null ? "null latch" : _latch.getCount(),
_isAlreadyReleased,
_internal,
_thrown == null ? "null" : _thrown.getMessage());
}
}
}
关注点
实现此目的的另一种方法是使用线程池。线程池为您提供了一种集中管理所有当前运行线程的方式。由于它控制着池中运行的所有线程,因此您可以使用它来对线程调用等待。
示例
public class TestThreadPoolJoiner {
@Test
public void testJoin() throws InterruptedException, ExecutionException {
final int TASKS = 10;
final AtomicInteger executedTasks = new AtomicInteger(0);
ThreadPoolJoiner joiner = new ThreadPoolJoiner();
for (int i = 0; i < TASKS; i++) {
joiner.submit(new Callable<Integer>() {
@Override
public Integer call() {
return executedTasks.incrementAndGet();
}
});
}
joiner.join();
Assert.assertEquals(TASKS, executedTasks.get());
}
}
public class ThreadPoolJoiner extends ThreadPoolExecutor {
Collection<Future<?>> _tasks = new CopyOnWriteArrayList<Future<?>>();
public ThreadPoolJoiner() {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public void join() throws InterruptedException, ExecutionException {
for (Future<?> currTask : _tasks) {
currTask.get();
}
}
@Override
public <T> Future<T> submit(Callable<T> task) {
Future<T> ret = super.submit(task);
_tasks.add(ret);
return ret;
}
}