在 .NET 中使用任务并行库进行并行操作
描述 Parallel.Invoke 和 Tasks 的使用
引言
本文介绍了如何使用 .NET Framework 中提供的任务并行库 (TPL) 和并行 LINQ (PLINQ) 功能。这些功能是在 .NET Framework 3.5 中引入的,并已更新以包含 .NET Framework 5.0 中更高级的功能。其中最新的功能是 async
和 await
关键字,但本文不介绍这些关键字。相反,它展示了如何利用 Task
类,这是一个用于在 .NET 中执行多线程操作的更高级别类。
背景
顺序或同步执行耗时长的进程或操作会花费时间,并可能影响整个应用程序的性能。因此,不建议以顺序方式在单个线程上运行这些耗时的操作。最好是利用处理器和操作系统的多线程功能。鉴于当今市场上几乎所有处理器都具有多核,编写不利用处理器这些能力的耗时操作代码是浪费的。
在 CPU 的多个核心中并行执行耗时操作的概念称为多线程或并行编程。.NET Framework 从一开始就对多线程提供了广泛的支持。位于 System.Threading
命名空间中的 Thread
类提供了一个非常底层的 API 来创建多个线程并在每个线程中并行执行不同的操作。虽然它在完成工作方面非常有用,但其广泛的 API 使得中级用户的学习曲线相当陡峭。在 .NET Framework 3.5 版本中引入了一个更高级别的类,可以使工作变得更加容易。位于 System.Threading.Tasks
命名空间中的 Task
类提供了一个更简洁的 API,可以非常有效地执行多线程操作。此外,还引入了一个 Parallel
类来以并行方式调用方法。
本文介绍了这些类在执行耗时操作中的使用。源代码附加在文章中,可以单独下载。
Using the Code
我们希望在集合上执行某些 LINQ 操作。它是 LINQ to Entities,并且实体不是从真实数据源填充的,而是用示例硬编码数据填充的,但这也不是本文的重点。
用作数据源的纯 CLR 对象 (POCO) 类如下所示
public class Employee
{
public int empID { get; set; }
public string empName { get; set; }
public double salary { get; set; }
public int deptID { get; set; }
public string deptName { get; set; }
public Emplyee(int empID, string empName, double salary, int deptID, string deptName)
{
this.empID = empID;
this.empName = empName;
this.salary = salary;
this.deptID = deptID;
this.deptName = deptName;
}
}
还需要添加另一个类来存储 group by LINQ 操作的结果
public class GroupResult
{
public int key { get; set; }
public List<emplyee> emps { get; set; }
}
现在,让我们添加将用作 LINQ 操作数据源的集合。
class Program
{
public List<emplyee> employees = null;
public Program()
{
employees = new List<emplyee>() {
new Emplyee(1000, "emp1", 3200, 10, "dept1"),
new Emplyee(1001, "emp2", 4400, 10, "dept1"),
new Emplyee(1002, "emp3", 2800, 10, "dept1"),
new Emplyee(1003, "emp4", 4500, 20, "dept2"),
new Emplyee(1004, "emp5", 5200, 20, "dept2"),
new Emplyee(1005, "emp6", 3800, 20, "dept2"),
new Emplyee(1006, "emp7", 2900, 30, "dept3"),
new Emplyee(1007, "emp8", 4100, 30, "dept3"),
new Emplyee(1008, "emp9", 4400, 30, "dept3"),
new Emplyee(1009, "emp10", 2700, 40, "dept4"),
new Emplyee(10010, "emp11", 3600, 40, "dept4"),
new Emplyee(10011, "emp12", 5100, 40, "dept4")
};
}
}
需要在上面类中追加以下需要异步调用的操作。请将它们追加到上面的类中。
public List<Emplyee> getEmployees()
{
Func<List<Emplyee>> getEmps = () =>
{
return (from emp in employees.AsParallel() select emp).ToList<Emplyee>();
};
var emps = getEmps();
return emps;
}
public Emplyee getEmployee(int empID)
{
Func<int, Emplyee> getEmp = (int empid) =>
{
var foundEmp = from emp in employees.AsParallel()
where emp.empID == empid select emp;
return foundEmp.FirstOrDefault<Emplyee>();
};
var requestedEmp = getEmp(empID);
return requestedEmp;
}
public List<GroupResult> getEmployeeGroupDept()
{
Func<List<GroupResult>> getEmpGroupDept = () =>
{
var grpResult = from emp in employees.AsParallel()
group emp by emp.deptID into groups select new GroupResult
{
key = groups.Key,
emps = groups.ToList<Emplyee>()
};
return grpResult.ToList<GroupResult>();
};
var groupResult = getEmpGroupDept();
return groupResult;
}
public void ParallelAction1()
{
Action parallel_action1 = () =>
{
Thread.Sleep(3000);
Console.WriteLine("Parallel Action 1 invoked");
};
parallel_action1();
}
public string ParallelAction2()
{
Func<string> parallel_action2 = () =>
{
Thread.Sleep(3000);
return "Parallel Action 2 invoked";
};
var result = parallel_action2();
return result;
}
public int ParallelAction3(int no)
{
Func<int> factorial = () =>
{
var fact = 1;
for (int i = 1; i <= no ; i++)
{
Thread.Sleep(2000);
fact = fact * i;
}
return fact;
};
var factResult = factorial();
return factResult;
}
Func<string, string> dispStr = (string str) =>
{
Thread.Sleep(2000);
return string.Format("the entered string is: {0}", str);
};
Func<string, string> toUpperStr = (string str) =>
{
Thread.Sleep(2000);
return string.Format("the upper case of the string is: {0}", str.ToUpper());
};
Func<string, string, string> concatStr = (string str1, string str2) =>
{
Thread.Sleep(2000);
return string.Format("the concatenated string is: {0}", str1 + str2);
};
Func<object, bool> isString = (object str) =>
{
Thread.Sleep(2000);
return str is String;
};
Program
类的 Main
函数包含两个 Action:runSynchronous
和 runAsynchronous
。runSynchronous
同步调用上述函数,而 runAsynchronous
异步调用它们。将 Main
函数追加到 Program
类中,描述如下
static void Main(string[] args)
{
var watch = Stopwatch.StartNew();
var p = new Program();
Action runSynchronous = () =>
{
Console.WriteLine(p.dispStr("Lambda"));
Console.WriteLine(p.toUpperStr("lower case"));
Console.WriteLine(p.concatStr("Hello", "World"));
Console.WriteLine(p.isString(Convert.ToInt32(10)));
var employees = p.getEmployees();
Console.WriteLine("All Employees" + Environment.NewLine);
foreach (var emp in employees)
{
Console.WriteLine("EmpID: " + emp.empID + " EmpName: " +
emp.empName + " Salary: " + emp.salary + " DeptID: " +
emp.deptID + " DeptName: " + emp.deptName + Environment.NewLine);
}
var foundEmp = p.getEmployee(1002);
Console.WriteLine("Found Employee" + Environment.NewLine);
Console.WriteLine("EmpID: " + foundEmp.empID + " EmpName: " +
foundEmp.empName + " Salary: " + foundEmp.salary + " DeptID: " +
foundEmp.deptID + " DeptName: " + foundEmp.deptName + Environment.NewLine);
var grpResult = p.getEmployeeGroupDept();
Console.WriteLine("Group Employees By DeptID" + Environment.NewLine);
foreach (var grp in grpResult)
{
Console.WriteLine("Key: " + grp.key + Environment.NewLine);
foreach (var emp in grp.emps)
{
Console.WriteLine("EmpID: " + emp.empID + " EmpName: " +
emp.empName + " Salary: " + emp.salary + " DeptID: " +
emp.deptID + " DeptName: " + emp.deptName + Environment.NewLine);
}
}
p.ParallelAction1();
var result = p.ParallelAction2();
Console.WriteLine(result);
Console.WriteLine("The factorial is: " + p.ParallelAction3(Convert.ToInt32(4)));
};
Action runAsynchronous = () =>
{
Task t1 = new Task(() =>
{
Console.WriteLine(p.dispStr("Lambda"));
});
Task t2 = new Task(() =>
{
Console.WriteLine(p.toUpperStr("lower case"));
});
Task t3 = new Task(() =>
{
Console.WriteLine(p.concatStr("Hello", "World"));
});
Task<bool> t4 = new Task<bool>(() =>
{
return p.isString(Convert.ToInt32(10));
});
Task t5 = new Task(() =>
{
var employees = p.getEmployees();
Console.WriteLine("All Employees" + Environment.NewLine);
foreach (var emp in employees)
{
Console.WriteLine("EmpID: " + emp.empID + " EmpName: " +
emp.empName + " Salary: " + emp.salary + " DeptID: " +
emp.deptID + " DeptName: " + emp.deptName + Environment.NewLine);
}
});
Task t6 = new Task(() =>
{
var foundEmp = p.getEmployee(1002);
Console.WriteLine("Found Employee" + Environment.NewLine);
Console.WriteLine("EmpID: " + foundEmp.empID + " EmpName: " +
foundEmp.empName + " Salary: " + foundEmp.salary + " DeptID: " +
foundEmp.deptID + " DeptName: " + foundEmp.deptName + Environment.NewLine);
});
Task t7 = new Task(() =>
{
var grpResult = p.getEmployeeGroupDept();
Console.WriteLine("Group Employees By DeptID" + Environment.NewLine);
foreach (var grp in grpResult)
{
Console.WriteLine("Key: " + grp.key + Environment.NewLine);
foreach (var emp in grp.emps)
{
Console.WriteLine("EmpID: " + emp.empID + " EmpName: " +
emp.empName + " Salary: " + emp.salary + " DeptID: " +
emp.deptID + " DeptName: " + emp.deptName + Environment.NewLine);
}
}
});
t1.Start();
t2.Start();
t3.Start();
t4.Start();
t5.Start();
t6.Start();
t7.Start();
Task[] tArray = { t1, t2, t3, t4, t5, t6, t7 };
try
{
Task.WaitAll(tArray);
if (t4.IsCompleted)
{
var result = t4.Result.ToString();
Console.WriteLine(result);
}
Parallel.Invoke(() =>
{
p.ParallelAction1();
},
() =>
{
var result = p.ParallelAction2();
Console.WriteLine(result);
},
() =>
{
Console.WriteLine("The factorial is: " + p.ParallelAction3(Convert.ToInt32(4)));
});
}
catch (Exception e)
{
throw (new NotImplementedException(e.InnerException.ToString().Trim()));
}
};
try
{
//runSynchronous();
runAsynchronous();
}
catch (Exception e)
{
Console.WriteLine(e.InnerException.ToString().Trim());
}
finally
{
watch.Stop();
Console.WriteLine("Execution time: " + watch.ElapsedMilliseconds);
}
Console.ReadKey();
}
使用 runSynchronous
操作执行代码并记下执行时间,然后对 runAsynchronous
操作执行相同的操作。两种方法之间的执行时间会有很大差异。异步的那种速度更快。
在大多数情况下,首选使用 Task
类异步执行操作,因为它具有高级 API 来控制线程的工作。如果需要更底层的精确控制,可以使用 Thread
类。Parallel
类还提供了并行调用操作的方法,但它们不那么受欢迎,因为在 Parallel.Invoke
的情况下,操作系统会自行决定操作的执行顺序,并且该顺序是随机的。因此,在可能发生死锁的情况下不推荐使用它,因为在这种情况下,我们将不得不手动处理它。无论如何,本文只是触及了**.NET 中并行编程**概念的表面,还有更多高级场景和方法来处理它们。
关注点
当我开始学习 .NET 中的并行编程时,我花了些时间才理解这个概念。但现在我知道它值得花时间,因为它在处理文件处理、网络连接和异步 I/O 等耗时操作时非常有用。