65.9K
CodeProject 正在变化。 阅读更多。
Home

基础类, 用于创建一个用户线程管理器来检查用户作业

starIconstarIconstarIconstarIconemptyStarIcon

4.00/5 (2投票s)

2011 年 8 月 23 日

MIT

6分钟阅读

viewsIcon

21078

downloadIcon

266

用于创建用户线程管理器以检查同步或异步调用的模式。此类的专有版本可用于正常的 Windows 服务,并检查作业状态。

WFA_StaticTaskSampleRid.gif

通用线程管理器

这是一个用于创建用户线程管理器的简单基础。通过线程管理器,您可以检查作业的活动。使用此通用模式,您可以:

  1. 了解任务/线程的状态、启动/关闭时间、上次执行时间,以及在出现错误时获取上次错误描述
  2. 启动/停止线程/任务 
  3. 从数据库表加载配置到配置文件
  4. 将状态保存到数据库表或配置文件中
  5. 调用用户同步/异步方法
  6. 拥有用户恢复阶段
  7. 使用单个 Windows 服务检查所有任务/线程,或使用多个 Windows 服务用于分布式系统

您可以使用这些类来创建您的 Windows 服务,并将线程状态保存到数据库/配置文件中,并记录主要活动。

我的任务是什么?

我的任务名为 StaticTask,它是一个始终开启的单线程。当您启动任务时,线程管理器会抛出此族的一个线程。当您停止任务时,任务管理器会关闭任务的作业。当任务开启时,在循环中,它会在每个睡眠时间调用用户类的一个方法。

主要类是 StaticTask 和 TaskManager

TaskManager

TaskManagerStaticTask 与任务向量连接起来。这是一个 StaticTask 类的数组。要填充它,您应该使用:

  • SetNewTask():加载新的用户 StaticTask 类。
  • SetTasks():完成任务向量。当所有任务都加载后,必须调用 SetTasks() 来完成任务向量。此方法会调用虚方法 OnChangeTasksStatus() 来警告状态已更改。任务的状态为 INITED
  • Start() 方法:要启动所有任务,在循环中调用向量中加载的 StaticTasks 对象的 AsyncRun 方法。
    Start() 之后,状态将变为 STARTED
    在循环结束时,Start() 方法会调用 VerifyTaskStatus
    这是一个内部线程,用于验证任务状态
    VerifyTaskStatus 每隔 DELTA_CHANGETIME 就会调用 IsStatusChange 来检查一个或多个任务的状态是否已更改。
    如果状态已更改,该方法会调用虚方法 OnChangeTasksStatus,否则调用 OnChangeConfiguration
    用户可以使用 OnChangeTasksStatus 将任务状态保存到数据库表或配置文件中。
    ModifiedStatusTasks 方法会告诉您哪些任务已更改。
    用户可以使用 OnChangeConfiguration 来检查配置状态是否已更改。
    例如,用户可以将状态更改为 TO_START/TO_STOP,并且 OnChangeConfiguration 方法可以使用 start()/stop() 任务方法。
  • Stop() 方法用于停止所有任务。
    它在循环中调用任务的 Stop() 方法。之后,任务将变为 STOPPED

其他方法

  • SetTask 用于更改任务:例如,用于重新初始化任务。
  • GetTask 用于获取任务对象。
  • GetTasks 用于获取所有任务对象。

StaticTask

StaticTask 是一个用于检查循环执行的类。在这个循环中,有一个名为 "Execute()" 的虚方法。此外,在循环中,还有一个 Sleep,因此 Execute() 方法会在每个 Sleep(wait) 时间被调用。StaticTask 有一个名为 status 的属性(TaskStatus Enum),用于枚举 Tasks 状态。构造方法接受以下参数:

  • Wait
  • TaskConfig 类:它有一个字典(键,值)来配置用户任务。
  • Log 函数 delegate(string desc,string category)

构造函数将状态设置为 INITEDMain 方法是 "Run()"。它调用虚方法:

  • BeginTask:在状态变为 STARTING 之后变为 STARTED。例如,它用于连接到数据库。
  • Execute: 在循环中,并且每隔 Sleep(Wait) 时间。它用于检查循环的速度。
  • EndTask:在状态变为 STOPPING 之前。例如,它用于断开与数据库的连接。
  • OnError:当 Execute() 抛出异常时。例如,它用于断开与数据库的连接和/或日志记录。

Run() 方法不能抛出异常,因为有外部的 try-catch。当发生错误时:Run() 将状态更改为 ERROR 并填充一个 public string errordesc。因此没有未处理的异常。当您调用 Stop() 时,循环将终止。此方法还会调用回调函数以将状态设置为 STOPPEDERROR

  • StaticTask 变为 STARTED 时,设置 StartTime
  • StaticTask 变为 STOPPED/ERROR 时,设置 EndTime
  • 每次 StaticTask 成功调用 Execute() 时,设置 LastTime

使用代码:我的示例

我的示例是一个 Windows 窗体应用程序。在此项目中,有两个 TaskManager

  • NotifierManager:它包含一个事件生成器,名为 NotifierTask。它生成一个事件并将其保存到队列中。每次从任务调用此方法时,Execute() 方法都会执行同步调用。
  • ProcessEventManager:它包含两个事件处理器,名为 ProcessEventTask。它处理队列中获取的事件。每次从任务调用此方法时,Execute() 方法都会执行异步调用。

任务使用队列进行通信。这是通过 EventQueue 类的实例创建的。

NotifierTask 继承 StaticTask

构造函数设置 wait、config、log。Execute() 实现事件通知器。每次调用此方法时,它都会使用 GenericEvent 类创建一个新事件。

public override void  Execute(bool sync, bool recovery)
{
	/* very simple */
	if(event_id==0)
		event_id=1;
	else
		event_id = 0;

	GenericEvent ge = new GenericEvent(event_id);

	((NotifierConfig)config).event_queue.AddEvent(ge);

	log(base.config.name + " Execute event ID = " + event_id, "LOG");

	base.Execute(sync, recovery);
}  

然后将此新事件添加到 EventQueue 中。实例存储在 config 对象中。
此类别有两个方法,AddGetEvent。这些方法受 lock 变量 lock_event 保护。因此,这是线程安全的。

ArrayList event_list;

static object lock_event = new object();

public int AddEvent(GenericEvent ev)
{
	lock (lock_event)
	{
		return event_list.Add(ev);
	}
}

public int CountEvent()
{
	return event_list.Count;
}

public GenericEvent GetEvent()
{
	lock (lock_event)
	{
		if (event_list.Count > 0)
		{
			GenericEvent tmp = (GenericEvent)event_list[0];
			event_list.RemoveAt(0);
			return tmp;
		}
	}
	return null;
}

如果您的作业在分布式系统中运行,最好使用数据库表并定义 Add/GetEvent 过程来访问该表。当使用的记录被锁定并被跳过时,另一个进程会获取一条新消息。您也可以使用像 MSMQ 这样的系统队列。

ProcessEventTask 继承 StaticTask

这比 NotifierTask 更复杂,因为 Execute() 方法使用异步调用来执行作业。

Execute() 方法中:

  1. 有一个第一个循环用于移除已结束的异步调用。
  2. 如果异步调用数量少于最大值,则抛出一个新的异步调用。
  3. 获取下一个事件(GetEvent 方法是线程安全的)。
  4. 通过新事件,Execute() 方法可以创建一个新的事件处理器。
    调用 CreateProcessEvent()
public override void  Execute(bool sync, bool recovery)
	{
		for (int i = CountAsyncCall()-1; i >=0 ; i--)
		{
			BaseExecutor ex = (BaseExecutor)_async_calls[i];
			if (ex.ar.IsCompleted)
			{
				_async_calls.Remove(ex);
				if (!ex.stato)
				{
					throw new Exception(ex.error);
				}
			}
		}

		if (CountAsyncCall() < max_executor_num)
		{
			GenericEvent corrent_event = event_queue.GetEvent();

			if (corrent_event == null)
				return;

			BaseProcessEvent ex = CreateProcessEvent(corrent_event);
			_async_calls.Add(ex.AsyncExecute());

		}

		base.Execute(sync, recovery);
	}

BaseProcessEvent:通用事件处理器

BaseProcessEvent 是一个事件处理器。Main 方法是 Execute()。此方法在循环中调用所有操作(参见 BaseAction 类的 Process() 方法),这些操作之前已通过 Add() 方法加载。有同步调用 Execute() 和异步调用 AsyncExecute。在这种情况下,变量 ar(AsyncResult) 被填充。

public abstract class BaseAction
{
    public abstract void Execute(string id, BaseAction ac_old);
}

在我的事件处理器中,有三个 ProcessEvents (ProcessEvent1, ProcessEvent2, ProcessEvent3)。CreateProcessEvent 方法决定创建哪个 ProcessEvent,这取决于事件类型。

public override BaseProcessEvent  CreateProcessEvent(GenericEvent _generic_event)
{
	switch(_generic_event.Id)
	{
		case "0": return new ProcessEvent1(_generic_event.Id, _orchestratorLog);
		case "1": return new ProcessEvent2(_generic_event.Id, _orchestratorLog);
		case "3": return new ProcessEvent3(_generic_event.Id, _orchestratorLog);
	}
	return null;
}

ProcessEvent1 继承自 BaseProcessEvent:其操作是:

  • CreateAction
  • ProcessAction
  • LastAction

ProcessEvent2 继承自 BaseProcessEvent:其操作是:

  • CreateAction
  • 内务管理
  • LastAction

ProcessEvent3 继承自 BaseProcessEvent:其操作是:

  • CreateAction
  • UpdateAction
  • LastAction

在我的示例中,所有操作都进行日志记录和休眠。

override public void Process(string Exeid,BaseAction ac_old)
{
	log(Exeid + "  " + id + "   ProcessAction begin", "LOG");
	Thread.Sleep(1000);
	log(Exeid + "  " + id + "   ProcessAction end", "LOG");
	return;
}

关注点

异步调用使用 BeginInvoke() 函数创建。BeginInvoke() 使用线程池机制(有关详细信息,请参阅 MSDN 文档)。

...
   AsyncExecute asyncRunDelegate = new AsyncExecute(Run);
   async_call= asyncRunDelegate.BeginInvoke(false,CallbackMethod, null);
...
...

void CallbackMethod(IAsyncResult ar)
{
	AsyncResult result = (AsyncResult) ar;
	AsyncExecute caller = (AsyncExecute) result.AsyncDelegate ;

	string formatString = (string)ar.AsyncState ;

	caller.EndInvoke(ar);
}
...

历史

  • 2011 年 8 月 23 日:初始版本
© . All rights reserved.