Linux 多线程

一.线程概念

线程是进程中的一条执行流程,是CPU调度运行程序的基本单元,linux线程的调度运行通过pcb来实现,因此linux下线程实际上就是一个pcb,而一个进程中又可以有多个pcb,并且共享进程大部分资源,因此线程也被称作轻量级进程
在这里插入图片描述

1.线程的优点

1.创建一个新线程的代价要比创建一个新进程小得多
2.与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
3.线程占用的资源要比进程少很多
4.能充分利用多处理器的可并行数量
5.在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
6.计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
7.I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。

2.线程的缺点

1.性能损失:
一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
2.健壮性降低:
编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
3.缺乏访问控制:
进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。
4.编程难度高:
编写与调试一个多线程程序比单线程程序困难得多

3.线程之间的独有与共享:

独有:栈,上下文数据,信号屏蔽字,errno,标识符......
共享:虚拟地址空间,IO信息,信号处理方式,工作路径...

4.多进程与多线程

共同特点:分摊压力,提高处理性能充分利用资源
多线程:线程间通信更加灵活,线程的创建与销毁成本更低,线程间的调度切换成本更低。
多进程:稳定健壮性强。

二.线程控制

线程的接口都是库函数(操作系统并没有提供线程操作接口),因此在上层封装实现了一套线程库,实现在用户态创建一个线程,但是linux下程序调度只能通过pcb来完成,因此也在内核创建一个新的pcb出来,这时候,我们上层创建的线程叫做用户态线程,而把内核的pcb叫做轻量级进程。
我们在链接时在后面加上 -lpthread

1.线程的创建

int pthread_create(pthread_t *pidpthread_attr_t attr,void(*thr_entry(void *),void *arg);
pthread_t * tid : 使传入的tid实参获取线程id. 后续通过这个tid操作线程.(线程的操作句柄)

pthread_attr_t * atrr : 用于设置线程属性,通常置NULL.

void *( *thr_entry)(void *) : 线程入口函数,线程要进行的函数.

void* arg: 传递给线程入口函数的参数(若要传多个参数, 可以组成一个结构体把结构体传入.)
返回值:成功返回0;失败返回一个非0值
源码实现:
在这里插入图片描述
在这里插入图片描述
这时候我们就发现在5552下面多了一个5553,这个5553就是我们新创建出来的一个进程
ps -efL查看进程

2.线程的终止

如何退出一个线程:
在线程入口函数中return,(main函数return退出的是进程
在任意位置调用void pthread_exit(void * retval)
在任意位置调用int pthread_cancel(pthread_t tid);
实现
在这里插入图片描述

在这里插入图片描述

3.线程的等待

等待一个指定进程的退出,获取这个线程的退出返回值,释放那个资源,默认情况下,如果一个线程退出,如果不等待也会造成资源泄露
int pthread_join(pthread_t tid,voidretval);**
tid:表示要等待哪个线程退出
retval:是一个void*空间的地址,用于接收线程返回值
实现:
在这里插入图片描述
在这里插入图片描述

默认情况下线程退出,为了保存自己的退出返回值,因此线程占用的资源在退出后也不会被完全释放,需要被其他线程等待
线程等待不仅仅是因为为了释放资源,避免资源泄露而等待,还有一种,必须等到某个或所有线程退出后再继续运行,还有一种,等到某个或所有线程退出后在继续运行
但是,当我们不关心一个线程的返回值的时候,又不需要等待线程退出才能往下运行,这时候等待会导致性能降低,在这种场景之下,等待就不合适了,但是不等待又会资源泄露
基于这个需求就有了线程分离

4.线程的分离

目的:将线程的分离属性设置为detach状态
在设置线程的时候,线程有很多属性,其中有一个叫做分离属性,分离属性默认值——JOINABLE,表示线程退出之后不会自动释放资源,需要等待。
如果将线程的分离属性设置为其他值-DETACH,这时候则线程退出后将不需要被等待,而是直接释放资源
因为线程一旦设置的了分离属性,则退出后自动释放资源,则等待将毫无意义,所以设置了分离属性的线程不能被等待
int pthread_detach(pthread_t tid);
实现:
在这里插入图片描述
一个进程,既不关心返回值,也不想等待,这种线程就适合被分离
等待与分离
等待:线程有一个默认属性,默认值是joinable,表示线程退出后不会自动释放资源,需要被等待,获取返回值之后释放资源。
分离:将线程的分离属性设置为detach,线程退出之后自动释放资源。
应用场景:创建线程以后,根据是否需要等待的需求而定,而实际使用中只会使用其中一个。

三.线程安全

概念

多线程之间对同一个临界资源的访问操作是安全的
多线程同时修改一个临界资源有可能会造成数据二义

实现

:如何实现线程操作是安全的
同步:通过一个条件判断实现对资源获取的合理操作
互斥:保证执行流在同一时间对临街资源的唯一访问
互斥的实现:互斥锁
同步的实现:条件变量,信号量

互斥锁

本质:就是一个0/1的计数器,主要用于标记资源的访问状态 0-不可访问 1-可访问
操作:加锁,解锁
加锁:将状态设置为不可访问状态
解锁:将状态置为可访问状态
一个执行流在访问资源之前,本质自己也是个临界资源(同一个资源所有线程在访问的时候必须加同一把锁)。
因此互斥锁必须先保证自己是安全的-互斥锁的操作是一个原子操作
加锁
if(lock_status==0)
{
//阻塞
}
lock_status=0
实现:
1.定义互斥锁变量
pthread_mutex_t mutex;
2.初始化互斥锁
int pthread_mutex_init(pthread_mutex_t *mutex,pthread_mutexattr_t *attr);
mutex:定义的互斥锁变量;
attr:互斥锁属性,通常置NULL
返回值:成功返回0;失败返回错误编号
3.加锁
int pthread_mutex_lock(pthread_mutex_t *mutex);-阻塞
int pthread_mutex_trylock(pthread_mutex_t *mutex);-非阻塞
4.解锁
int pthread_mutex_unlock(pthread_mutex_t mutex)
5.释放销毁
int pthread_mutex_destroy(pthread_mutex_t
mutex)
注意点:
1.使用过程中,加锁后,在任意有可能退出的位置都要解锁
2.锁只能保证安全操作,无法保证操作合理
实现:
黄牛抢票
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

死锁

概念:程序流程无法继续推进,卡死的情况叫做死锁
产生:由于对于锁资源的争抢不当锁导致
四个必要条件
1.互斥条件:我加的锁,别人不能继续加
2.不可剥夺条件:我加的锁别人不能解
3.请求与保持条件:加A锁后请求B锁,B锁请求不到不释放A
4.环路等待条件:加A请求B,对方加B请求A
1.加锁顺序不一致 2.阻塞加锁
预防
编写代码过程中破坏死锁产生的必要条件
1.加锁顺序保持一致
2.使用非阻塞加锁,加不了释放已有
避免
银行家算法

同步的实现:条件变量

本质:pcb队列+两个接口(使线程阻塞+唤醒线程)
注意:条件变量本身不做合理判断,什么时候阻塞/唤醒线程用户控制,条件变量是搭配互斥锁一起使用的。
接口
1.定义:pthread_cond_t cond
2.初始化:pthread_cond_int(pthread_cond_t cond,pthread_condattr_t attr)//属性通常置空
3.阻塞:pthread_cond_wait(pthread_cond_tcond,pthread_mutex_t mutex)
pthread_cond_timedwait:限制时长的阻塞等待
这个接口中涉及三个操作:解锁,阻塞,被唤醒后加锁
并且解锁是原子操作,一步完成不会被打断
4.唤醒:pthread_cond_signal(pthread_cond_t cond):将cond中的pcb队列中的线程至少唤醒一个
/pthread_cond_broadcast(pthread_cond_t cond):将cond的pcb队列中的线程全部唤醒

5.销毁:pthread_cond_destroy(pthread_cond_t cond)
信号只提供了使线程阻塞,和唤醒线程接口,至于什么时候阻塞,什么时候唤醒,条件变量本身并不关心,全部由用户自己控制
注意事项
1.条件的合理判断需要循环进行
2.多种角色要使用多个条件变量分开等待分开唤醒
实现

生产者与消费者模型

针对场景:大量数据产生与处理的场景
思想:将产生与处理两个模块分开,通过线程安全的缓冲区进行交互
好处:解耦合,支持忙闲不均,支持并发
实现:生产者与消费者模型无非就是两类执行流+线程安全的任务队列

#include <iostream>
#include <cstdio> //== #inculde <stdio.h>
#include <cstdlib> //==#include <stdlib.h>
#include <queue>
#include <mutex>
#include <pthread.h>

#define MAXQ 10
class BlockQueue
{
private:
    std::queue<int> _queue;
    int _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond_pro;
    pthread_cond_t _cond_con;
public:
    BlockQueue(int maxq = MAXQ) :_capacity(maxq) {
        pthread_mutex_init(&_mutex, NULL);
        pthread_cond_init(&_cond_pro, NULL);
        pthread_cond_init(&_cond_con, NULL);
    }
    ~BlockQueue() {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond_pro);
        pthread_cond_destroy(&_cond_con);
    }
    bool Push(int data) {
        pthread_mutex_lock(&_mutex);
        while (_queue.size() == _capacity) {
            pthread_cond_wait(&_cond_pro, &_mutex);
        }
        _queue.push(data);
        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_cond_con);
        return true;
    }
    bool Pop(int* data) {
        pthread_mutex_lock(&_mutex);
        while (_queue.empty() == true) {
            pthread_cond_wait(&_cond_con, &_mutex);
        }
        *data = _queue.front();
        _queue.pop();
        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_cond_pro);
        return true;
    }
};

std::mutex g_mutex;
void* consumer(void* arg)
{
    BlockQueue* q = (BlockQueue*)arg;
    int data;
    while (1) {
        q->Pop(&data);
        printf("%p- pop data:%dn", pthread_self(), data);
    }
    return NULL;
}
void* productor(void* arg)
{
    BlockQueue* q = (BlockQueue*)arg;
    int i = 0;
    while (1) {
        q->Push(i);
        printf("%p push data:%dn", pthread_self(), i++);
    }
    return NULL;
}
int main()
{
    pthread_t ctid[4], ptid[14];
    int ret;
    BlockQueue q;

    for (int i = 0; i < 4; i++) {
        ret = pthread_create(&ctid[i], NULL, consumer, (void*)&q);
        if (ret != 0) {
            printf("thread create errorn");
            return -1;
        }
    }
    for (int i = 0; i < 14; i++) {
        ret = pthread_create(&ptid[i], NULL, productor, (void*)&q);
        if (ret != 0) {
            printf("thread create errorn");
            return -1;
        }
    }
    for (int i = 0; i < 4; i++) {
        pthread_join(ctid[i], NULL);
        pthread_join(ptid[i], NULL);
    }
    return 0;
}

在这里插入图片描述

信号量(POSIX)

本质:就是一个计数器,用于实现进程或线程之间的同步与互斥
操作
P操作:计数-1,判断计数是否大于等于0,成立则返回否则阻塞
V操作:计数+1,唤醒一个阻塞的进程和线程
同步的实现
通过计数器对资源数量进行计数,获取资源之前进行P操作,产生资源之后进行V操作。通过这种方式实现对资源的合理获取
互斥的实现
计数器初始值为1(资源只有一个),访问资源前进行P操作,访问完毕进行V操作,实现类似于加锁和解锁的操作
接口流程认识:
1.定义sem_t sem
2.初始化int sem_init(sem_t*sem,int pshared,int value)
sem:定义的信号量;
pshared: 0-线程间; 1-进程间
value:信号量初值–有多少资源初值就设置多少
3.P操作
int sem_wait(sem_t *sem);—阻塞
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t sem,struct timespec);
4.V操作
int sem_post(sem_t *sem);
5.销毁
int sem_destroy(sem_t *sem)
使用信号量实现一个生产者与消费者模型:
实现线程安全的数据队列

#include <iostream>
#include <cstdlib>
#include <vector>
#include <pthread.h>
#include <semaphore.h>

#define MAXQ 5

class RingQueue{
    private:
        std::vector<int> _arry;
        int _capacity;
        int _write_step;
        int _read_step;
        sem_t _sem_idle;//
        sem_t _sem_data;
        sem_t _sem_lock;
    public:
        RingQueue(int maxq = MAXQ):_arry(maxq),
        _capacity(maxq), _write_step(0), _read_step(0){
            //sem_init(信号量,1进程/0线程,初值)
            sem_init(&_sem_idle, 0, maxq);
            sem_init(&_sem_data, 0, 0);//初始没有数据
            sem_init(&_sem_lock, 0, 1);
        }
        ~RingQueue(){
            sem_destroy(&_sem_idle);
            sem_destroy(&_sem_data);
            sem_destroy(&_sem_lock);
        }
        bool Push(const int data){
            sem_wait(&_sem_idle);//空闲空间-1
            sem_wait(&_sem_lock);//加锁
            _arry[_write_step] = data;
            _write_step = (_write_step+1)%_capacity;
            sem_post(&_sem_lock);//解锁
            sem_post(&_sem_data);//数字资源+1
            return true;
        }
        bool Pop(int *data){
            sem_wait(&_sem_data);//数据资源-1
            sem_wait(&_sem_lock);//加锁
            *data = _arry[_read_step];
            _read_step = (_read_step+1)%_capacity;
            sem_post(&_sem_lock);//解锁
            sem_post(&_sem_idle);//空闲空间+1
            return true;
        }
};
void *consumer(void *arg)
{
    RingQueue *q = (RingQueue*)arg;
    int data;
    while(1) {
        q->Pop(&data);
        printf("%p- pop data:%dn", pthread_self(), data);
    }
    return NULL;
}
void *productor(void *arg)
{
    RingQueue *q = (RingQueue*)arg;
    int i = 0;
    while(1) {
        q->Push(i);
        printf("%p push data:%dn", pthread_self(), i++);
    }
    return NULL;
}
int main()
{
    pthread_t ctid[4], ptid[14];
    int ret;
    RingQueue q;

    for (int i = 0; i < 4; i++) {
        ret = pthread_create(&ctid[i], NULL, consumer, (void*)&q);
        if (ret != 0) {
            printf("thread create errorn");
            return -1;
        }
    }
    for (int i = 0; i < 4; i++) {
        ret = pthread_create(&ptid[i], NULL, productor, (void*)&q);
        if (ret != 0) {
            printf("thread create errorn");
            return -1;
        }
    }
    for (int i = 0; i < 4; i++) {
        pthread_join(ctid[i], NULL);
        pthread_join(ptid[i], NULL);
    }
    return 0;
}

在这里插入图片描述

读写锁 锁的种类

应用场景:读者写者模型
读者写者模型:读共享,写互斥的场景(用互斥锁,串行化效率低)
读写锁:
加读锁:当前只要没有被加写锁
加写锁:既没有读,也没有写的时候才能加写锁
实现:用两个计数器–读者计数+写者计数
当加锁不成功时,则要阻塞进程/线程
读写锁的阻塞是通过自旋锁来实现的
自旋锁:
一直占用CPU不释放,循环进行条件判断(即时性更强)
适用场景:适用于等待时间确定较短的场景
悲观锁:总是认为访问期间会有冲突,因此总是加锁保护
乐观锁:总是认为访问期间大概率没有冲突(CAS锁)
可重入锁:同一个进程可以重复加锁
不可重入锁:同一个线程不能重复加锁,一个锁只能加一次

四.线程应用

线程池的简单实现

线程池:说白了就是一堆(一个或多个)线程进行任务处理
主要针对:有大量人物需要处理的场景
使用多个执行流可以提高处理效率
若一个任务到来就创建一个线程进行处理,处理完毕后销毁有很大缺陷:
1.成本:一个任务处理的成本
线程创建时间+任务处理时间+线程销毁时间=总耗时
若任务处理时间很短,则大量时间呗线程的创建于销毁消耗了
2.风险:若线程无限制,则在峰值压力下会有资源耗尽系统崩溃风险
思想:线程池其实就是一堆创建好的线程+任务队列
有任务来了抛入线程池中,分配一个线程进行处理
1.节省了任务处理过程中线程的创建与销毁成本
2.线程池中的线程与任务节点数量都有最大限制,避免资源耗尽风险
实现:提前创建好规定适量的线程+线程安全任务队列

#include <iostream>
#include <cstdio> //== #inculde <stdio.h>
#include <cstdlib> //==#include <stdlib.h>
#include <queue>
#include <mutex>
#include <unistd.h>
#include <pthread.h>

#define MAXQ 10
#define MAX_THREAD 5

typedef void (*handler_t)(int data);

class ThreadTask{
    private:
        int _data;//要处理的数据
        handler_t _handler;//处理数据的函数
    public:
        ThreadTask() {}
        ThreadTask(int data, handler_t handler):_data(data),
            _handler(handler){}
        void Run(){
            _handler(_data);
        }
};

class BlockQueue
{
    private:
        std::queue<ThreadTask> _queue;
        int _capacity;
        pthread_mutex_t _mutex;
        pthread_cond_t _cond_pro;
        pthread_cond_t _cond_con;
    public:
        BlockQueue(int maxq = MAXQ):_capacity(maxq){
            pthread_mutex_init(&_mutex, NULL);
            pthread_cond_init(&_cond_pro, NULL);
            pthread_cond_init(&_cond_con, NULL);
        }
        ~BlockQueue() {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_cond_pro);
            pthread_cond_destroy(&_cond_con);
        }
        bool Push(const ThreadTask &data){
            pthread_mutex_lock(&_mutex);
            while(_queue.size() == _capacity) {
                pthread_cond_wait(&_cond_pro, &_mutex);
            }
            _queue.push(data);
            pthread_mutex_unlock(&_mutex);
            pthread_cond_signal(&_cond_con);
            return true;
        }
        bool Pop(ThreadTask *data) {
            pthread_mutex_lock(&_mutex);
            while(_queue.empty() == true) {
                pthread_cond_wait(&_cond_con, &_mutex);
            }
            *data = _queue.front();
            _queue.pop();
            pthread_mutex_unlock(&_mutex);
            pthread_cond_signal(&_cond_pro);
            return true;
        }
};


class Threadpool{
    private:
        int _max_thread;//线程最大数量
        int _max_queue;//任务队列中节点最大数量
        BlockQueue _queue;
    public:
        Threadpool(int max_thr=MAX_THREAD, int max_q=MAXQ):
            _max_thread(max_thr),
            _max_queue(max_q), _queue(max_q){
            pthread_t tid;
            int ret;
            for (int i = 0; i < max_thr; i++) {
                ret = pthread_create(&tid, NULL, thr_entry, this);
                if (ret != 0) {
                    printf("thread create errorn");
                    exit(-1);
                }
                pthread_detach(tid);
            }
        }
        static void *thr_entry(void *arg){
            Threadpool *pool = (Threadpool*)arg;
            while(1) {
                ThreadTask task;
                pool->_queue.Pop(&task);
                task.Run();
            }
        }
        bool TaskPush(const ThreadTask &task) {
            _queue.Push(task);
        }
};

void test(int data)
{
    printf("%p-get data:%dn", pthread_self(), data);
    sleep((data%5)+1);
}
int main()
{
    Threadpool pool;

    for (int i = 0; i < 10; i++) {
        ThreadTask task(i, test);
        pool.TaskPush(task);
    }
    while(1) sleep(1);
    return 0;
}

在这里插入图片描述

单例模式

线程安全的单例模式:设计模式
针对场景:一个类只能实例化一个对象,提供一个访问接口(一个资源在内存中只能有一份);
目的
1.节省内存空间
2.防止数据二义混淆(多个数据在同一时刻的不同体现)
实现:
饿汉:资源提前全部加载初始化完毕,用的时候直接用(以空间换时间思想)
1.构造函数私有化,无法在类外实例化对象
2.成员变量(资源数据)静态化,资源单独一份共享,运行前初始化,初始化过程不用考虑线程安全问题。
在这里插入图片描述

懒汉:资源用的时候在去加载初始化,不用就不需要(延迟加载的思想)
1.构造函数私有化
2.定义静态指针成员(为了在用的时候申请加载)
3.在访问接口中加锁保护资源初始化加载过程
4.在加锁之外进行二次检测,提高效率
5.防止编译器过度优化,使用volatile修饰指针成员变量

在这里插入图片描述

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇

)">
下一篇>>