65.9K
CodeProject 正在变化。 阅读更多。
Home

C++11 并发

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.90/5 (13投票s)

2014年12月31日

CPOL

13分钟阅读

viewsIcon

36920

downloadIcon

732

C++11 中支持并发的各种功能

引言

我写这篇文章是为了介绍C++11作为核心语言和库支持的各种新特性。我之前已经发布了一篇关于C++11智能指针的文章。在接下来的文章中,我还会继续介绍其他一些特性。

在本文中,我们将讨论C++11对并发的支持。我(永远)不如Nicolai Josuttis(作者:“C++11标准库”)和Anthony Williams(作者:“C++11 Concurrency in Action”)那样擅长。因此,我将简要介绍我从这些巨头那里学到的东西,并发是一个无边无际的话题,您需要仔细阅读以上书籍才能了解完整的细节。不过,我希望这篇文章能作为您深入研究并发的入门,也许是一个不错的起点。

背景

由于所有现代处理器都采用多核,所有现代应用程序都试图利用它们,通过并行执行多项任务来提高性能。毫无疑问,你们中的大多数人可能在应用程序中遇到过使用线程进行多任务处理。但您可能已经使用了平台特定的线程功能,如Pthreads等。

使用线程进行编程会变成一场噩梦,因为它会引入许多程序员在串行编程中可能没有遇到过的新问题。例如,在没有协作的情况下损坏多个线程访问的数据、死锁等。显然,这些问题需要得到纠正,并且有几种方法可以做到这一点,我们将在本文中不予介绍。这将是线程同步技术的一部分。

在本文中,我们将了解C++11如何通过将线程作为语言特性的一部分来使程序员的生活更轻松。由于这是语言的一部分,因此无需花费任何精力就可以将应用程序移植到不同平台。

语言提供了高级特性和低级特性。高级特性就像是低级特性的包装器,允许您使用它们而无需过多担心内部细节。程序员也可以自由使用低级特性以获得更好的控制。

高级接口

std::async

对于新手程序员来说,最好从高级接口开始,而不是纠结于低级接口。因此,我们将从高级接口std::async开始,以启动异步任务。std::async是一个接口,它可以使任何可调用对象(可以是函数、lambda表达式、函数对象或所谓的仿函数)在后台作为单独的线程异步运行,*如果可能*。什么叫“如果可能”?是的,我们稍后会讨论

一旦async启动,它就会与一个std::future对象关联。这个对象用于保存任务的输出,通常是您的函数在单独线程中运行的返回值。有些函数可能根本不返回任何内容。但即使在这种情况下,我们也可能需要返回的std::future对象。为什么?等等,我们一步一步来。

让我们看看如何使用std::async的代码。

int Func1( )
{ 
   //To do some processing
   return result; 
} 
 
int Func2( )
{ 
  //To do some processing
   return result; 
} 
 
int main( )
{ 
  int iResult1 = Func1( ); 
  int iResult2 = Func2( ); 
  return 0; 
}

在上面的代码中,Func1首先被调用,然后是Func2。由于执行是串行的,Func2在Func1完成执行之前不会被执行。现在让我们异步调用函数Func1。

int Func1( )
{ 
   //Do some processing
   return result; 
} 
 
int Func2( )
{ 
  //Do some processing
   return result; 
} 
 
int main( )
{
   //Start the Func1 in asynchronous mode and get the associated furture object
   std::future<int> fut = std::async(Func1);
 
   //Get the result from the future
   int iResult1 = fut.get();
 
   int iResult2 = Func2();
 
   return 0;
 }

在上面的代码中,我们使用std::async异步启动Func1。这将尝试在一个新线程中异步执行该函数。与Func1执行关联的future对象被返回并存储在fut中。之后,主(主要线程)继续执行函数调用Func2。这是一个阻塞调用,因为它由执行主函数的同一线程执行。

为什么我们需要future?

Future对象可用于

  • 获取调用函数的返回值。返回值可以是返回值,也可以是异常(如果被调用的函数抛出任何异常)。Future是一个模板类型,它被特化为函数的返回类型。
  • 确保传递的函数被调用。这一点很重要。您还记得我们说“async尝试异步调用函数”吗?如果它没有被调用,future对象就可以用来强制执行,当我们处理函数的结果以继续前进时。

因此,即使您不关心函数的返回值/函数返回void类型,也最好存储future对象。因为我们可能需要这个future对象来强制执行,如果它没有被async启动的话。

Future是一个共享对象,用于在两个不同线程之间共享状态。调用其get()方法可能会导致以下情况之一:

  • 如果async在一个单独的线程中启动了功能并完成了执行,get()可能会返回结果。
  • 如果功能已启动但执行尚未完成,则get()将成为一个阻塞调用,直到功能完成并返回结果。
  • 如果由于某种原因功能尚未开始,get()的行为将像一个同步函数调用,并阻塞直到它完成执行。这一点非常重要,因为如果功能无法启动,而执行正在寻找Func1的结果,这是实现它的唯一方法。
Async尝试并行执行???

是的。Async通过启动一个新线程来尝试并行执行给定的功能。但它可能无法做到,因为

  • 没有更多可用资源来启动新线程
  • 编程环境不支持多线程。

因此,get()确保功能要么异步调用,要么同步调用(在最坏的情况下)。如果async()无法在单独的线程中启动函数,它将推迟调用,并在用户通过调用get()方法显式请求时执行。

您可以使用lambda表达式代替std::async中的函数。

std::future<int> fut = std::async([ ] { } );

启动策略

您可以通过std::async显式指定功能如何启动。std::async除了可调用对象之外,还可以接受启动策略作为参数。

std::launch::async
  • 使用此启动策略,我们可以强制std::async仅在异步模式下启动功能。如果std::async无法做到这一点,系统将抛出system_error异常。
  • 在这种情况下,future对象的析构函数将阻塞执行,直到传递的函数执行完毕并且future准备就绪(这是Herb Sutter提出的)。但我用VS2012尝试时,没有找到这个功能。
std::launch::deferred
  • 此策略允许用户对任务进行惰性评估。使用此启动策略,被调用的函数将无限期推迟,直到对返回的future对象调用get()。传递的函数仅在显式调用get()时才会被调用。创建async后,如果检查返回的future对象的状态,它将是std::future_status::deferred。

处理异常

  • Future对象足够智能,可以携带结果(在执行成功的情况下)以及异常(如果函数抛出任何异常)。如果被调用的函数抛出异常并且没有处理该异常,那么它将通过future传播给调用者。
  • 调用future的get()方法将返回结果或异常。调用者可以准备好处理后台任务抛出的异常。
Wait()

future的get()只能调用一次。之后,future对象将变为void,我的意思是,调用get()之后唯一有效的操作是销毁。对future对象的任何其他调用都将导致未定义行为。

Future提供了另一个名为wait()的接口,它为用户提供了一种等待后台操作完成的方式。这会强制线程启动(如果尚未启动),并等待其完成。此方法可以调用多次。

有两个wait接口。

  • wait_for():用于等待指定的时间段。如果时间段是绝对的,可以使用此接口。
std::future<int> fut = std::async( func );
//I would like to wait for 500 millisecons before proceeding further 
fut.wait_for( std::chrono::millisecond(500) );
  •   wait_until():用于等待直到达到指定的时间点。如果要指定相对时间段,可以使用此接口。

上述函数返回以下之一:

  • future_status::deferred:操作尚未开始。没有调用wait()或get()来强制执行。
  • future_status::timeout:后台任务已启动并在进行中,但指定的时间已过。
  • future_status::ready:任务已完成,您可以准备品尝结果。

可以通过将持续时间设置为0来使用Wait函数进行轮询。这将持续检查future对象的状态,直到它变为ready。

while( fut.wait_for( chrono::second(0) == future_status::ready )
{
   //Do proessinng further
}

    您应该小心上面的循环。因为后台任务可能根本没有启动。在这种情况下,while循环将永远不会结束。因此,请确保任务已启动,然后轮询其结果。

if( fut.wait_for( chrono::seconds(0) != future::status::deferred )
{
 
    while( fut.wait_for( chrono::second(0) == future_status::ready )
    {
      //Do proessing further
    }
}

    另一种确保任务在轮询时启动的方法是通过std::async显式传递async启动策略(std::launch::async)。要小心这种轮询,因为它会浪费宝贵的CPU时间。

任务参数

如何将参数传递给要异步执行的任务?这可以通过两种方式完成。

  •  通过异步调用lambda表达式,然后lambda表达式通过传递所需参数来调用可调用对象。
void Print(int num)
{
   int i = 0;
   for( ; i < num; ++i )
   {
      cput<<"Hello world"<<endl;
   }
}
 
cout<<"Enter the number : ";
int number;
cin>>number;
 
//Passing number by value as the capture clause is by value [=]
std::async( [=] { Print(number); } );
  •  我们可以直接通过async接口传递参数。
void Print(int num)
{
   int i = 0;
   for( ; i < num; ++i )
   {
      cout<<"Hello world"<<endl;
   }
}
 
 
void main()
{
cout<<"Enter the number : ";
int number;
cin>>number;
 
//Passing number by value as the capture clause is by value (=)
std::async(Print,number );
}

在上面的情况下,数字是按值传递的。

按引用传递

首先,让我们看看如何按引用传递参数。我将重写用于按值传递的代码。

在lambda表达式的情况下,捕获子句可以更改为引用。

void Print(int num)
{
  
}
cout<<"Enter the number : ";
int number;
cin>>number;
 
//Passing number by value as the capture clause is by value (=)
std::async( [&] { Print(number); } );

 另一种方法可以使用std::ref方法。

void Print(int &num)
{
 
}
void main()
{
cout<<"Enter the number : ";
int number;
cin>>number;
 
//Passing number by value as the capture clause is by value (=)
std::async(Print,std::ref(number));
}

 您应该非常小心地使用按引用传递方法,因为

  • 您通过“按引用传递”为自己设置陷阱,这会引起数据竞争。除非您有适当的同步机制,否则参数更容易损坏。
  • 您有责任在异步操作完成之前保持变量的有效性。让我们看下面的代码。假设操作Foo是一个耗时任务,并且以下代码是一个真正的问题。
void Foo(int& num )
{
  //A long running task which uses the num
}
void main()
{ 
   cout<<"Enter the number : ";
   int number;
   cin>>number; 
   auto f = std::async( std::launch::async,Foo, std::ref(number) )
}

在上面的代码中,main函数在Foo完成之前结束,并且number超出了作用域。

因此,作为经验法则,在异步操作中,按值传递参数通常比按引用传递参数更好。

低级接口

现在,让我们看看低级接口。

std::thread

低级接口是std::thread,用于以异步方式启动任务。

创建的线程也可以在分离模式下运行。在分离模式下,即使

  • 与之关联的线程对象超出作用域,创建的线程也可以运行。
  • 主线程在创建的线程完成工作之前完成执行。

分离线程的问题在于无法控制它们,用户有责任确保它们不访问任何已超出作用域的对象。因此,同样,按值传递参数总是更好的,这样线程将使用自己的副本而不是依赖共享资源。

在async的情况下,get()方法用于获取指定任务的结果/异常。同样,在std::threads的情况下,我们可以通过调用join方法来确保任务在继续之前完成。join()方法是一个阻塞调用,因此调用线程必须等待任务完成其执行。

线程要么应该在分离模式下运行,要么您必须通过调用join方法等待线程执行完成。否则,创建的线程将继续执行,而与该线程关联的线程对象将超出作用域,程序将被中止。

线程ID

每个线程都被分配了一个唯一的线程ID。它是std::thread::id类型。但是,我们能用获得的线程ID做的很少,比如比较和输出它们。已终止线程的ID可能会分配给任何新创建的线程。

Promise

您可能想知道如何进行线程之间的通信,例如将参数传递给函数以及获取线程执行函数的返回结果。

传递参数:传递参数可以这样做

void AsyncFunc(int x,int y )
{
    //Do the processing
} 
void main()
{
   std::thread t1(AsyncFunc,10,12);
   :
   :
   //Wait for the result of the async operation
   t1.join();
}

返回值:显然,通过按引用传递参数,我们可以获取已执行函数的返回值。如前所述,不建议按引用传递,因为它存在固有的问题,例如在执行完成之前保持传递的变量的有效性等。还有另一种机制可以从线程获取结果,称为std::promise。

在std::async()的情况下,async负责发送结果,而我们负责取回结果,std::future就是为此而设计的。因此,std::future用于在调用者处解包结果,而打包由async()完成。

std::promise用于设置线程函数的输出,然后调用者可以使用该输出。输出可以是结果/异常。

总而言之,std::future用于检索由调用者使用的结果。std::promise用于由创建的std::thread设置结果。因此,std::future没有设置值的接口,而std::promise没有获取值的接口。

让我们看一个小的例子。

std::promise g_prom;
void ThreadFun( )
{
	long Result = 0;
	int i = 0, j = 0;
    try
    {
	   //Do some long processing
	   for( ; i < 100; i++ )
	   {
		  j = 0;
		  for( ; j < 100; j++ )
		  {
		    	Result += ( i * j );
		  }
	   }
    }
    catch(std::exception e)
    {
        g_prom.set_exception(e);
    }
    
	//Set the result to the promise if the long running process is done
	g_prom.set_value( Result );

}

int main()
{
	//Get the future from the promise
	std::future fut = g_prom.get_future();
	
	//Launch the thread
	std::thread t( ThreadFun );

	//Wait for the thread to complete by querying the result from the future
	t.join();

	long value = 0;
	try
	{
		value  = fut.get();			
        cout<<"Value = "<<value<<endl;
	}
	catch(...)
	{
       cout<<"Exception is thrown...."<<endl;
	}
}

在上面的例子中,main使用std::thread异步启动函数ThreadFunc。启动后,main等待ThreadFunc的结果。ThreadFunc使用set_value()在全局promise对象g_prom中设置结果,该对象在ThreadFunc和main中都可以访问。Promise内部有一个共享状态,用于存储结果/异常。在异常的情况下,使用set_exception方法。

在main()中,我们使用future对象获取结果,因为这是获取它的唯一方法。在调用promise的get_future()时,它会使用其共享状态创建一个future对象并返回它。因此,状态在promise和future之间共享。每当在promise对象中设置值/异常时,其共享状态就会就绪。

如果线程仍在执行,而您尝试调用从promise检索到的future的get(),则这是一个阻塞调用。此执行将阻塞,直到共享状态就绪。

因此,我们已经看到了启动并发任务的各种方法。但是并发任务有其自身的问题,如数据竞争、死锁等。C++11还提供了线程同步机制和原子操作。我们将在接下来的文章中讨论它们。

关于代码

我没有添加详尽的代码示例,因为这些概念可以用简短的代码片段来解释。我在讨论每个特性时都添加了代码片段。我只添加了微小的示例。确保使用VS12以上的Visual Studio版本来构建代码。

致谢

我想感谢Nicolai M.Josuttis的著作“C++11标准库”,该书详细介绍了C++11的语言和库特性。我将这本书作为参考来撰写本文。

© . All rights reserved.