【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池

一.线程的同步与互斥

死锁问题

死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。

互斥

当多线程并发执行并都需要访问临界资源时,因为每个线程都是不同的执行流,这就有可能导致数据不一致问题,为了避免此问题的发生,就需要对这块临界资源增加一些限制,一次只能有一个线程访问临界资源,即线程互斥

那么如何实现互斥?

答案是加锁!

当线程申请所成功,才能向后执行,否则阻塞等待,该进程访问完成后,释放锁,然后其它线程再来申请锁。

注意,在有锁保护的临界区中,线程仍然可以被切换,并且会连带着锁一起被切换,在这期间,其它的线程依旧不能访问临界区,因为它们没有申请到锁,锁仍然被那个线程拿着。

关于锁的一些函数

同步

上面可能导致一个问题,当在纯互斥的环境下,可能一个线程对锁的竞争能力很强,导致它释放锁后,又马上申请到了锁,这样就一直是这一个线程持有锁,而其它线程无法申请到锁,也就无法访问临界区,造成这些线程的饥饿问题

为了解决这个问题,我们可以让所有阻塞的线程排成一个队,当一个线程释放锁后,就排到队尾,然后由位于队首的线程申请锁,这样就很好地避免了线程的饥饿问题。

在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问
题,叫做同步

如何实现线程同步?

答案是条件变量!

让所有阻塞等待的线程都到条件变量队列下等待,当一个线程释放锁时,就唤醒一个条件变量队列中的线程。

关于条件变量的一些函数


二.生产者消费者模型

什么是生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过一个共享容器来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给共享容器,消费者不找生产者要数据,而是直接从共享容器里取,共享容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个共享容器就是用来给生产者和消费者解耦的。

在生产者消费者模型中,生产者有两项任务

  1. 获取数据
  2. 生产数据

消费者也有两项任务:

  1. 消费数据
  2. 处理加工数据

这因为生产者和消费者有着这些任务,当生产者在获取数据的时候,消费者可以消费数据,当消费者在处理加个数据的时候,生产者可以生产数据,而获取数据和处理加工数据都是独立进行的,不需要共享容器的参与,这就提高了生产者和消费者的并发度

321原则

我们可以把生产者消费者模型简单记成 “321” 原则

  • 3是指有三种关系:消费者和消费者(互斥关系),生产者和生产者(互斥关系),消费者和生产者(互斥和同步关系)
  • 2是指有两个角色:生产者和消费者
  • 1是指有一个共享容器

优点

  • 解耦
  • 支持并发
  • 支持忙闲不均


三.基于阻塞队列的生产者消费者模型

这个需要用到锁和条件变量。

阻塞队列就是生产者和消费者的共享容器,生产者是从数据到阻塞队列中,消费者从阻塞队列中拿数据。

需要注意的是:

  • 当阻塞队列为空时,消费者不可以从阻塞队列中拿数据,此时消费者进入条件变量队列下等待,当消费了一个数据,就可以唤醒一个生产者生产了
  • 当阻塞队列满时,生产者不可以向阻塞队列中生产数据,此时生产者进入条件变量队列下等待,当生产了一个数据,就可以唤醒一个消费者消费了

源码BlockQueue.hpp

#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <queue>

using namespace std;


class thread_data
{
public:
    string name;
    pthread_t tid;
};

static const int defaultcap=5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(int cap=defaultcap)
    :_maxcap(cap)
    {   
        //初始化锁和条件变量
        pthread_mutex_init(&mutex,nullptr);
        pthread_cond_init(&_c_cond,nullptr);
        pthread_cond_init(&_p_cond,nullptr);
    }


    void push(const T&data)
    {
        pthread_mutex_lock(&mutex);  //加锁要在判断是否能合法生产前,因为判断也是访问临界资源,并且判断语句不具有原子性
        while(_q.size()>=_maxcap)   //使用while,防止伪唤醒
        {
            pthread_cond_wait(&_p_cond,&mutex);   //当函数成功返回时,锁会被自动释放
        }

        _q.push(data);   //生产一个数据
        pthread_cond_signal(&_c_cond);   //唤醒一个消费者线程
        pthread_mutex_unlock(&mutex);   //解锁
    }

    T pop()
    {
        pthread_mutex_lock(&mutex);
        while(_q.size()<=0)   //同样要先判断是否能合法消费
        {
            pthread_cond_wait(&_c_cond,&mutex);
        }

        T data=_q.front();   //消费一个数据
        _q.pop();
        pthread_cond_signal(&_p_cond);   //唤醒一个生产者线程
        pthread_mutex_unlock(&mutex);   //解锁

        return data;
    }

    ~BlockQueue()
    {
        //销毁锁和条件变量
        pthread_mutex_destroy(&mutex);

        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }
private:
    queue<T> _q;   //阻塞队列
    int _maxcap;   //最大容量

    pthread_mutex_t mutex;  //锁

    pthread_cond_t _c_cond;  //消费者的条件变量
    pthread_cond_t _p_cond;  //生产者的条件变量
};

四.基于环形队列的生产者消费者模型

POSIX信号量

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值

 销毁信号量

int sem_destroy(sem_t *sem);

 等待信号量

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

 发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()

环形队列

  • 当环形队列为空或为满时,生产者和消费者才会相遇

  • 消费者不能超过生产者,生产者不能超过消费者一个圈 

  • 生产者关心的是还剩多少剩余空间,消费者关心的是现有多少数据

  • 生产者和消费者访问下标的行为互斥的,所以需要用到锁

源码RingQueue.hpp

#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <queue>
#include <semaphore.h>

using namespace std;

static const int defaultcap=5;

template<class T>
class RingQueue
{
public:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }

    void V(sem_t &sem)
    {
        sem_post(&sem);
    }

    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }

    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(int cap=defaultcap)
    :_cap(cap),_p_step(0),_c_step(0),Ringqueue(cap)
    {
        sem_init(&cdata_sem,0,0);    //初始有0个数据
        sem_init(&pspace_sem,0,_cap);   //初始有所有的空间

        pthread_mutex_init(&c_mutex,nullptr);
        pthread_mutex_init(&p_mutex,nullptr);
    }

    void push(const T&t)
    {
        P(pspace_sem);

        Lock(p_mutex);    //注意P操作在上锁前,因为P操作并不会访问下标,且P操作是原子的
        Ringqueue[_p_step]=t;
        _p_step++;
        _p_step%=_cap;
        Unlock(p_mutex);

        V(cdata_sem);
    }

    void  pop(T*out)
    {
        P(cdata_sem);

        Lock(c_mutex);
        *out=Ringqueue[_c_step];
        _c_step++;
        _c_step%=_cap;
        Unlock(c_mutex);

        V(pspace_sem);
    }

    ~RingQueue()
    {
        sem_destroy(&cdata_sem);
        sem_destroy(&pspace_sem);

        pthread_mutex_destroy(&c_mutex);
        pthread_mutex_destroy(&p_mutex);
    }
private:
    vector<T> Ringqueue;   //使用数组模拟循环队列

    int _cap;  //最大容量

    int _p_step;  //生产者下标
    int _c_step;  //消费者下标

    sem_t cdata_sem;   //现有资源信号量
    sem_t pspace_sem;  //剩余空间信号量

    pthread_mutex_t c_mutex;   //消费者的锁
    pthread_mutex_t p_mutex;   //生产者的锁
};

五.基于单例模式的线程池的简单实现

 其实线程池就是利用的生产者消费者模型。

线程池是什么

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。

线程池维护着多个线程,等待着监督管理者分配可并发执行的任务

避免了在处理短时间任务时创建与销毁线程的代价

线程池不仅能够保证内核的充分利用,还能防止过分调度。

可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量

线程池的应用场景

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

单例模式

单例模式就是类只能实例化一个对象。

这样可以节省空间。

什么是饿汉模式?什么是懒汉模式?

  • 吃完饭, 立刻洗碗, 这种就是饿汉方式,因为下一顿吃的时候可以立刻拿着碗就能吃饭.
  • 吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式;懒汉方式最核心的思想是 "延时加载",从而能够优化服务器的启动速度

饿汉模式实现单例模式


//单例模式的核心是只能实例化一个对象,所以要禁止生成拷贝构造和赋值重载,以防生成多余的对象
//为了只实例化一个对象,要把构造函数设置成私有,并设置一个专门的函数来获取单例对象
//所以这个函数不可以是普通的类内函数,因为要想调用普通的类内函数,必须要先有对象,但构造函数已经私有了,不可能再创建对象,所以要把这个函数设置成静态的
template <class T>
class Singleton 
{
public:
    static Singleton<T>* GetInstance() //获取单例对象
    {
        return &data;
    }
private:
    Singleton()   //构造函数私有

    Singleton(const Singleton&)=delete;   //禁止生成拷贝构造
    const Singleton& operator=(const Singleton&)=delete;   //禁止生成赋值重载

    ~Singleton()  //析构函数私有
private: 
    static Singleton<T> data;
};

懒汉模式实现单例模式

template <class T>
class Singleton 
{
public:
    static Singleton<T>* GetInstance() //获取单例对象
    {
        if(init==nullptr)
        {
            init=new Singleton<T>();
        }
        return init;
    }
private:
    Singleton()   //构造函数私有

    Singleton(const Singleton&)=delete;   //禁止生成拷贝构造
    const Singleton& operator=(const Singleton&)=delete;   //禁止生成赋值重载

    ~Singleton()  //析构函数私有
private: 
    static Singleton<T>* init;
};

但是其实这样存在一个严重的问题,就是线程不安全,所以要加锁。 

C++类内创建线程须知

C++的类内成员函数是默认传一个参数this指针的,这就不符合线程创建所需要的函数特征,即参数必须是:void*

所以在类内,我们把这个函数声明为 static ,但是声明成 static 就不能访问类内成员了,所以线程创建函数再传一个 this 指针过去

基于懒汉模式的简易线程池源码threadpool.hpp

#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <queue>

using namespace std;

static const int defaultnum = 5;

struct ThreadInfo
{
    pthread_t tid;
    string name;
};

template <class T>
class ThreadPool
{
public:
    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }

    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }

    void ThreadSleep(pthread_cond_t &cond, pthread_mutex_t &mutex)
    {
        pthread_cond_wait(&cond, &mutex);
    }

    void Wakeup(pthread_cond_t &cond)
    {
        pthread_cond_signal(&cond);
    }

    bool IsEmpty()
    {
        return _tasks.empty();
    }

public:
    static void *HandlerThread(void *args) // 在类内创建线程要声明成static,因为类内函数默认有一个参数this指针,这就不符合创建线程所需函数的特征(参数必须是void*)
    {
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        while (true)
        {
            tp->Lock(tp->mutex);
            while (tp->IsEmpty())
            {
                tp->ThreadSleep(tp->cond, tp->mutex);
            }

            T t = tp->pop();
            tp->Unlock(tp->mutex);
            cout << "执行任务..." << endl;
        }
    }

    void Start() // 启动线程池,即创建线程
    {
        for (int i = 0; i < _threads.size(); i++)
        {
            _threads[i].name = "thread-" + to_string(i + 1);
            pthread_create(&_threads[i].tid, nullptr, HandlerThread, this); // 声明成static就不能访问类内成员,所传个this指针过去
        }
    }

    T pop()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }

    void Push(const T &t) // 为线程派发任务
    {
        Lock(mutex);
        _tasks.push(t);
        Wakeup(cond);
        Unlock(mutex);
    }

    //基于懒汉模式的单例模式
    static ThreadPool<T> *GetInstace()  //获取单例对象
    {
        if (_tp == nullptr)   //因为只有第一次实例化对象时,_tp才为nullptr,后面每次获取单例对象都不为空了,此时加锁是没有必要的,为了保证效率,这里采用双层判断,如果_tp为空才加锁并实例化对象
        {
            pthread_mutex_lock(&_lock);
            if (_tp == nullptr)
            {
                _tp = new ThreadPool<T>();
                cout << "first create  Instance " << endl;
            }
            pthread_mutex_unlock(&_lock);
        }

        return _tp;
    }

private:
    ThreadPool(int num = defaultnum)   //构造函数私有
        : _threads(num)
    {
        pthread_mutex_init(&mutex, nullptr);
        pthread_cond_init(&cond, nullptr);
    }

    ThreadPool(const ThreadPool<T> &) = delete;   //禁止生成拷贝构造
    const ThreadPool<T> & operator=(const ThreadPool<T>&) = delete;  //禁止生成赋值重载

    ~ThreadPool()   //析构函数私有
    {
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
    }

private:
    vector<ThreadInfo> _threads;
    queue<T> _tasks; // 线程的任务

    pthread_mutex_t mutex;
    pthread_cond_t cond;

    static pthread_mutex_t _lock;   //为了实例单例对象时的线程安全

    static ThreadPool<T> *_tp;
};

//类内静态变量一般在类外初始化
template <class T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;   

template <class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;

 六.常见的锁

其实上面我们用到的都是互斥锁,还有一些其它常见的锁。

  • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
  • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
  • CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
  • 自旋锁:把频繁申请访问临界区的锁,称为自旋锁

上文使用的互斥锁。如果申请锁不成功,那就一直被阻塞挂起,直到锁被释放,这种适合,访问临界区时间长的场景

而自旋锁是即使你申请失败了,也不会被阻塞挂起,而是一直访问,这种适合访问临界区时间短的场景

linux中也有一批关于自旋锁的接口:

 用法都和互斥锁的类似。


🐬🤖本篇文章到此就结束了, 若有错误或是建议的话,欢迎小伙伴们指出;🕊️👻

😄😆希望小伙伴们能支持支持博主啊,你们的支持对我很重要哦;🥰🤩

😍😁谢谢你的阅读。😸😼

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=27ajwbqqwluss

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>