65.9K
CodeProject 正在变化。 阅读更多。
Home

SyncEvent:“缺失”的 Java 事件类

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2015年6月30日

CPOL

7分钟阅读

viewsIcon

9911

downloadIcon

82

本文介绍了一个名为 SyncEvent 的通用事件类,它支持标准的线程同步操作,如在多个事件上进行信号通知和等待。

引言

如果您使用异步操作(例如线程)进行编程,迟早您会需要一种机制,允许一个线程暂停并等待另一个线程发出某种“确认”(信号通知)。(基本思想是“当您完成工作时,请通知我,以便我等待。”)这种机制在计算机科学中被称为“事件”,大多数操作系统都在其 API 中提供了这种功能。然而,Java 并没有提供开箱即用的实现此完整机制的类,而只提供了构建一个所需的底层功能。此外,它也没有提供一种直接的方式来让一个线程等待多个事件,直到第一个事件发生,而这有时是必需的。SyncEvent 类将所有这些功能打包到一个 Java 类中,可以作为线程同步操作的便捷方法使用。

背景

我来自微软背景,后来转向 Java 编写一些简单的 Android 应用。Android 大量使用异步操作(例如,所有网络相关的调用都不能从主进程线程调用),令我惊讶的是,Java 和 Android 都未提供开箱即用的此类功能,而这在微软操作系统和 .NET 框架中非常常见。我在网上寻找了一个通用的类,但没有找到,于是我决定构建 SyncEvent 类,包含最需要的功能。

SyncEvent 类说明

让我们看看 SyncEvent 类的接口。

  • 我们有一个构造函数,它接受一个 String 作为输入参数,该参数仅用于日志记录,以区分不同的事件(您的程序中很可能不止一个事件)。
  • 主要方法如下(它们都按照 javadoc 规范进行了文档记录):
    • waitForEvent:此方法暂停当前线程,等待事件的信号。您可以指定等待是无限期/有期的,以及在方法返回时事件是否自动清除信号状态(默认值为 true)。
    • signalEvent:唤醒一个正在等待此事件的线程。
    • resetEvent:手动清除事件的信号状态。
    • isSignalled:返回此事件是否已被信号通知。您也可以在此处指定是否希望在返回时自动清除信号状态。
    • getName:返回事件的名称,该名称在构造函数中指定。
  • 还有两个 static 方法:
    • WaitForMultipleEvents:允许等待多个事件,一旦其中任何一个事件发生就唤醒(从 V1.2 版本开始,它已使用队列和相同的 SyncEvent 类重新实现,以实现最“基本”的代码)。
    • WaitForMultipleEventsExecutor:先前版本的 WaitForMultipleEvents,使用 ExecutorCompletionService 类实现(仍作为一种替代实现)。

package com.fdm.syncevent;

import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import android.util.Pair;

public class SyncEvent {
    final static ILogger log = new LogcatLogger("SyncEvent");

    Object sync;
    boolean syncSignalled;     	// the boolean is needed to save the status of signal notification, 
				// when the signal comes before the wait operation    
    String syncName;

    public SyncEvent(String name) {
        sync=new Object();
        syncSignalled=false;
        syncName=name;
    }
   

    /**
    * Gets the signal name specified during creation.
    * @return Returns the signal name used for logging.
    */
    public String getName() {
        return syncName;
    }

    /**
    * Waits indefinetely for the event to be signalled. 
    * The signalled status of the event is auto-cleared on return.
    * @return <li>false</li> if the wait operation has been interrupted 
    * (someone is trying to "kill" our thread), <li>true</li> is the event has been signalled.
    */
    public boolean waitForEvent() {
        return waitForEvent(true);
    }

    /**
    * Waits indefinetely for the event to be signalled.
    * @param  autoclear    if true the signalled status of the event is auto-cleared on return.
    * @return <li>false</li> if the wait operation has been interrupted 
    * (someone is trying to "kill" our thread), <li>true</li> is the event has been signalled.
    */
    public boolean waitForEvent(boolean autoclear) {
          synchronized (sync) {
                  // while is needed since Java's wait function could return even when no signal 
		  // has been raised (so called spurious wakeups)
                  while (!syncSignalled) {
                      //log.d("%s: Awaiting signal",syncName);
                      try {
                          sync.wait();
                      }
                      catch (InterruptedException e) {
                          return false;                     
                      }
                  }
                  if (autoclear)
                      syncSignalled=false;                       
          }

          return true;
    }
   

    /**
    * Waits for the event to be signalled up to the specified time. 
    * The signalled status is auto-cleared on return
    * @param  millis    maximum number of milliseconds to wait for
    * @return
    * <li>0</li> if the event has been signalled;
    * <li>-1</li> if the wait operation has been interrupted (someone is trying to "kill" our thread);
    * <li>-2</li> if the timeout has elapsed and the event has not been signalled
    */    
    public int waitForEvent(long millis) {
        return waitForEvent(millis, true);
    }
   
   

    /**
    * Waits for the event to be signalled up to the specified time.
    * @param  millis    maximum number of milliseconds to wait for
    * @param  autoclear    if true the signalled status of the event is auto-cleared on return.
    * @return
    * <li>0</li> if the event has been signalled;
    * <li>-1</li> if the wait operation has been interrupted (someone is trying to "kill" our thread);
    * <li>-2</li> if the timeout has elapsed and the event has not been signalled
    */    
    public int waitForEvent(long millis, boolean autoclear) {                 
        long startTime,elapsed;

        synchronized (sync) {
                // while is needed since Java's wait function could return even when no signal 
                // has been raised (so called spurious wakeups)
                  while (!syncSignalled) {
                      //log.d("%s: Awaiting signal",syncName);
                      try {
                          startTime=System.currentTimeMillis();
                          sync.wait(millis);
                          elapsed=System.currentTimeMillis()-startTime;
                          if (elapsed>=millis)  // if it is a real timeout exit
                              break;
                          else
                              millis-=elapsed;  // if it is a spurious wakeup go on waiting 
						// till the timeout has been reached
                      }
                      catch (InterruptedException e) {
                          return -1;                     
                      }
                  }
                  boolean signal=syncSignalled;
                  if (autoclear)
                      syncSignalled=false;                       
                  return (signal?0:-2);
          }
    }

   

    /**
        * Waits for the first of multiple events to be signalled up to the specified time.
        * The signalled status of the first event is auto-cleared on return
        * @param  millis    maximum number of milliseconds to wait for
        * @param  events    an array of one or more events to wait concurrently for
        * @return
        * <li>i</li>  i>=0, if the i-th event has been signalled;
        * <li>-1</li> if the wait operation has been interrupted
        * (someone is trying to "kill" our thread);
        * <li>-2</li> if the timeout has elapsed and none of the events have been signalled
        */         
        public static int waitForMultipleEventsExecutor(final long millis, final SyncEvent ...events) {
            int numEvents=events.length,ris=-1;                

            ExecutorService poolMultipleWait=Executors.newFixedThreadPool(numEvents);
            ExecutorCompletionService<Pair<Integer,Integer>>
            cs=new ExecutorCompletionService<Pair<Integer,Integer>>(poolMultipleWait);

            // we launch multiple waits in parallel
            for (int i=0;i<numEvents;i++) {
                final int index=i;
                cs.submit(new Callable<Pair<Integer,Integer>>() {
                    @Override
                    public Pair<Integer,Integer> call() throws Exception {
                        return new Pair<Integer,Integer>(index,events[index].waitForEvent(millis,false));
                    }
                });
            }

            Pair<Integer, Integer> signalled;
            try {
                log.d("waitForMultipleEvents: awaiting on %d events %d timeout",numEvents,millis);
                signalled = cs.take().get();
                if (signalled.second==0)
                    ris=signalled.first;
                else
                    ris=signalled.second;
                events[signalled.first].resetEvent(); // we clear the event status
                log.d("waitForMultipleEvents: received event %d (%s) result %d",
			signalled.first,events[signalled.first].syncName,signalled.second);
            }
            catch (InterruptedException e) {
                    ris=-1;  // we have been interrupted
            }
            catch (ExecutionException e) {
                log.e(e,"waitForMultipleEvents: exception");
            }
            finally {
                poolMultipleWait.shutdownNow();  // abort any other wait operation           
            }        

            return ris;       
        }    

    /**
    * Waits for the first of multiple events to be signalled up to the specified time. 
    * The signalled status of the first event is auto-cleared on return
    * @param  millis    maximum number of milliseconds to wait for
    * @param  events    an array of one or more events to wait concurrently for
    * @return
    * <li>i</li>  i>=0, if the i-th event has been signalled;
    * <li>-1</li> if the wait operation has been interrupted (someone is trying to "kill" our thread);
    * <li>-2</li> if the timeout has elapsed and none of the events have been signalled
    */         
    public static int waitForMultipleEvents (final long millis, final SyncEvent ...events) {
        int numEvents=events.length,ris=-1;                

        final SyncEvent eventQueueNotEmpty=new SyncEvent("eventQueueNotEmpty");
        final Queue<Pair<Integer,Integer>> 
		eventQueue=new ConcurrentLinkedQueue<Pair<Integer,Integer>>();

        Thread [] poolMultipleWait=new Thread[numEvents];

        // we launch multiple waits in parallel
        for (int i=0;i<numEvents;i++) {
            final int index=i;
            poolMultipleWait[i]=new Thread() {

                @Override
                public void run() {
                    eventQueue.add(new Pair<Integer,Integer>
			(index,events[index].waitForEvent(false)?0:-1));
                    eventQueueNotEmpty.signalEvent();
                    //log.d("waitForMultipleEvents: thread waiting on event %s exiting...",
                    //events[index].getName());
                }
            };
            poolMultipleWait[i].start();
        }
                   

        log.d("waitForMultipleEvents: awaiting on %d events %d timeout 
		(ActiveThreads %d)",numEvents,millis,Thread.activeCount());
        ris=eventQueueNotEmpty.waitForEvent(millis);

        // abort any other wait operation
        for (int i=0;i<numEvents;i++)
            if (poolMultipleWait[i].isAlive())
                poolMultipleWait[i].interrupt();            

        // if we have been signalled
        if (ris==0) {       
            // check the "first" signalled event
            Pair<Integer, Integer> signalled = eventQueue.peek();
            if (signalled.second==0) {
                ris=signalled.first;
                events[signalled.first].resetEvent(); // we clear the event status
            }
            else
                ris=signalled.second;

            log.d("waitForMultipleEvents: received event %d (%s) result %d",
		signalled.first,events[signalled.first].syncName,signalled.second);
        }

        return ris;       
    }
   
   

    /**
    * Waits indefinitely for the first of multiple events to be signalled. 
    * The signalled status of the first event is auto-cleared on return
    * @param    events    an array of one or more events to wait concurrently for
    * @return
    * <li>i</li> i>=0, if the i-th event has been signalled;
    * <li>-1</li> if the wait operation has been interrupted (someone is trying to "kill" our thread);
    */         
    public static int waitForMultipleEvents (final SyncEvent ...events) {
        int numEvents=events.length,ris=-1;                

        final SyncEvent eventQueueNotEmpty=new SyncEvent("eventQueueNotEmpty");
        final Queue<Pair<Integer,Integer>> 
		eventQueue=new ConcurrentLinkedQueue<Pair<Integer,Integer>>();

        Thread [] poolMultipleWait=new Thread[numEvents];

        // we launch multiple waits in parallel
        for (int i=0;i<numEvents;i++) {
            final int index=i;
            poolMultipleWait[i]=new Thread() {

                @Override
                public void run() {
                    eventQueue.add(new Pair<Integer,Integer>
			(index,events[index].waitForEvent(false)?0:-1));
                    eventQueueNotEmpty.signalEvent();
                    //log.d("waitForMultipleEvents: thread waiting on event %s exiting...",
                    //events[index].getName());
                }
            };
            poolMultipleWait[i].start();
        }
                   

        log.d("waitForMultipleEvents: awaiting on %d events (ActiveThreads %d)",
		numEvents,Thread.activeCount());
        ris=eventQueueNotEmpty.waitForEvent()?0:-1;

        // abort any other wait operation
        for (int i=0;i<numEvents;i++)
            if (poolMultipleWait[i].isAlive())
                poolMultipleWait[i].interrupt();            

        // if we have been signalled
        if (ris==0) {       
            // check the "first" signalled event
            Pair<Integer, Integer> signalled = eventQueue.peek();
            if (signalled.second==0) {
                ris=signalled.first;
                events[signalled.first].resetEvent(); // we clear the event status
            }
            else
                ris=signalled.second;

            log.d("waitForMultipleEvents: received event %d (%s) result %d",
		signalled.first,events[signalled.first].syncName,signalled.second);
        }

        return ris;       
    }
   

    /**
    * Waits indefinitely for the first of multiple events to be signalled. 
    * The signalled status of the first event is auto-cleared on return
    * @param    events    an array of one or more events to wait concurrently for
    * @return
    * <li>i</li> i>=0, if the i-th event has been signalled;
    * <li>-1</li> if the wait operation has been interrupted (someone is trying to "kill" our thread);
    */         
    public static int waitForMultipleEventsExecutor (final SyncEvent ...events) {
        int numEvents=events.length,ris=-1;

        ExecutorService poolMultipleWait=Executors.newFixedThreadPool(numEvents);
        ExecutorCompletionService<Pair<Integer,Boolean>> 
		cs=new ExecutorCompletionService<Pair<Integer,Boolean>>(poolMultipleWait);

        // we launch multiple waits in parallel
        for (int i=0;i<numEvents;i++) {
            final int index=i;
            cs.submit(new Callable<Pair<Integer,Boolean>>() {
                @Override
                public Pair<Integer,Boolean> call() throws Exception {
                    return new Pair<Integer,Boolean>(index,events[index].waitForEvent(false));
                }
            });
        }

        Pair<Integer, Boolean> signalled;
        try {
            log.d("waitForMultipleEvents: awaiting on %d events",numEvents);
            signalled = cs.take().get();
            if (signalled.second)
                ris=signalled.first;
            events[signalled.first].resetEvent(); // we clear the event status
            log.d("waitForMultipleEvents: received event %d (%s) result %b",
		signalled.first,events[signalled.first].getName(),signalled.second);
        }
        catch (InterruptedException e) {
            ris=-1;
        }
        catch (ExecutionException e) {
            log.e(e,"waitForMultipleEvents: exception");
        }
        finally {
            poolMultipleWait.shutdownNow();  // abort any other wait operation           
        }

        return ris;       
    }

   

    /**
    * Checks if the event has been signalled. The signal status is auto-cleared on return.
    * @return <li>true</li> if the event has been signalled;
    * <li>false</li>    otherwise
    */    
    public boolean isSignalled () {
        return isSignalled(true);
    }
   
   

    /**
    * Checks if the event has been signalled.
    * @param  autoclear    if true the signalled status of the event is auto-cleared on return.
    * @return <li>true</li> if the event has been signalled;
    * <li>false</li>    otherwise
    */    
    public boolean isSignalled (boolean autoclear) {
        synchronized (sync) {
            boolean ris=syncSignalled;
            if (autoclear)
                syncSignalled=false;
            return ris;
        }
    }

   
   

    /**
    * Signals the event, waking one of the threads waiting on the signal (can be more than one).
    */    
    public void signalEvent() {
          synchronized (sync) {
            syncSignalled=true;
            sync.notify();
          }
    }
   

    /**
    * Clears manually the signalled status of the event.
    */    
    public void resetEvent() {
          synchronized (sync) {
            syncSignalled=false;
          }       
    }
}

实现说明

SyncEvent 类处理了 Java 标准同步函数(Object 类的 wait 和 notify 函数)中遇到的大多数常见问题。

  • 丢失的事件:如果您在线程开始在某个对象上等待之前就发出该对象的 notify 函数,信号就会丢失(即,等待线程将继续等待而不会被唤醒)。此解决方案是众所周知的(参见 [1]),它基于将信号状态保存在一个名为 syncSignalled 的内部变量中。
  • 虚假唤醒:出于“无法解释”的原因,线程可能会从 wait 调用中唤醒,即使 notify() 方法未被调用。这就是所谓的虚假唤醒(例如,无原因的唤醒)。为了应对这种意外行为,syncSignalled 变量再次发挥作用,因为所有 waitForEvent 方法都会进入一个变量-等待循环,直到 syncSignalled 变量被设置。
  • 等待多个事件:Java 没有提供等待多个事件的同步函数(在 Windows API 中称为 WaitForMultipleObjects)。解决此问题的方法是创建多个线程(每个线程等待一个事件),每个线程在 waitForEvent 操作中暂停等待其中一个输入事件(在 WaitForMultipleEventsExecutor 方法中,线程使用 ExecutorService 类作为线程池)。然后,调用 static waitForMultipleEvents 方法的线程会暂停,等待哪个创建的线程“首先”唤醒,它使用一个队列和同一个 SyncEvent 类来通知线程的终止(在 WaitForMultipleEventsExecutor 中,队列和终止事件都由 ExecutorCompletionService 类处理)。一旦任何一个事件被信号通知,相应的创建线程就会结束,WaitForMultipleEvents 函数就会唤醒,收集结果,并向所有仍然等待其他未触发事件的剩余线程发送中断。在此处使用非自动清除的 waitForEvent 操作很重要,否则可能会丢失一些事件:例如,当两个事件相继发生信号,而 WaitForMultipleEvents 函数还没有足够的时间来中断所有剩余的事件等待线程。
  • InterruptedException:许多处理线程挂起的 Java 方法都会抛出不期望的 InterruptedException,即使大多数情况下软件并不需要处理它(例如,使用空的 catch 块进行 try)。InterruptedException 基本上是通知一个“等待”线程(例如,调用了 wait 或 sleep 的线程),有人试图“杀死”我们(例如,用户试图关闭进程,或者另一个线程明确调用了已挂起线程的 interrupt 方法)。由于挂起的线程未执行,通知的唯一方式是通过抛出异常来恢复它,这正如您所能想象的,就是著名的 InterruptedException 异常。如果您理解了该机制,您大概可以想象处理此异常的最佳方法是终止挂起线程中正在执行的任何操作,释放所有已分配的资源,然后退出线程。如果此异常可以沿着调用栈传播,那将很有用,但由于 Java 的受查异常,这并不总是可能的(例如,在具有不允许任何进一步 throw 子句的精确签名的方法中使用 wait 函数,例如在实现 Java 接口方法时),因此出于方便起见,SyncEvent 类中的所有方法都会在内部捕获 InterruptedException,并在被中断时返回一个特定的值(请参阅方法文档)。

Using the Code

首先,实例化 SyncEvent 对象。

SyncEvent connected = new SyncEvent("connected");
SyncEvent newPeer = new SyncEvent("newPeer");

在您希望等待事件的线程中,调用:

// the thread will suspend for indefinite time, awaiting the event
// returns true when the event is signalled or false when we have been interrupted
connected.waitForEvent();
// the thead will suspend awaiting the event for 10 seconds
// returns an 0 when the event is signalled, -1 when we have been interrupted 
// and -2 when the timeout elapsed without any event.
connected.waitForEvent(10000);

在您希望发出事件信号的线程中,调用:

connected.signalEvent();

如果您想在一个线程中等待多个事件:

// the thread will suspend indefinitely, awaiting both events
// returns i-th event where i>=0, if i-th event has been signalled 
// (0 for connected, 1 for newPeer) or  -1 when we have been interrupted
SyncEvent.waitForMultipleEvents(connected,newPeer);
// the thread will suspend awaiting both events for 10 seconds
// returns i-th event where i>=0, if i-th event has been signalled 
// (0 for connected, 1 for newPeer), -1 when we have been interrupted and -2 
// when the timeout elapsed without any event.
SyncEvent.waitForMultipleEvents(10000,connected,newPeer);

源代码

随附了一个名为 SyncEvent 的 Android Eclipse 项目,其中包含 SyncEvent 类和一个示例测试程序。它包含一个简单的 Activity,只有两个按钮:

  • Start(开始):它具有双重行为。
    • 如果不存在等待线程,它会创建一个名为“Thread 1”的线程,该线程仅在两个 SyncEvent 上无限等待。 调用“start”和“stop”(实际上,它通过使用 while 循环和 5 秒的定时等待来模拟无限等待,以测试“等待超时”方法,这些方法在早期版本中存在 bug)。每当收到 startstop 事件时,“Thread 1”会记录接收到的事件,然后重新开始在两个 SyncEvents 上无限等待。
    • 如果存在等待线程,它会向“Thread 1”发送一个随机事件(startstop)来唤醒它。
  • Abort(中止):中止等待线程。

如果您启动该 Activity,可以在 Eclipse LogCat 窗口中查看输出。该应用程序还会记录每次开始等待 SyncEvent 时的活动线程数,以检查所有线程创建/中断操作是否都能完美工作(即,线程计数不会随时间增加)。

以下是点击事件的主要代码,非常直接:

    @Override
    public void onClick(View v) {
        switch (v.getId()) {
            case R.id.startBtn:
                // Create Thread 1
                if (t==null) {
                    t=new Thread() {
                        public void run() {
            
                            log.i("Thread 1 started");
                            while (true) {
                                log.i("Awaiting start or stop signal from another thread");
                                int ris=SyncEvent.waitForMultipleEvents(5000,events);
                                if (ris==-1) {
                                    log.i("Thread 1 abort signal received");
                                    break;
                                }
                                else if (ris>=0)
                                    log.i("Thread 1 %s event received, processing...",events[ris].getName());
                                else
                                    log.i("Thread 1 timeout elapsed, restarting...");
                            }
                            log.i("Thread 1 exiting");
                            t=null;
                        };
                    };
                    t.start();
                }
                else {
                    int offset=(int) (System.currentTimeMillis()%2);
                    log.i("Signalling %s to thread 1",events[offset].getName());
                    events[offset].signalEvent();
            }

            break;
            
            case R.id.abortBtn:
                log.i("Signalling abort to thread 1");
                if (t!=null)
                    t.interrupt();
            break;        
        }
    }

备注

人们可能会注意到,所有这些操作都可以通过将常量值插入阻塞队列来简单实现。这显然是正确的,因为阻塞队列“在幕后”是队列和事件的组合。为避免误用,当您只想唤醒一个线程时(例如,发送一个不带数据的信号),应该使用事件;而当您要将数据发送给“消费者”线程时,则应使用阻塞队列。

参考文献

历史

  • V1.0 初始发布(2015 年 6 月 30 日)
  • V 1.1 修复了 SyncEvent 类源代码(2015 年 6 月 30 日)
  • V 1.2 修复了带超时的 waitForEvent 方法(以区分虚假唤醒和超时),使用 ConcurrentLinkedQueue、Threads 和 SyncEvent 为最“基本”的代码重新实现了 waitForMultipleEvents(旧的 waitForMultipleEvents 实现可在新的 waitForMultipleEventsExecutor 方法中找到),添加了 getName 方法并更新了 Android 测试应用程序(2015 年 7 月 8 日)
© . All rights reserved.