65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 C# 的管道和过滤器模式

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.91/5(8 票)

2016年4月20日

CPOL

1分钟阅读

viewsIcon

51919

使用 C# 实现管道和过滤器模式。

引言

管道和过滤器模式在需要对对象执行一系列过滤(处理)以将其转换为可用状态的情况下非常有用且整洁,如下面的图片所示。

本文中使用的代码是管道和过滤器模式的完整通用实现。

背景

理解消息传递模式对于理解企业集成模式至关重要。

使用代码

本文的代码假设存在一个对象仓库 AgentStatusRepository,它有一个 static 属性 Agents,用于返回 List<Agent>。这个 AgentStatus 列表是管道的输入,管道需要对其执行各种过滤。

AgentStatus 类(步骤 1)

 /// <summary>
 /// Plain data holder describing one agent: its identifier, presence status,
 /// workload counters and the time of its last status update.
 /// NOTE(review): the repository, filter and pipeline snippets in this file all use a
 /// type named Agent with these same property names - confirm whether Agent and
 /// AgentStatus are meant to be the same type.
 /// </summary>
 public class AgentStatus
    {
        /// <summary>Identifier of the agent.</summary>
        public string AgentId { get; set; }

        /// <summary>
        /// Presence status as free-form text. The sample data in this file uses
        /// "Available", "Busy" and "Research"; the availability filter matches "Available".
        /// </summary>
        public string PresenceStatus { get; set; }

        /// <summary>Workload the agent currently carries.</summary>
        public int CurrentWorkload { get; set; }

        /// <summary>
        /// Workload ceiling. The workload filter in this file keeps agents whose
        /// CurrentWorkload is strictly below this value.
        /// </summary>
        public int MaxWorkload { get; set; }

        /// <summary>
        /// Time of the last status update.
        /// NOTE(review): presumably UTC - the sample data and the freshness filter both
        /// work with DateTime.UtcNow; confirm.
        /// </summary>
        public DateTime StatusUpdateDatetime { get; set; }
    }

PresenceStatus 的可能值

  • Available(可用)
  • Busy(忙碌)
  • Research(研究)
  • Offline(离线)

以下是需要对从 AgentStatusRepository 获取的 List<AgentStatus> 对象(上图中的 Object1)执行的过滤列表

  • AgentAvailabilityFilter (上图中的 Filter1)- 过滤 List<Agent>,其中 Agent.PresenceStatus == 'Available'
  • AgentWorkloadFilter (上图中的 Filter2)- 过滤从 Filter1 过滤后的列表,其中 Agent.CurrentWorkload < Agent.MaxWorkload
  • AgentPresenceUpdateDatetimeFilter (上图中的 Filter3)- 过滤从 Filter2 过滤后的列表,其中 (DateTime.UtcNow - Agent.StatusUpdateDatetime).TotalMinutes < 3

AgentStatusRepository 代码

 /// <summary>
 /// In-memory stand-in for an agent data store; hands out a fixed set of six sample agents.
 /// </summary>
 public class AgentStatusRepository
 {
     /// <summary>
     /// Builds and returns a brand-new list of sample agents on every read.
     /// Timestamps are taken from the clock at the time of the read, so the list is
     /// not stable between two accesses.
     /// </summary>
     public static List<Agent> Agents
     {
         get
         {
             var sampleAgents = new List<Agent>();

             sampleAgents.Add(CreateAgent("agent_id_1", "Available", 1, 2, DateTime.UtcNow));
             sampleAgents.Add(CreateAgent("agent_id_2", "Busy", 2, 2, DateTime.UtcNow));
             // The only sample whose status update lies five minutes in the past.
             sampleAgents.Add(CreateAgent("agent_id_3", "Available", 2, 3, DateTime.UtcNow.AddMinutes(-5)));
             sampleAgents.Add(CreateAgent("agent_id_4", "Research", 2, 3, DateTime.UtcNow));
             sampleAgents.Add(CreateAgent("agent_id_5", "Available", 2, 3, DateTime.UtcNow));
             // Available, but already at its maximum workload.
             sampleAgents.Add(CreateAgent("agent_id_6", "Available", 5, 5, DateTime.UtcNow));

             return sampleAgents;
         }
     }

     /// <summary>
     /// Creates one agent and assigns the five properties in declaration order.
     /// </summary>
     private static Agent CreateAgent(
         string agentId,
         string presenceStatus,
         int currentWorkload,
         int maxWorkload,
         DateTime statusUpdatedAt)
     {
         var agent = new Agent();
         agent.AgentId = agentId;
         agent.PresenceStatus = presenceStatus;
         agent.CurrentWorkload = currentWorkload;
         agent.MaxWorkload = maxWorkload;
         agent.StatusUpdateDatetime = statusUpdatedAt;
         return agent;
     }
 }

管道和过滤器类图

IFilter 代码

/// <summary>
    /// A single processing step that can be registered in a pipeline.
    /// Pipeline&lt;T&gt; in this file stores registered filters as a list of IFilter&lt;T&gt;.
    /// </summary>
    /// <typeparam name="T">
    /// Type of the value the filter both receives and returns; keeping input and output
    /// the same type is what lets one filter's result be handed to the next filter.
    /// </typeparam>
    public interface IFilter<T>
    {
        /// <summary>
        /// Performs this filter's processing on the given input.
        /// </summary>
        /// <param name="input">The value to process.</param>
        /// <returns>
        /// The processed value. AgentSelectionPipeline in this file passes it as the
        /// input of the next registered filter.
        /// </returns>
        T Execute(T input);       
    }

Pipeline 代码

 /// <summary>
 /// Abstract pipeline: keeps the registered filters in registration order and leaves
 /// the way they are applied to the concrete subclass via <see cref="Process"/>.
 /// </summary>
 /// <typeparam name="T">Type of the value that flows through the filters.</typeparam>
 public abstract class Pipeline<T>
 {
     /// <summary>
     /// Filters in the order they were registered; subclasses read this in Process.
     /// </summary>
     protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

     /// <summary>
     /// Appends a filter to the end of the pipeline.
     /// </summary>
     /// <param name="filter">A filter object implementing IFilter; must not be null.</param>
     /// <returns>This pipeline instance, so registrations can be chained.</returns>
     /// <exception cref="System.ArgumentNullException">
     /// Thrown when <paramref name="filter"/> is null.
     /// </exception>
     public Pipeline<T> Register(IFilter<T> filter)
     {
         // Reject null here: a null entry would otherwise only surface later, as a
         // NullReferenceException when a subclass calls Execute on it, far away from
         // the registration that caused it.
         if (filter == null)
         {
             throw new System.ArgumentNullException(nameof(filter));
         }

         filters.Add(filter);
         return this;
     }

     /// <summary>
     /// Runs the pipeline on the given input. How the registered filters are applied
     /// is defined by the subclass.
     /// </summary>
     /// <param name="input">The value on which filter processing should execute.</param>
     /// <returns>The value produced by the subclass's processing.</returns>
     public abstract T Process(T input);
 }

AgentSelectionPipeline(ConcretePipeline)代码

    /// <summary>
    /// Concrete pipeline over a sequence of agents; used to narrow a list of agents
    /// down to the applicable ones.
    /// </summary>
    public class AgentSelectionPipeline : Pipeline<IEnumerable<Agent>>
    {
        /// <summary>
        /// Applies every registered filter in registration order, feeding each filter
        /// the result of the previous one.
        /// </summary>
        /// <param name="input">The agents to run through the filters.</param>
        /// <returns>
        /// The result of the last filter, or <paramref name="input"/> itself when no
        /// filter is registered.
        /// </returns>
        public override IEnumerable<Agent> Process(IEnumerable<Agent> input)
        {
            // Running result: starts as the raw input and is replaced after each stage.
            IEnumerable<Agent> stageResult = input;

            filters.ForEach(stage => stageResult = stage.Execute(stageResult));

            return stageResult;
        }
    }

AgentAvailabilityFilter 代码(ConcreteFilter 1)

  /// <summary>
  /// Filter whose output is the agents with PresenceStatus equal to "Available".
  /// </summary>
  public class AgentAvailabilityFilter : IFilter<IEnumerable<Agent>>
  {
      /// <summary>Presence status value an agent must have to pass this filter.</summary>
      private const string AvailableStatus = "Available";

      /// <summary>
      /// Keeps only the available agents.
      /// </summary>
      /// <param name="input">Agents to filter; may be null.</param>
      /// <returns>
      /// null when <paramref name="input"/> is null; otherwise a lazily evaluated
      /// sequence of the agents whose PresenceStatus is "Available" (empty when none match).
      /// </returns>
      public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
      {
          // Null is passed through untouched so a pipeline fed with null keeps yielding null.
          if (input == null)
          {
              return input;
          }

          // No emptiness pre-check: Where already yields nothing for an empty source,
          // and probing with Count() would walk the whole sequence an extra time
          // (costly for large inputs, wrong for sources that can only be enumerated once).
          return input.Where(agent => agent.PresenceStatus == AvailableStatus);
      }
  }

AgentWorkloadFilter 代码(ConcreteFilter 2)

    /// <summary>
    /// Filter whose output is the agents for which CurrentWorkload is less than MaxWorkload.
    /// </summary>
    public class AgentWorkloadFilter : IFilter<IEnumerable<Agent>>
    {
        /// <summary>
        /// Keeps only the agents that still have spare capacity.
        /// </summary>
        /// <param name="input">Agents to filter; may be null.</param>
        /// <returns>
        /// null when <paramref name="input"/> is null; otherwise a lazily evaluated
        /// sequence of the agents with CurrentWorkload strictly below MaxWorkload
        /// (empty when none match).
        /// </returns>
        public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
        {
            // Null is passed through untouched so a pipeline fed with null keeps yielding null.
            if (input == null)
            {
                return input;
            }

            // No emptiness pre-check: Where already yields nothing for an empty source,
            // and probing with Count() would walk the whole sequence an extra time.
            return input.Where(agent => agent.CurrentWorkload < agent.MaxWorkload);
        }
    }

AgentPresenceUpdateDatetimeFilter 代码(ConcreteFilter 3)

    /// <summary>
    /// Filter whose output is the agents whose StatusUpdateDatetime is less than
    /// three minutes old.
    /// </summary>
    public class AgentPresenceUpdateDatetimeFilter : IFilter<IEnumerable<Agent>>
    {
        /// <summary>
        /// Exclusive upper bound, in minutes, on the age of a status update.
        /// </summary>
        private const double MaxStatusAgeMinutes = 3;

        /// <summary>
        /// Keeps only the agents with a recent status update.
        /// </summary>
        /// <param name="input">Agents to filter; may be null.</param>
        /// <returns>
        /// null when <paramref name="input"/> is null; otherwise a lazily evaluated
        /// sequence of the agents updated less than three minutes before the current
        /// UTC time (empty when none match).
        /// </returns>
        public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
        {
            // Null is passed through untouched so a pipeline fed with null keeps yielding null.
            if (input == null)
            {
                return input;
            }

            // No emptiness pre-check: Where already yields nothing for an empty source,
            // and probing with Count() would walk the whole sequence an extra time.
            //
            // DateTime.UtcNow is read inside the predicate, so each agent is compared
            // against the clock at the moment the result is enumerated.
            return input.Where(agent =>
                (DateTime.UtcNow - agent.StatusUpdateDatetime).TotalMinutes < MaxStatusAgeMinutes);
        }
    }

如何使用管道和过滤器代码

    /// <summary>
    /// Sample entry point showing how the pipeline and its three filters are wired together.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Loads the sample agents, builds the selection pipeline and runs it.
        /// The filtered sequence is only held in a local; nothing enumerates or prints it.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            // Source data: the sample agents exposed by the repository.
            var allAgents = AgentStatusRepository.Agents;

            // Assemble the pipeline. Registration order is execution order:
            // availability first, then workload, then status freshness.
            var selectionPipeline = new AgentSelectionPipeline();
            selectionPipeline.Register(new AgentAvailabilityFilter());
            selectionPipeline.Register(new AgentWorkloadFilter());
            selectionPipeline.Register(new AgentPresenceUpdateDatetimeFilter());

            // Run the agents through every registered filter.
            var selectedAgents = selectionPipeline.Process(allAgents);
        }
    }
© . All rights reserved.