使用 C# 的管道和过滤器模式






4.91/5(8 票)
使用 C# 实现管道和过滤器模式。
引言
管道和过滤器模式在需要对对象执行一系列过滤(处理)以将其转换为可用状态的情况下非常有用且整洁,如下面的图片所示。
本文中使用的代码是管道和过滤器模式的完整通用实现。
背景
理解消息传递模式对于理解企业集成模式至关重要。
使用代码
本文的代码假设存在一个对象仓库 AgentStatusRepository,它有一个 static 属性 Agents,返回 List<Agent>。这个代理状态列表就是管道的输入,管道需要依次对其执行各种过滤。
AgentStatus 类(步骤 1)
/// <summary>
/// Presence and workload data for a single agent.
/// </summary>
/// <remarks>
/// NOTE(review): the repository, pipeline and filter snippets in this article
/// refer to a type named <c>Agent</c> with these same members, while this class
/// is declared as <c>AgentStatus</c>. Confirm which name is intended.
/// </remarks>
public class AgentStatus
{
    /// <summary>Identifier of the agent.</summary>
    public string AgentId { get; set; }

    /// <summary>Presence state of the agent, stored as text.</summary>
    public string PresenceStatus { get; set; }

    /// <summary>Amount of work currently assigned to the agent.</summary>
    public int CurrentWorkload { get; set; }

    /// <summary>Upper bound for <see cref="CurrentWorkload"/>.</summary>
    public int MaxWorkload { get; set; }

    /// <summary>
    /// Time of the last status update.
    /// NOTE(review): presumably UTC, to be confirmed against the code that sets it.
    /// </summary>
    public DateTime StatusUpdateDatetime { get; set; }
}
PresenceStatus
的可能值(代码中使用的是英文字符串):
Available(可用)
Busy(忙碌)
Research(研究)
Offline(离线)
以下是需要对从 AgentStatusRepository
获取的 List<AgentStatus>
对象(上图中的 Object1
)执行的过滤列表
AgentAvailabilityFilter
(上图中的Filter1
)- 过滤List<Agent>
,其中Agent.PresenceStatus == "Available"
AgentWorkloadFilter
(上图中的Filter2
)- 过滤从Filter1
过滤后的列表,其中Agent.CurrentWorkload < Agent.MaxWorkload
AgentPresenceUpdateDatetimeFilter
(上图中的Filter3
)- 过滤从Filter2
过滤后的列表,其中(DateTime.UtcNow - Agent.StatusUpdateDatetime).TotalMinutes < 3
AgentStatusRepository
代码
/// <summary>
/// In-memory source of sample agents used as the pipeline input.
/// </summary>
public class AgentStatusRepository
{
    /// <summary>
    /// Gets a freshly built list of six sample agents; a new list is created
    /// on every access.
    /// </summary>
    public static List<Agent> Agents
    {
        get
        {
            var samples = new List<Agent>();

            samples.Add(new Agent
            {
                AgentId = "agent_id_1",
                PresenceStatus = "Available",
                CurrentWorkload = 1,
                MaxWorkload = 2,
                StatusUpdateDatetime = DateTime.UtcNow
            });

            samples.Add(new Agent
            {
                AgentId = "agent_id_2",
                PresenceStatus = "Busy",
                CurrentWorkload = 2,
                MaxWorkload = 2,
                StatusUpdateDatetime = DateTime.UtcNow
            });

            // Status last updated five minutes ago.
            samples.Add(new Agent
            {
                AgentId = "agent_id_3",
                PresenceStatus = "Available",
                CurrentWorkload = 2,
                MaxWorkload = 3,
                StatusUpdateDatetime = DateTime.UtcNow.AddMinutes(-5)
            });

            samples.Add(new Agent
            {
                AgentId = "agent_id_4",
                PresenceStatus = "Research",
                CurrentWorkload = 2,
                MaxWorkload = 3,
                StatusUpdateDatetime = DateTime.UtcNow
            });

            samples.Add(new Agent
            {
                AgentId = "agent_id_5",
                PresenceStatus = "Available",
                CurrentWorkload = 2,
                MaxWorkload = 3,
                StatusUpdateDatetime = DateTime.UtcNow
            });

            // Current workload equals the maximum.
            samples.Add(new Agent
            {
                AgentId = "agent_id_6",
                PresenceStatus = "Available",
                CurrentWorkload = 5,
                MaxWorkload = 5,
                StatusUpdateDatetime = DateTime.UtcNow
            });

            return samples;
        }
    }
}
管道和过滤器类图
IFilter
代码
/// <summary>
/// A single processing step that can be registered in a pipeline.
/// </summary>
/// <typeparam name="T">Type of the object the filter receives and returns.</typeparam>
public interface IFilter<T>
{
/// <summary>
/// Performs this filter's processing on the given input.
/// </summary>
/// <param name="input">The object to be processed by the filter.</param>
/// <returns>The result of the processing, of the same type as the input.</returns>
T Execute(T input);
}
Pipeline
代码
/// <summary>
/// Base class for a pipeline: keeps the list of registered filters and leaves
/// the way they are applied to the concrete subclass.
/// </summary>
/// <typeparam name="T">Type of the object handed to the filters.</typeparam>
public abstract class Pipeline<T>
{
    /// <summary>
    /// Filters of the pipeline, in the order they were registered.
    /// </summary>
    protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

    /// <summary>
    /// Appends a filter to the pipeline.
    /// </summary>
    /// <param name="filter">A filter object implementing <see cref="IFilter{T}"/>.</param>
    /// <returns>This pipeline, so that registrations can be chained.</returns>
    /// <exception cref="System.ArgumentNullException">
    /// <paramref name="filter"/> is <c>null</c>.
    /// </exception>
    public Pipeline<T> Register(IFilter<T> filter)
    {
        // Reject null here; otherwise it would only surface later as a
        // NullReferenceException when a subclass calls Execute on the entry.
        if (filter == null)
        {
            throw new System.ArgumentNullException(nameof(filter));
        }

        filters.Add(filter);
        return this;
    }

    /// <summary>
    /// Starts processing on the pipeline.
    /// </summary>
    /// <param name="input">The object on which filter processing is executed.</param>
    /// <returns>The processed object, as produced by the concrete pipeline.</returns>
    public abstract T Process(T input);
}
AgentSelectionPipeline
(ConcretePipeline
)代码
/// <summary>
/// Pipeline that narrows a sequence of agents down to the applicable ones.
/// </summary>
public class AgentSelectionPipeline : Pipeline<IEnumerable<Agent>>
{
    /// <summary>
    /// Runs every registered filter in registration order, feeding each
    /// filter the output of the previous one.
    /// </summary>
    /// <param name="input">Agents to be filtered.</param>
    /// <returns>
    /// The output of the last filter, or <paramref name="input"/> itself
    /// when no filter is registered.
    /// </returns>
    public override IEnumerable<Agent> Process(IEnumerable<Agent> input)
    {
        IEnumerable<Agent> current = input;

        foreach (IFilter<IEnumerable<Agent>> stage in filters)
        {
            current = stage.Execute(current);
        }

        return current;
    }
}
AgentAvailabilityFilter
代码(ConcreteFilter
1)
/// <summary>
/// The output of this filter is the list of all available agents.
/// </summary>
public class AgentAvailabilityFilter : IFilter<IEnumerable<Agent>>
{
    /// <summary>
    /// Keeps the agents whose <c>PresenceStatus</c> equals "Available".
    /// </summary>
    /// <param name="input">Agents to filter; may be <c>null</c>.</param>
    /// <returns>
    /// <c>null</c> when <paramref name="input"/> is <c>null</c>; otherwise a
    /// lazily evaluated sequence of the matching agents.
    /// </returns>
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Null is handed back unchanged, as before.
        if (input == null)
        {
            return input;
        }

        // No emptiness pre-check: Where over an empty sequence already yields
        // nothing, and Count() would enumerate the whole input an extra time.
        return input.Where(agent => agent.PresenceStatus == "Available");
    }
}
AgentWorkloadFilter
代码(ConcreteFilter
2)
/// <summary>
/// The output of this filter is the list of agents for which CurrentWorkload
/// is less than MaxWorkload.
/// </summary>
public class AgentWorkloadFilter : IFilter<IEnumerable<Agent>>
{
    /// <summary>
    /// Keeps the agents whose <c>CurrentWorkload</c> is strictly below their
    /// <c>MaxWorkload</c>.
    /// </summary>
    /// <param name="input">Agents to filter; may be <c>null</c>.</param>
    /// <returns>
    /// <c>null</c> when <paramref name="input"/> is <c>null</c>; otherwise a
    /// lazily evaluated sequence of the matching agents.
    /// </returns>
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Null is handed back unchanged, as before.
        if (input == null)
        {
            return input;
        }

        // No emptiness pre-check: Where over an empty sequence already yields
        // nothing, and Count() would enumerate the whole input an extra time.
        return input.Where(agent => agent.CurrentWorkload < agent.MaxWorkload);
    }
}
AgentPresenceUpdateDatetimeFilter
代码(ConcreteFilter
3)
/// <summary>
/// The output of this filter is the list of all those agents whose
/// StatusUpdateDatetime is less than 3 minutes old.
/// </summary>
public class AgentPresenceUpdateDatetimeFilter : IFilter<IEnumerable<Agent>>
{
    /// <summary>
    /// Exclusive upper bound, in minutes, for the age of a status update.
    /// </summary>
    private const double MaxStatusAgeMinutes = 3;

    /// <summary>
    /// Keeps the agents whose <c>StatusUpdateDatetime</c> lies less than
    /// three minutes before <see cref="System.DateTime.UtcNow"/>.
    /// </summary>
    /// <param name="input">Agents to filter; may be <c>null</c>.</param>
    /// <returns>
    /// <c>null</c> when <paramref name="input"/> is <c>null</c>; otherwise a
    /// lazily evaluated sequence of the matching agents. The current time is
    /// read for each agent when the sequence is enumerated.
    /// </returns>
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Null is handed back unchanged, as before.
        if (input == null)
        {
            return input;
        }

        // No emptiness pre-check: Where over an empty sequence already yields
        // nothing, and Count() would enumerate the whole input an extra time.
        return input.Where(agent =>
            (DateTime.UtcNow - agent.StatusUpdateDatetime).TotalMinutes < MaxStatusAgeMinutes);
    }
}
如何使用 Pipeline 和 Filter 的代码
/// <summary>
/// Sample entry point showing how the pipeline and its filters are wired together.
/// </summary>
public class Program
{
    /// <summary>
    /// Loads the sample agents, builds the selection pipeline and runs it.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        // Input: the agents supplied by the repository.
        var candidates = AgentStatusRepository.Agents;

        // Assemble the pipeline; filters run in the order they are registered.
        var selection = new AgentSelectionPipeline();
        selection.Register(new AgentAvailabilityFilter());
        selection.Register(new AgentWorkloadFilter());
        selection.Register(new AgentPresenceUpdateDatetimeFilter());

        // Run all filters over the input.
        var selectedAgents = selection.Process(candidates);
    }
}