C++ 中使用信号量的多线程和进程间信号。
C++ 中使用信号量进行多线程和进程间信号传递。
引言
信号量和互斥锁被广泛用于进程间和多线程信号传递。本文演示了在 Unix 系统调用之上对创建互斥锁、信号量和线程进行面向对象封装。 类似于 .NET 框架的实现,这里 Semaphore
类也可以用于共享(系统范围)和本地信号量,除了互斥锁之外。
背景
需要具备信号量、互斥锁和线程的基本知识。
使用代码
本文提供了以面向对象方式创建信号量和互斥锁的类。这些接口的灵感来自 .NET 框架中信号量和互斥锁的实现。
以下示例展示了如何创建和使用本地信号量进行多线程信号传递。在程序中,我们创建并启动了两个线程。回调函数中调用了 ::localSemaphore.WaitOne()
。因此,两个线程将调用两次 WaitOne()
。这意味着每个线程都在等待获取信号量中的一个空闲位置。但是,由于初始计数(即最初可用的空闲位置数)设置为 0,WaitOne()
调用会阻止它们继续执行。需要两次 Release()
调用或一次 Release(2)
调用来帮助它们。 getchar()
函数会暂停程序,直到用户按下任意键。当用户按下任意键时,程序调用下一个方法,即 ::localSemaphore.Release(2)
。 这会向正在等待的两个线程发出信号。由于线程现在可以继续执行(它们成功获取了一个空闲位置,即 WaitOne()
调用返回),它们的回调例程将继续执行完毕。程序通过调用 Join()
方法等待每个线程完成。
#include <stdlib.h>
#include <iostream>
#include "Thread.h"
#include "Semaphore.h"
using namespace mom;
//declear the callback method
var_t callbackMethod(var_t arg);
//define the 'local semaphore' instance
Semaphore localSemaphore((unsigned int)0, (unsigned int)2);
//initial-count = 0 (no room available to enter), max-size = 2
int main(int argc, char** argv) {
std::cout <<std::endl
<<std::endl;
//create two thread objects
Thread thread1(callbackMethod);
Thread thread2(callbackMethod);
//lets start the threads
thread1.Start((var_t)"1");
thread2.Start((var_t)"2");
std::cout <<"press any key to signal"
<<std::endl;
//two threads were started and they are waiting to enter the semaphore
getchar();
//now providing two rooms that can be occupied
::localSemaphore.Release(2);
//main thread will wait for the threads to complete their routine
thread1.Join();
std::cout <<"thread1 exited"
<<std::endl;
thread2.Join();
std::cout <<"thread2 exited"
<<std::endl;
std::cout <<"press any key to exit"
<<std::endl;
getchar();
return (EXIT_SUCCESS);
}
var_t callbackMethod(var_t arg) {
std::cout <<"Thread "
<<(const char*)arg
<<" callback_method called .. "
<<"waiting for signal to exit.."
<<std::endl;
//try to occupy a room in the semaphore. block the thread until sucess
::localSemaphore.WaitOne();
}
相同的信号量类也可以用于创建系统范围的共享信号量,多个进程可以使用它来进行信号传递。假设我们有两个进程,它们将共享一个系统范围的信号量来相互发出信号。
让我们定义进程 1
#include <stdlib.h>
#include <iostream>
#include "Semaphore.h"
/*
*
*/
using namespace mom;
int main(int argc, char** argv) {
std::cout <<std::endl;
//lets create a system wide semaphore with sem_id 111
Semaphore systemWideSemaphore((unsigned short)50, 0, 10);
//initial-count: 0 (no room available to occupy)
std::cout <<"Process 1 Started... waiting for for signal from Process 2"
<<std::endl;
//wait util a room is available to occupy on the semaphore (sem_id 100)
systemWideSemaphore.WaitOne();
std::cout <<"Signal received from Process2.. exiting Process1"
<<std::endl;
return (EXIT_SUCCESS);
}
对于进程 2,我们有
#include <stdlib.h>
#include <iostream>
#include "Semaphore.h"
/*
*
*/
using namespace mom;
int main(int argc, char** argv) {
std::cout <<std::endl;
//lets retrieve the system wide semaphore with sem_id 111
Semaphore systemWideSemaphore((unsigned short)50, 0, 10);
/* std::cout <<"Process 2 Started... press any key to signal Process1"
<<std::endl;
getchar();*/
//Process1 is already waiting to occupy a room in the semaphore
//release/provide a room in the semaphore so that Process1 can continue
systemWideSemaphore.Release();
return (EXIT_SUCCESS);
}
进程 1 调用信号量的 WaitOne()
方法来获取一个空闲位置。由于初始计数为 0,它会一直等待,直到有空闲位置释放给它。进程 2 完成这项工作。启动后,进程 2 使用给定的键检索信号量,并调用 Release()
为进程 1 提供一个空闲位置。因此,在进程 2 中调用 systemWideSemaphore.Release()
方法后,进程 1 退出。
源代码
请在此处找到类的源代码
Semaphore.h:
/*
* File: Semaphore.h
* Author: Souvik Chatterjee
* This class is a C# like object oriented wrapper
* for local(in process) and shared(system wide) semaphore
* This file defines the interface for the Semaphore class
*
* Important: I designed the class inspired
* by the .NET framework's implementation of Semaphore
* In .Net framework, apart from Mutex there is a Semaphore class
* which can be used as both local and shared
* semaphores. I am not sure of .Net internal implementation.
* But this C++ class internally uses two completely
* different implementation UNIX semget C system call
* and UNIX pthread C system call for shared and local semaphores
* respectively.
*/
#ifndef _SEMAPHORE_H
#define _SEMAPHORE_H
namespace mom {
class Semaphore {
public:
//This constructor creates(or retrieves existing)
//system wide semaphore with the given sem_id
//using this call 65535 different system wide semaphores can be created
//this shared semaphore is implemented with UNIX semget C system call
Semaphore(unsigned short sem_id, unsigned int initial_count, unsigned int max_count);
//This constructor creates local(in process) semaphore
//in UNIX local semaphore in a MUTEX.
//this local semaphore internally uses UNIX pthread mutext
Semaphore(unsigned int initial_count, unsigned int max_count);
//waits until succeeds to acquire a room in the semaphore object
void WaitOne();
//releases one room among the acquired rooms in the semaphore objet
void Release();
//releases specified number(release_count) rooms
//among the acquired rooms in the semaphore objet
void Release(int release_count);
//destructor
virtual ~Semaphore();
private:
//internal flag to determine if the created semaphore is local or shared
bool _is_local;
//internal handle for the created semaphore instance
void* _semaphore_instance_ptr;
};
}
#endif /* _SEMAPHORE_H */
Semaphore.cpp:
/*
* File: SemaphoreWrapper.cpp
* Author: Souvik Chatterjee
* This file defines the implementation
* of the Semaphore interface declared in Semaphore.h
* Semaphore class uses __shared_semaphore class
* for shared(system wide) semaphore which is basically an object
* oriented wrapper over UNIX semget.
* On the other hand, it uses __local_semaphore class
* for local(in process) semaphore which is basically an object
* oriented wrapper over UNIX pthread mutex.
* Reference: to know understand the UNIX system calls
* used throughout this implementation please refer to
* Open Group (http://www.unix.org/single_unix_specification/) and
* The Linux Programmer's Guide (https://tldp.cn/LDP/lpg/)
*/
#include "Semaphore.h"
#include "Mutex.h"
#include "Thread.h"
#include <iostream>
#include <sys/sem.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <fcntl.h>
#include <errno.h>
namespace mom {
//------------------- System wide shared semaphore
// (class __shared_semaphore)-------------------
class __shared_semaphore {
public:
// creates (or retrieves existing) system wide(shared)
// semaphore with given semaphore id(sem_id)
// initial_count refers to the initially available
// rooms that can be acquired by _wait_one method
// max_count refers to the maximum number of rooms
// that can be acquired by _wait_one_method
__shared_semaphore(unsigned short sem_id,
unsigned int initial_count, unsigned int max_count);
// waits untils succeeds to acquire a room in the semaphore object
void _wait_one();
// releases specified number(release_count) of rooms
// among the acquired rooms in the semaphore object
// if no rooms are currently occupied, it simply ignores
// the call. you an implement it with a custom exception
// thrown
void _release(unsigned int release_count);
private:
// releases specified number(release_count) of rooms
// among the acquired rooms in the semaphore object
// it can not provided rooms exceeding the <max_count>.
// any such attempt will be simply ignored. you can
// implement this with a custom xception thrown
void _release_internal(unsigned int release_count);
//holds the key of the shared semaphore object
key_t _sem_key;
//holds the maximum count for the semaphore object
unsigned int _max_count;
//holds the semaphore id retrieved from the system
int _sem_id;
};
__shared_semaphore::__shared_semaphore(unsigned short sem_id,
unsigned int initial_count, unsigned int max_count) {
//set the key
_sem_key = (key_t)sem_id;
//set max count
_max_count = max_count;
//set wait instruction
//set sem id to not set i.e. -1
_sem_id = -1;
//define the semaphore creation mode
// IPC_CREATE: create a new semaphore if already
// there is no sem_id associated with sem_key
// IPC_EXCL: the semget function will fail if there
// is already sem_id exists associated with the sem_key
// S_IRUSR: owner has the read permission on semaphore object
// S_IWUSR: owner has the write permission on semaphore object
// S_IROTH: read permission on semaphore object for others
// S_IWOTH: write permission on semaphore object for others
mode_t sem_mode = IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR |
S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
//lets try to retrieve the semaphore id for
//the existing semaphore(if any) associated with the sem_key
//it will return -1 if there is no semaphore
//available in the system associated with the given key
_sem_id = semget(_sem_key, 0, 0);
if( _sem_id == -1 ) { //okay no semaphore found in the system for the given key
//now lets create a new semaphore with the sem_key and with sem_mode
_sem_id = semget(_sem_key, 1, sem_mode);
//lets assume it failed due to some reason..
//if you use this code, I will recommend to use
//proper object oriented exception handling here
if(_sem_id == -1) {
if (errno == EEXIST) {
perror("IPC error 1: semget");
}
else {
perror("IPC error 2: semget");
}
exit(1);
}
//this process created the semaphore first
//lets provide <initial_count> number of rooms
_release_internal(initial_count);
}
}
void __shared_semaphore::_wait_one() {
sembuf sem_instruction;
sem_instruction.sem_num = 0;
sem_instruction.sem_op = -1;
sem_instruction.sem_flg = SEM_UNDO;
//execute the semop system call on the semaphore
//with the prepared wait instruction
if(semop(_sem_id, &sem_instruction, 1)!=-1) {
//for proper functionality, this line of code is required
//it sets the semaphore's current value
//in the system which other process can feel
//i am not very much sure why it is required.
//I used it after doing a lots of debugging
//please put a comment in the article
//if you have the detailed information for it
semctl( _sem_id, 0, SETVAL, semctl(_sem_id, 0, GETVAL, 0));
}
}
void __shared_semaphore::_release(unsigned int release_count) {
if(semctl(_sem_id, 0, GETNCNT, 0) > 0) {
//if atleast one process is waiting for a resource
_release_internal(release_count);
}
else {
//no process is waiting fo the resource..
//so simply ignored the call.. you should throw some
// custom exception from here
}
}
void __shared_semaphore::_release_internal(unsigned int release_count) {
if(semctl(_sem_id, 0, GETVAL, 0) < _max_count) {
sembuf sem_instruction;
sem_instruction.sem_num = 0;
sem_instruction.sem_op = release_count;
sem_instruction.sem_flg = IPC_NOWAIT|SEM_UNDO;
//execute the semop system call on the semaphore
//with the prepared signal instruction
if(semop(_sem_id, &sem_instruction, 1) != -1) {
//for proper functionality, this line of code is required
//it sets the semaphore's current value
//in the system which other process can feel
//i am not very much sure why it is required.
//I used it after doing a lots of debugging
//please put a comment in the article
//if you have the detailed information for it
semctl( _sem_id, 0, SETVAL, semctl(_sem_id, 0, GETVAL, 0));
}
}
else {
//ignored the call. you should thorw some custo exception
}
}
//----------------------- Local semaphore --------------(class __local_semaphore)--------
class __local_semaphore {
public:
//creates a logical couting semaphore using mutex.
//This semaphore has no scope out side the process
//inwhich its running. so it can be used
//for inter-thread signalling but not interprocess signalling
__local_semaphore(unsigned int initial_count, unsigned int max_count);
void _wait_one();
void _release(unsigned int release_count);
private:
unsigned int _initial_count;
unsigned int _max_count;
Mutex* _wait_handle;
bool _waiting;
};
__local_semaphore::__local_semaphore(unsigned int initial_count,
unsigned int max_count) {
_initial_count = initial_count;
_max_count = max_count;
_wait_handle = new Mutex();
_wait_handle->Lock();
_waiting = false;
}
void __local_semaphore::_wait_one() {
if(_initial_count == _max_count) {
_waiting = true;
_wait_handle->Lock();
}
else if(_initial_count < _max_count) {
_initial_count ++;
}
}
void __local_semaphore::_release(unsigned int release_count) {
_initial_count -= release_count;
if(_waiting) {
_waiting = false;
_wait_handle->Unlock();
}
}
//----------------------- Semphore (wrapper)------------(class Semaphore)----
//create a system wide semaphore with the sem_id provided
Semaphore::Semaphore(unsigned short sem_id,
unsigned int initial_count, unsigned int max_count) {
_is_local = false;
__shared_semaphore* shared_semaphore =
new __shared_semaphore(sem_id, initial_count, max_count);
_semaphore_instance_ptr = (void*)shared_semaphore;
}
//create a local semaphore
Semaphore::Semaphore(unsigned int initial_count, unsigned int max_count) {
_is_local = true;
__local_semaphore* local_semaphore =
new __local_semaphore(initial_count, max_count);
_semaphore_instance_ptr = (void*)local_semaphore;
}
//block the caller until it succeeds to occupy a room
void Semaphore::WaitOne() {
if(_is_local) {
((__local_semaphore*)_semaphore_instance_ptr)->_wait_one();
}
else {
((__shared_semaphore*)_semaphore_instance_ptr)->_wait_one();
}
}
//release <release_count> occupied rooms
void Semaphore::Release(int release_count) {
if(_is_local) {
((__local_semaphore*)_semaphore_instance_ptr)->_release(release_count);
}
else {
((__shared_semaphore*)_semaphore_instance_ptr)->_release(release_count);
}
}
//release an occupied room
void Semaphore::Release() {
Release(1);
}
Semaphore ::~Semaphore() {
if(_is_local) {
__local_semaphore* __semaphore_ptr =
(__local_semaphore*)_semaphore_instance_ptr;
delete __semaphore_ptr;
}
else {
__shared_semaphore* __semaphore_ptr =
(__shared_semaphore*)_semaphore_instance_ptr;
delete __semaphore_ptr;
}
_semaphore_instance_ptr = NULL;
}
//-----------------------------------------------------------------------
}
Mutex.h:
/*
* File: Mutex.h
* Author: Souvik Chatterjee
* This file declares the interface for the Mutex class
* Mutex class is a wrapper over the pthread mutex.
* It provides an C# like object oriented implementation
* of unix pthread mutex
*/
#ifndef _MUTEX_H
#define _MUTEX_H
#include <pthread.h>
class Mutex {
public:
//Mutext::Lock() gains a lock on the MUTEX
void Lock();
//Mutext::Unlock() releases the MUTEX
void Unlock();
private:
//unix pthread instance
pthread_mutex_t _mutex;
};
#endif /* _MUTEX_H */
Mutex.cpp:
/*
* File: Mutex.cpp
* Author: Souvik Chatterjee
* This CPP File Contains the implementation of the header Mutex.h
*/
#include <stdio.h>
#include <ios>
#include <pthread.h>
#include "Mutex.h"
//---------------------------------------
/*
* Mutext::Lock() gains a lock on the MUTEX
*/
void Mutex::Lock() {
//execute pthread mutex lock system call
//with member pthread mutext instance
//pass the reference of the pthread mutex instance
pthread_mutex_lock(&_mutex);
}
//--------------------------------------
/*
* Mutext::Unlock() releases the MUTEX
*/
void Mutex::Unlock() {
//execute pthread mutex unlock system call
//with member pthread mutext instance
//pass the reference of the pthread mutex instance
pthread_mutex_unlock(&_mutex);
}
//--------------------------------------
Thread.h:
/*
* File: Thread.h
* Author: Souvik Chatterjee
*
* Created on August 12, 2009, 2:54 AM
*/
#ifndef _THREAD_H
#define _THREAD_H
#include<pthread.h>
#include <stdio.h>
#include <ios>
namespace mom {
typedef void* var_t;
typedef var_t (*thread_start_t)(var_t);
class Thread {
public:
Thread(thread_start_t thread_start);
void Start(var_t thread_args);
void Join();
static int Sleep(unsigned long millisecs) {
long sec = (long)(millisecs / 1000);
long nsec = (millisecs - (sec*1000))*1000;
timespec delay = {sec, nsec};
int return_val = nanosleep(&delay, (timespec*)NULL);
return return_val;
}
private:
thread_start_t _thread_start;
pthread_t _thread;
};
}
#endif /* _THREAD_H */
Thread.cpp:
/*
* File: Thread.cpp
* Author: Souvik Chatterjee
*
* Created on August 12, 2009, 2:54 AM
*/
#include "Thread.h"
namespace mom {
Thread :: Thread(thread_start_t thread_start) {
_thread_start = thread_start;
}
void Thread :: Start(var_t thread_args) {
pthread_create(&_thread, NULL, _thread_start, thread_args);
}
void Thread :: Join() {
pthread_join(_thread, NULL);
}
}