
Multithreading and Interprocess Signaling with Semaphores in C++


September 24, 2009

CPOL


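Introduction

Semaphores and mutexes are widely used for interprocess and multithreaded signaling. This article demonstrates object-oriented wrappers over the UNIX system calls for creating mutexes, semaphores, and threads. As in the .NET Framework, the Semaphore class here can serve as either a shared (system-wide) or a local semaphore, and a separate Mutex class is provided as well.

Background

Basic knowledge of semaphores, mutexes, and threads is required.

Using the code

This article provides classes for creating semaphores and mutexes in an object-oriented way. The interfaces are inspired by the semaphore and mutex implementations in the .NET Framework.

The following example shows how to create and use a local semaphore for multithreaded signaling. The program creates and starts two threads. The callback function calls ::localSemaphore.WaitOne(), so WaitOne() is called twice, once per thread, and each thread waits to acquire a free room in the semaphore. Because the initial count (the number of rooms available at the start) is 0, both WaitOne() calls block. Two Release() calls or one Release(2) call are needed to let the threads proceed. getchar() pauses the program until the user presses a key. The program then calls ::localSemaphore.Release(2), which signals both waiting threads. Each thread now acquires a room, its WaitOne() call returns, and its callback runs to completion. The program waits for each thread to finish by calling Join().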

#include <stdlib.h>
#include <iostream>
#include "Thread.h"
#include "Semaphore.h"

using namespace mom;
//declare the callback method
var_t callbackMethod(var_t arg);
//define the 'local semaphore' instance
Semaphore localSemaphore((unsigned int)0, (unsigned int)2);
//initial-count = 0 (no room available to enter), max-size = 2 

int main(int argc, char** argv) {

    std::cout <<std::endl
              <<std::endl;
    //create two thread objects
    Thread thread1(callbackMethod);
    Thread thread2(callbackMethod);

    //lets start the threads
    thread1.Start((var_t)"1");
    thread2.Start((var_t)"2");
    
    std::cout <<"press any key to signal"
              <<std::endl;
    //two threads were started and they are waiting to enter the semaphore 
    getchar();
    
    //now providing two rooms that can be occupied 
    ::localSemaphore.Release(2);

    //main thread will wait for the threads to complete their routine
    thread1.Join();
    std::cout <<"thread1 exited"
              <<std::endl;
    thread2.Join();
    std::cout <<"thread2 exited"
              <<std::endl;

    std::cout <<"press any key to exit"
              <<std::endl;
    getchar();
    return (EXIT_SUCCESS);
}

var_t callbackMethod(var_t arg) {

    std::cout <<"Thread "
              <<(const char*)arg
              <<" callback_method called .. "
              <<"waiting for signal to exit.."
              <<std::endl;

    //try to occupy a room in the semaphore. block the thread until success
    ::localSemaphore.WaitOne();
    //a non-void function must return a value; the thread has no result
    return NULL;
}
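
For comparison, the same signaling pattern can be sketched with only the C++11 standard library. This is a minimal illustration under stated assumptions, not the article's mom::Semaphore: the names CountingSemaphore and run_signal_demo are hypothetical, and the semaphore is built from std::mutex and std::condition_variable.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical counting semaphore; not the article's mom::Semaphore.
class CountingSemaphore {
public:
    explicit CountingSemaphore(unsigned initial) : count_(initial) {}

    // Block until a room is free, then take it.
    void wait_one() {
        std::unique_lock<std::mutex> lock(mutex_);
        available_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    // Make n rooms available and wake the waiting threads.
    void release(unsigned n = 1) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            count_ += n;
        }
        available_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable available_;
    unsigned count_;
};

// Starts `workers` threads that each wait for one room, releases that many
// rooms, joins the threads, and returns how many of them got through.
int run_signal_demo(unsigned workers) {
    CountingSemaphore semaphore(0);   // no room available at the start
    std::mutex done_mutex;
    int done = 0;
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < workers; ++i) {
        threads.emplace_back([&] {
            semaphore.wait_one();
            std::lock_guard<std::mutex> lock(done_mutex);
            ++done;
        });
    }
    semaphore.release(workers);       // plays the role of Release(2)
    for (std::thread& t : threads) t.join();
    assert(done == static_cast<int>(workers));
    return done;
}
```

Calling run_signal_demo(2) from main mirrors the two-thread program above, with the key press replaced by an immediate release.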

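The same Semaphore class can also create a system-wide shared semaphore that multiple processes use to signal one another. Suppose we have two processes that share a system-wide semaphore for this purpose.

Let us define Process 1: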

#include <stdlib.h>
#include <iostream>
#include "Semaphore.h"
/*
 * 
 */
using namespace mom;
int main(int argc, char** argv) {

    std::cout <<std::endl;
    //lets create a system wide semaphore with sem_id 50
    Semaphore systemWideSemaphore((unsigned short)50, 0, 10);
    //initial-count: 0 (no room available to occupy)

    std::cout <<"Process 1 Started... waiting for signal from Process 2"
              <<std::endl;
    //wait until a room is available to occupy on the semaphore (sem_id 50)
    systemWideSemaphore.WaitOne();    
    std::cout <<"Signal received from Process2.. exiting Process1"
              <<std::endl;

    return (EXIT_SUCCESS);
}

For Process 2, we have:

#include <stdlib.h>
#include <iostream>
#include "Semaphore.h"
/*
 * 
 */
using namespace mom;
int main(int argc, char** argv) {

    std::cout <<std::endl;
    //lets retrieve the system wide semaphore with sem_id 50
    Semaphore systemWideSemaphore((unsigned short)50, 0, 10);
   /* std::cout <<"Process 2 Started... press any key to signal Process1"
              <<std::endl;
    getchar();*/
    //Process1 is already waiting to occupy a room in the semaphore
    //release/provide a room in the semaphore so that Process1 can continue
    systemWideSemaphore.Release();
    return (EXIT_SUCCESS);
}

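Process 1 calls the semaphore's WaitOne() method to acquire a free room. Because the initial count is 0, it waits until a room is released to it. Process 2 does this job: once started, it retrieves the semaphore by the given key and calls Release() to provide a room for Process 1. Process 1 therefore exits after Process 2 calls systemWideSemaphore.Release().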
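
The two-process handshake can also be tried inside a single program. The sketch below is an illustration only, assuming a Linux-like system with System V IPC enabled: it uses fork() and an IPC_PRIVATE semaphore set instead of the key 50 shared by the two programs above, and the names sem_arg, change, and signal_child_once are hypothetical.

```cpp
#include <cassert>      // assert(), for checks like the one in the note below
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Fourth argument of semctl; the caller has to define this union itself.
union sem_arg {
    int val;
    struct semid_ds* buf;
    unsigned short* array;
};

// Applies `delta` to semaphore 0 of the set.
// A negative delta blocks while the result would be negative.
static int change(int sem_id, short delta) {
    sembuf op;
    op.sem_num = 0;
    op.sem_op = delta;
    op.sem_flg = 0;
    return semop(sem_id, &op, 1);
}

// Returns 0 if the child was woken by the parent's signal, -1 on any failure.
int signal_child_once() {
    int sem_id = semget(IPC_PRIVATE, 1, IPC_CREAT | 0600);
    if (sem_id == -1) return -1;

    sem_arg init;
    init.val = 0;                              // no room available at the start
    if (semctl(sem_id, 0, SETVAL, init) == -1) {
        semctl(sem_id, 0, IPC_RMID);
        return -1;
    }

    pid_t child = fork();
    if (child == -1) {
        semctl(sem_id, 0, IPC_RMID);
        return -1;
    }
    if (child == 0) {
        // Plays "Process 1": wait for a room, then exit.
        _exit(change(sem_id, -1) == 0 ? 0 : 1);
    }

    // Plays "Process 2": provide one room.
    bool released = (change(sem_id, 1) == 0);
    if (!released) semctl(sem_id, 0, IPC_RMID); // unblocks the child with an error
    int status = 0;
    waitpid(child, &status, 0);
    if (released) semctl(sem_id, 0, IPC_RMID);  // clean up the semaphore set
    return (released && WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

A caller can check the handshake with assert(signal_child_once() == 0). Unlike the keyed semaphore in the article, the private set is removed before the function returns, so nothing is left behind in the system.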

Source code

The source code for the classes follows.

Semaphore.h:

/* 
 * File:   Semaphore.h
 * Author: Souvik Chatterjee
 * This class is a C#-like object-oriented wrapper
 * for local (in-process) and shared (system-wide) semaphores.
 * This file defines the interface for the Semaphore class.
 *
 * Important: I designed the class inspired
 * by the .NET Framework's implementation of Semaphore.
 * In the .NET Framework, apart from Mutex there is a Semaphore class
 * which can be used as both local and shared 
 * semaphores. I am not sure of the .NET internal implementation,
 * but this C++ class internally uses two completely 
 * different implementations: the UNIX semget system call
 * for shared semaphores and the pthread library
 * for local semaphores.
 */

#ifndef _SEMAPHORE_H
#define    _SEMAPHORE_H


namespace mom {    
    class Semaphore {
    public:
        //This constructor creates(or retrieves existing)
        //system wide semaphore with the given sem_id
        //using this call 65535 different system wide semaphores can be created
        //this shared semaphore is implemented with UNIX semget C system call
        Semaphore(unsigned short sem_id, unsigned int initial_count, unsigned int max_count);

        //This constructor creates a local (in-process) semaphore.
        //On UNIX the local semaphore is built on a pthread mutex.
        Semaphore(unsigned int initial_count, unsigned int max_count);
        
        //waits until it succeeds in acquiring a room in the semaphore object
        void WaitOne();
        //releases one room among the acquired rooms in the semaphore object
        void Release();
        //releases the specified number (release_count) of rooms 
        //among the acquired rooms in the semaphore object
        void Release(int release_count);
        //destructor
        virtual ~Semaphore();

    private:        
        //internal flag to determine if the created semaphore is local or shared
        bool _is_local;
        //internal handle for the created semaphore instance
        void* _semaphore_instance_ptr;
    };
}

#endif    /* _SEMAPHORE_H */

Semaphore.cpp:

/* 
 * File:   SemaphoreWrapper.cpp
 * Author: Souvik Chatterjee
 * This file defines the implementation
 * of the Semaphore interface declared in Semaphore.h
 * Semaphore class uses __shared_semaphore class
 * for shared(system wide) semaphore which is basically an object
 * oriented wrapper over UNIX semget.
 * On the other hand, it uses __local_semaphore class
 * for local(in process) semaphore which is basically an object 
 * oriented wrapper over UNIX pthread mutex.
 * Reference: to understand the UNIX system calls
 * used throughout this implementation please refer to 
 * Open Group (http://www.unix.org/single_unix_specification/) and 
 * The Linux Programmer's Guide (https://tldp.cn/LDP/lpg/)
 */

#include "Semaphore.h"
#include "Mutex.h"
#include "Thread.h"
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>

namespace mom {
    //------------------- System wide shared semaphore 
    //                   (class __shared_semaphore)-------------------
    class __shared_semaphore {
    public:
        // creates (or retrieves existing) system wide(shared)
        // semaphore with given semaphore id(sem_id)
        // initial_count refers to the initially available
        // rooms that can be acquired by _wait_one method
        // max_count refers to the maximum number of rooms
        // that can be acquired by _wait_one_method
        __shared_semaphore(unsigned short sem_id, 
                 unsigned int initial_count, unsigned int max_count);
        
        // waits until it succeeds in acquiring a room in the semaphore object
        void _wait_one();
        // releases the specified number (release_count) of rooms
        // among the acquired rooms in the semaphore object.
        // if no process is currently waiting, it simply ignores
        // the call. you can implement it with a custom exception 
        // thrown
        void _release(unsigned int release_count);

    private:
        // releases the specified number (release_count) of rooms
        // among the acquired rooms in the semaphore object.
        // it cannot provide rooms exceeding the <max_count>.
        // any such attempt will be simply ignored. you can 
        // implement this with a custom exception thrown
        void _release_internal(unsigned int release_count);
        //holds the key of the shared semaphore object
        key_t _sem_key;
        //holds the maximum count for the semaphore object        
        unsigned int _max_count;
        //holds the semaphore id retrieved from the system
        int _sem_id;              
    };

    __shared_semaphore::__shared_semaphore(unsigned short sem_id, 
                   unsigned int initial_count, unsigned int max_count) {        
        //set the key
        _sem_key = (key_t)sem_id;
        //set max count
        _max_count = max_count;
        //set sem id to not set i.e. -1
        _sem_id = -1;
        
        //define the semaphore creation mode
        // IPC_CREAT: create a new semaphore if there is
        // no sem_id associated with sem_key yet
        // IPC_EXCL: the semget function will fail if a
        // sem_id associated with the sem_key already exists
        // S_IRUSR: owner has the read permission on semaphore object
        // S_IWUSR: owner has the write permission on semaphore object
        // S_IRGRP: read permission on semaphore object for the group
        // S_IWGRP: write permission on semaphore object for the group
        // S_IROTH: read permission on semaphore object for others
        // S_IWOTH: write permission on semaphore object for others
        mode_t sem_mode = IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR | 
                          S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;

        //lets try to retrieve the semaphore id for
        //the existing semaphore(if any) associated with the sem_key
        //it will return -1 if there is no semaphore 
        //available in the system associated with the given key
        _sem_id = semget(_sem_key, 0, 0);

        if( _sem_id == -1 ) { //okay no semaphore found in the system for the given key
            //now lets create a new semaphore with the sem_key and with sem_mode
            _sem_id = semget(_sem_key, 1, sem_mode); 
            //lets assume it failed due to some reason..
            //if you use this code, I will recommend to use 
            //proper object oriented exception handling here
            if(_sem_id == -1) {
                if (errno == EEXIST) {
                    perror("IPC error 1: semget");
                }
                else {
                    perror("IPC error 2: semget");
                }
                exit(1);
            }
            //this process created the semaphore first
            //lets provide <initial_count> number of rooms
            _release_internal(initial_count);
        }
    }

    void __shared_semaphore::_wait_one() {
        
        sembuf sem_instruction;
        sem_instruction.sem_num = 0;
        sem_instruction.sem_op = -1;
        //SEM_UNDO is deliberately not set: with it, the kernel would
        //revert this decrement when the process exits, handing the
        //room back even though the signal was already consumed.
        //(a semctl SETVAL call clears such pending undo adjustments,
        //but reading and rewriting the value is not atomic)
        sem_instruction.sem_flg = 0;
        //execute the semop system call on the semaphore 
        //with the prepared wait instruction
        if(semop(_sem_id, &sem_instruction, 1) == -1) {
            perror("IPC error: semop (wait)");
        }
    }

    void __shared_semaphore::_release(unsigned int release_count) {
        
        if(semctl(_sem_id, 0, GETNCNT, 0) > 0) {
        //if at least one process is waiting for a resource
            _release_internal(release_count);
        }
        else {
            //no process is waiting for the resource, 
            //so the call is simply ignored. you should throw some
            //custom exception from here
        }
    }

    void __shared_semaphore::_release_internal(unsigned int release_count) {
        
        //a sem_op of 0 means "wait for zero", not "release nothing"
        if(release_count == 0) {
            return;
        }
        if(semctl(_sem_id, 0, GETVAL, 0) < _max_count) {
            sembuf sem_instruction;
            sem_instruction.sem_num = 0;
            sem_instruction.sem_op = release_count;
            //SEM_UNDO is deliberately not set: with it, the kernel would
            //take the released rooms back when this process exits,
            //so a signal sent just before exiting could be lost
            sem_instruction.sem_flg = IPC_NOWAIT;
            //execute the semop system call on the semaphore 
            //with the prepared signal instruction
            if(semop(_sem_id, &sem_instruction, 1) == -1) {
                perror("IPC error: semop (release)");
            }
        }
        else {
            //ignored the call. you should throw some custom exception
        }
    }
    
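//----------------------- Local semaphore --------------(class __local_semaphore)--------
    class __local_semaphore {
    public:
        //creates a logical counting semaphore using a pthread mutex
        //and a condition variable.
        //This semaphore has no scope outside the process
        //in which it is running, so it can be used 
        //for inter-thread signalling but not interprocess signalling
        __local_semaphore(unsigned int initial_count, unsigned int max_count);
        ~__local_semaphore();
        void _wait_one();
        void _release(unsigned int release_count);

    private:
        //number of rooms currently available to acquire
        unsigned int _count;
        unsigned int _max_count;
        //protects _count
        pthread_mutex_t _lock;
        //signalled whenever rooms become available
        pthread_cond_t _available;
    };

    __local_semaphore::__local_semaphore(unsigned int initial_count, 
                         unsigned int max_count) {        
        _max_count = max_count;
        //never start with more rooms than <max_count>
        _count = (initial_count < max_count) ? initial_count : max_count;
        pthread_mutex_init(&_lock, NULL);
        pthread_cond_init(&_available, NULL);
    }

    __local_semaphore::~__local_semaphore() {
        pthread_cond_destroy(&_available);
        pthread_mutex_destroy(&_lock);
    }

    void __local_semaphore::_wait_one() {
        pthread_mutex_lock(&_lock);
        //block while no room is available; the loop also
        //guards against spurious wakeups
        while(_count == 0) {
            pthread_cond_wait(&_available, &_lock);
        }
        _count--;
        pthread_mutex_unlock(&_lock);
    }
    void __local_semaphore::_release(unsigned int release_count) {
        pthread_mutex_lock(&_lock);
        //never provide rooms exceeding <max_count>
        if(release_count > _max_count - _count) {
            release_count = _max_count - _count;
        }
        _count += release_count;
        //wake every waiting thread; each one re-checks _count
        pthread_cond_broadcast(&_available);
        pthread_mutex_unlock(&_lock);
    }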
    
    
    //----------------------- Semaphore (wrapper)------------(class Semaphore)----
   //create a system wide semaphore with the sem_id provided
    Semaphore::Semaphore(unsigned short sem_id, 
               unsigned int initial_count, unsigned int max_count) {
        _is_local = false;
        __shared_semaphore* shared_semaphore = 
           new __shared_semaphore(sem_id, initial_count, max_count);
        _semaphore_instance_ptr = (void*)shared_semaphore;
    }

    //create a local semaphore
    Semaphore::Semaphore(unsigned int initial_count, unsigned int max_count) {
        _is_local = true;
        __local_semaphore* local_semaphore = 
           new __local_semaphore(initial_count, max_count);
        _semaphore_instance_ptr = (void*)local_semaphore;
    }
    
    //block the caller until it succeeds to occupy a room
    void Semaphore::WaitOne() {
        if(_is_local) {
            ((__local_semaphore*)_semaphore_instance_ptr)->_wait_one();
        }
        else {
            ((__shared_semaphore*)_semaphore_instance_ptr)->_wait_one();
        }
    }

    //release <release_count> occupied rooms
    void Semaphore::Release(int release_count) {
         if(_is_local) {
            ((__local_semaphore*)_semaphore_instance_ptr)->_release(release_count);
        }
        else {
            ((__shared_semaphore*)_semaphore_instance_ptr)->_release(release_count);
        }
    }

    //release an occupied room
    void Semaphore::Release() {
        Release(1);
    }
    
    Semaphore ::~Semaphore() {
        if(_is_local) {
            __local_semaphore* __semaphore_ptr = 
                     (__local_semaphore*)_semaphore_instance_ptr;
            delete __semaphore_ptr;
        }
        else {
            __shared_semaphore* __semaphore_ptr = 
                   (__shared_semaphore*)_semaphore_instance_ptr;
            delete __semaphore_ptr;
        }
        _semaphore_instance_ptr = NULL;
    }
//-----------------------------------------------------------------------
}
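
The shared semaphore depends on the flags passed to semop, and SEM_UNDO in particular changes what happens when a process exits. The following sketch, assuming a Linux-like system with System V IPC enabled, lets a child process add one room and exit, then reports the value the parent sees. The names sem_arg and value_after_child_release are hypothetical and not part of the article's classes.

```cpp
#include <cassert>      // assert(), for checks like the ones in the note below
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Fourth argument of semctl; the caller has to define this union itself.
union sem_arg {
    int val;
    struct semid_ds* buf;
    unsigned short* array;
};

// A child process adds one room using the given semop flags and exits.
// Returns the semaphore value the parent reads afterwards, or -1 on failure.
int value_after_child_release(short flags) {
    int sem_id = semget(IPC_PRIVATE, 1, IPC_CREAT | 0600);
    if (sem_id == -1) return -1;

    int value = -1;
    sem_arg init;
    init.val = 0;                                  // start with no rooms
    if (semctl(sem_id, 0, SETVAL, init) != -1) {
        pid_t child = fork();
        if (child == 0) {
            sembuf op;
            op.sem_num = 0;
            op.sem_op = 1;                         // release one room
            op.sem_flg = flags;
            _exit(semop(sem_id, &op, 1) == 0 ? 0 : 1);
        }
        if (child > 0) {
            int status = 0;
            waitpid(child, &status, 0);            // child has fully exited here
            if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
                value = semctl(sem_id, 0, GETVAL);
            }
        }
    }
    semctl(sem_id, 0, IPC_RMID);                   // remove the semaphore set
    return value;
}
```

With flags set to 0 the released room survives the child's exit, so assert(value_after_child_release(0) == 1) holds. With SEM_UNDO the kernel reverts the increment at exit, so assert(value_after_child_release(SEM_UNDO) == 0) holds, which is why a process that signals and then exits should not use SEM_UNDO for the release.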

Mutex.h:

/* 
 * File:   Mutex.h
 * Author: Souvik Chatterjee
 * This file declares the interface for the Mutex class
 * Mutex class is a wrapper over the pthread mutex.
 * It provides a C#-like object-oriented implementation 
 * of the UNIX pthread mutex
 */

#ifndef _MUTEX_H
#define    _MUTEX_H
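#include <pthread.h>
#include <stddef.h>

class Mutex {

public:
    //initializes the underlying pthread mutex with default attributes
    Mutex() { pthread_mutex_init(&_mutex, NULL); }

    //destroys the underlying pthread mutex
    ~Mutex() { pthread_mutex_destroy(&_mutex); }

    //Mutex::Lock() gains a lock on the MUTEX 
    void Lock();

    //Mutex::Unlock() releases the MUTEX 
    void Unlock();

private:
    //unix pthread mutex instance
    pthread_mutex_t _mutex;
};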

#endif    /* _MUTEX_H */

Mutex.cpp:

/* 
 * File:   Mutex.cpp
 * Author: Souvik Chatterjee 
 * This CPP File Contains the implementation of the header Mutex.h
*/

#include <stdio.h>
#include <ios>
#include <pthread.h>

#include "Mutex.h"
//---------------------------------------
/*
 * Mutex::Lock() gains a lock on the MUTEX 
*/
void Mutex::Lock() {
    //call pthread_mutex_lock on the member
    //pthread mutex instance, passed by address
    pthread_mutex_lock(&_mutex);
}

//--------------------------------------
/*
 * Mutex::Unlock() releases the MUTEX
*/
void Mutex::Unlock() {
    //call pthread_mutex_unlock on the member
    //pthread mutex instance, passed by address
    pthread_mutex_unlock(&_mutex);
}
//--------------------------------------
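
Lock() and Unlock() have to be paired by hand, and an early return or exception between them can easily skip the unlock. A common remedy is a small scope guard. The sketch below is one possible shape, written directly against pthread_mutex_t so that it stands alone; ScopedLock and scoped_lock_releases are hypothetical names and not part of the article's code.

```cpp
#include <cassert>      // assert(), for the check in the note below
#include <cerrno>
#include <pthread.h>

// Hypothetical RAII guard: locks in the constructor, unlocks in the destructor.
class ScopedLock {
public:
    explicit ScopedLock(pthread_mutex_t& mutex) : mutex_(mutex) {
        pthread_mutex_lock(&mutex_);
    }
    ~ScopedLock() { pthread_mutex_unlock(&mutex_); }

    // Copying a guard would unlock the mutex twice, so forbid it.
    ScopedLock(const ScopedLock&) = delete;
    ScopedLock& operator=(const ScopedLock&) = delete;

private:
    pthread_mutex_t& mutex_;
};

// Returns true if the mutex is held inside the guarded scope
// and free again once the scope has ended.
bool scoped_lock_releases() {
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex, nullptr);

    bool held_inside = false;
    {
        ScopedLock guard(mutex);
        // trylock reports EBUSY because the guard already holds the mutex
        held_inside = (pthread_mutex_trylock(&mutex) == EBUSY);
    }
    // the guard's destructor has run, so trylock succeeds now
    bool free_after = (pthread_mutex_trylock(&mutex) == 0);
    if (free_after) pthread_mutex_unlock(&mutex);

    pthread_mutex_destroy(&mutex);
    return held_inside && free_after;
}
```

A quick check is assert(scoped_lock_releases()). The same idea could wrap the Mutex class itself by calling Lock() in the constructor and Unlock() in the destructor.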

Thread.h:

/* 
 * File:   Thread.h
 * Author: Souvik Chatterjee
 *
 * Created on August 12, 2009, 2:54 AM
 */

#ifndef _THREAD_H
#define    _THREAD_H

#include<pthread.h>
#include <stdio.h>
#include <ios>

namespace mom {
    typedef void* var_t;
    typedef var_t (*thread_start_t)(var_t);
    
    class Thread {    
    public:
        Thread(thread_start_t thread_start);
        void Start(var_t thread_args);
        void Join();
        static int Sleep(unsigned long millisecs) {
            long sec = (long)(millisecs / 1000);
            //remaining milliseconds converted to nanoseconds (1 ms = 1,000,000 ns)
            long nsec = (long)(millisecs % 1000) * 1000000L;
            timespec delay = {sec, nsec};
            int return_val = nanosleep(&delay, (timespec*)NULL);
            return return_val;
        }

    private:
        thread_start_t _thread_start;
        pthread_t _thread;
    };
}
#endif    /* _THREAD_H */

Thread.cpp:

/* 
 * File:   Thread.cpp
 * Author: Souvik Chatterjee
 * 
 * Created on August 12, 2009, 2:54 AM
 */

#include "Thread.h"

namespace mom {
    Thread :: Thread(thread_start_t thread_start) {
        _thread_start = thread_start;
    }
    void Thread :: Start(var_t thread_args) {
        pthread_create(&_thread, NULL, _thread_start, thread_args);
    }
    void Thread :: Join() {
        pthread_join(_thread, NULL);
    }
}
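
Thread::Sleep has to split a millisecond count into the seconds and nanoseconds that nanosleep expects, and the unit conversion is easy to get wrong by a factor of 1000. The arithmetic can be isolated and checked on its own, as in this sketch. The helper name to_timespec is hypothetical.

```cpp
#include <cassert>
#include <time.h>

// Hypothetical helper: split milliseconds into whole seconds
// plus the remaining nanoseconds (1 ms = 1,000,000 ns).
timespec to_timespec(unsigned long millisecs) {
    timespec delay;
    delay.tv_sec = static_cast<time_t>(millisecs / 1000);
    delay.tv_nsec = static_cast<long>(millisecs % 1000) * 1000000L;
    // nanosleep rejects tv_nsec values of one second or more
    assert(delay.tv_nsec < 1000000000L);
    return delay;
}
```

For example, to_timespec(1500) yields tv_sec == 1 and tv_nsec == 500000000, that is, one and a half seconds.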