基础类, 用于创建一个用户线程管理器来检查用户作业
用于创建用户线程管理器以检查同步或异步调用的模式。此类的专有版本可用于正常的 Windows 服务,并检查作业状态。

通用线程管理器
这是一个用于创建用户线程管理器的简单基础。通过线程管理器,您可以检查作业的活动。使用此通用模式,您可以:
- 了解任务/线程的状态、启动/关闭时间、上次执行时间,以及在出现错误时获取上次错误描述
- 启动/停止线程/任务
- 从数据库表加载配置到配置文件
- 将状态保存到数据库表或配置文件中
- 调用用户同步/异步方法
- 拥有用户恢复阶段
- 使用单个 Windows 服务检查所有任务/线程,或使用多个 Windows 服务用于分布式系统
您可以使用这些类来创建您的 Windows 服务,并将线程状态保存到数据库/配置文件中,并记录主要活动。
我的任务是什么?
我的任务名为 StaticTask
,它是一个始终开启的单线程。当您启动任务时,线程管理器会抛出此族的一个线程。当您停止任务时,任务管理器会关闭任务的作业。当任务开启时,在循环中,它会在每个睡眠时间调用用户类的一个方法。
主要类是 StaticTask 和 TaskManager
TaskManager
TaskManager
将 StaticTask
与任务向量连接起来。这是一个 StaticTask
类的数组。要填充它,您应该使用:
SetNewTask()
:加载新的用户StaticTask
类。SetTasks()
:完成任务向量。当所有任务都加载后,必须调用SetTasks()
来完成任务向量。此方法会调用虚方法OnChangeTasksStatus()
来警告状态已更改。任务的状态为INITED
。Start()
方法:要启动所有任务,在循环中调用向量中加载的StaticTasks
对象的AsyncRun
方法。
在Start()
之后,状态将变为STARTED
。
在循环结束时,Start()
方法会调用VerifyTaskStatus
。
这是一个内部线程,用于验证任务状态VerifyTaskStatus
每隔DELTA_CHANGETIME
就会调用IsStatusChange
来检查一个或多个任务的状态是否已更改。
如果状态已更改,该方法会调用虚方法OnChangeTasksStatus
,否则调用OnChangeConfiguration
。
用户可以使用OnChangeTasksStatus
将任务状态保存到数据库表或配置文件中。ModifiedStatusTasks
方法会告诉您哪些任务已更改。
用户可以使用OnChangeConfiguration
来检查配置状态是否已更改。
例如,用户可以将状态更改为TO_START
/TO_STOP
,并且OnChangeConfiguration
方法可以使用start()
/stop()
任务方法。Stop()
方法用于停止所有任务。
它在循环中调用任务的Stop()
方法。之后,任务将变为STOPPED
。
其他方法
SetTask
用于更改任务:例如,用于重新初始化任务。GetTask
用于获取任务对象。GetTasks
用于获取所有任务对象。
StaticTask
StaticTask
是一个用于检查循环执行的类。在这个循环中,有一个名为 "Execute()
" 的虚方法。此外,在循环中,还有一个 Sleep
,因此 Execute()
方法会在每个 Sleep
(wait) 时间被调用。StaticTask
有一个名为 status 的属性(TaskStatus Enum
),用于枚举 Tasks
状态。构造方法接受以下参数:
- Wait
TaskConfig
类:它有一个字典(键,值)来配置用户任务。- Log 函数
delegate(string desc,string category)
构造函数将状态设置为 INITED
。Main
方法是 "Run()
"。它调用虚方法:
BeginTask
:在状态变为 STARTING 之后变为 STARTED。例如,它用于连接到数据库。Execute:
在循环中,并且每隔Sleep
(Wait
) 时间。它用于检查循环的速度。EndTask
:在状态变为 STOPPING 之前。例如,它用于断开与数据库的连接。OnError
:当Execute()
抛出异常时。例如,它用于断开与数据库的连接和/或日志记录。
Run()
方法不能抛出异常,因为有外部的 try
-catch
。当发生错误时:Run()
将状态更改为 ERROR
并填充一个 public string errordesc
。因此没有未处理的异常。当您调用 Stop()
时,循环将终止。此方法还会调用回调函数以将状态设置为 STOPPED
或 ERROR
。
- 当
StaticTask
变为STARTED
时,设置StartTime
。 - 当
StaticTask
变为STOPPED
/ERROR
时,设置EndTime
。 - 每次
StaticTask
成功调用Execute()
时,设置LastTime
。
使用代码:我的示例
我的示例是一个 Windows 窗体应用程序。在此项目中,有两个 TaskManager
:
NotifierManager
:它包含一个事件生成器,名为NotifierTask
。它生成一个事件并将其保存到队列中。每次从任务调用此方法时,Execute()
方法都会执行同步调用。ProcessEventManager
:它包含两个事件处理器,名为ProcessEventTask
。它处理队列中获取的事件。每次从任务调用此方法时,Execute()
方法都会执行异步调用。
任务使用队列进行通信。这是通过 EventQueue
类的实例创建的。
NotifierTask 继承 StaticTask
构造函数设置 wait、config、log。Execute()
实现事件通知器。每次调用此方法时,它都会使用 GenericEvent
类创建一个新事件。
public override void Execute(bool sync, bool recovery)
{
/* very simple */
if(event_id==0)
event_id=1;
else
event_id = 0;
GenericEvent ge = new GenericEvent(event_id);
((NotifierConfig)config).event_queue.AddEvent(ge);
log(base.config.name + " Execute event ID = " + event_id, "LOG");
base.Execute(sync, recovery);
}
然后将此新事件添加到 EventQueue
中。实例存储在 config
对象中。
此类别有两个方法,Add
和 GetEvent
。这些方法受 lock
变量 lock_event
保护。因此,这是线程安全的。
ArrayList event_list;
static object lock_event = new object();
public int AddEvent(GenericEvent ev)
{
lock (lock_event)
{
return event_list.Add(ev);
}
}
public int CountEvent()
{
return event_list.Count;
}
public GenericEvent GetEvent()
{
lock (lock_event)
{
if (event_list.Count > 0)
{
GenericEvent tmp = (GenericEvent)event_list[0];
event_list.RemoveAt(0);
return tmp;
}
}
return null;
}
如果您的作业在分布式系统中运行,最好使用数据库表并定义 Add
/GetEvent
过程来访问该表。当使用的记录被锁定并被跳过时,另一个进程会获取一条新消息。您也可以使用像 MSMQ 这样的系统队列。
ProcessEventTask 继承 StaticTask
这比 NotifierTask
更复杂,因为 Execute()
方法使用异步调用来执行作业。
在 Execute()
方法中:
- 有一个第一个循环用于移除已结束的异步调用。
- 如果异步调用数量少于最大值,则抛出一个新的异步调用。
- 获取下一个事件(
GetEvent
方法是线程安全的)。 - 通过新事件,
Execute()
方法可以创建一个新的事件处理器。
调用CreateProcessEvent()
。
public override void Execute(bool sync, bool recovery)
{
for (int i = CountAsyncCall()-1; i >=0 ; i--)
{
BaseExecutor ex = (BaseExecutor)_async_calls[i];
if (ex.ar.IsCompleted)
{
_async_calls.Remove(ex);
if (!ex.stato)
{
throw new Exception(ex.error);
}
}
}
if (CountAsyncCall() < max_executor_num)
{
GenericEvent corrent_event = event_queue.GetEvent();
if (corrent_event == null)
return;
BaseProcessEvent ex = CreateProcessEvent(corrent_event);
_async_calls.Add(ex.AsyncExecute());
}
base.Execute(sync, recovery);
}
BaseProcessEvent:通用事件处理器
BaseProcessEvent
是一个事件处理器。Main
方法是 Execute()
。此方法在循环中调用所有操作(参见 BaseAction
类的 Process()
方法),这些操作之前已通过 Add()
方法加载。有同步调用 Execute()
和异步调用 AsyncExecute
。在这种情况下,变量 ar(AsyncResult)
被填充。
public abstract class BaseAction
{
public abstract void Execute(string id, BaseAction ac_old);
}
在我的事件处理器中,有三个 ProcessEvents (ProcessEvent1, ProcessEvent2, ProcessEvent3
)。CreateProcessEvent
方法决定创建哪个 ProcessEvent
,这取决于事件类型。
public override BaseProcessEvent CreateProcessEvent(GenericEvent _generic_event)
{
switch(_generic_event.Id)
{
case "0": return new ProcessEvent1(_generic_event.Id, _orchestratorLog);
case "1": return new ProcessEvent2(_generic_event.Id, _orchestratorLog);
case "3": return new ProcessEvent3(_generic_event.Id, _orchestratorLog);
}
return null;
}
ProcessEvent1
继承自 BaseProcessEvent
:其操作是:
CreateAction
ProcessAction
LastAction
ProcessEvent2
继承自 BaseProcessEvent
:其操作是:
CreateAction
内务管理
LastAction
ProcessEvent3
继承自 BaseProcessEvent
:其操作是:
CreateAction
UpdateAction
LastAction
在我的示例中,所有操作都进行日志记录和休眠。
override public void Process(string Exeid,BaseAction ac_old)
{
log(Exeid + " " + id + " ProcessAction begin", "LOG");
Thread.Sleep(1000);
log(Exeid + " " + id + " ProcessAction end", "LOG");
return;
}
关注点
异步调用使用 BeginInvoke()
函数创建。BeginInvoke()
使用线程池机制(有关详细信息,请参阅 MSDN 文档)。
...
AsyncExecute asyncRunDelegate = new AsyncExecute(Run);
async_call= asyncRunDelegate.BeginInvoke(false,CallbackMethod, null);
...
...
void CallbackMethod(IAsyncResult ar)
{
AsyncResult result = (AsyncResult) ar;
AsyncExecute caller = (AsyncExecute) result.AsyncDelegate ;
string formatString = (string)ar.AsyncState ;
caller.EndInvoke(ar);
}
...
历史
- 2011 年 8 月 23 日:初始版本