UE4 多线程源码浅析(2——AsyncTask)

本文章只是我个人在学习虚幻引擎过程中的一些理解,不一定正确,若有说的不对的地方,欢迎指正。

上篇我们讲解了一下FRunnable的源码,了解了一下它的基本架构和一些实现。本篇我们将介绍接下来的异步任务系统(AsyncTask),在《UE4 多线程的使用》中我们了解了异步任务系统的使用方法,似乎没看到任何有关FRunnable的字眼,那么它和FRunnable有关吗?事实上,有的,不过这需要我们进入到它的底层去看才能找到。

在正式介绍AsyncTask之前,我们需要先介绍虚幻线程池(ThreadPool),因为异步任务系统是基于虚幻线程池搭建的。

一.ThreadPool:

学习虚幻ThreadPool系统,我们需要先了解三个概念——线程池(FQueuedThreadPool、FQueuedThreadPoolBase)、线程池中的线程(FQueuedThread)和任务(IQueuedWork).

1.代码框架:

a.线程池:

首先是线程池的基类——FQueuedThreadPool,它定义在QueuedThreadPool.h文件中(具体路径在下方),该类源码如下:

class CORE_API FQueuedThreadPool
{
public:

	virtual bool Create(uint32 InNumQueuedThreads, uint32 StackSize = (32 * 1024), EThreadPriority ThreadPriority = TPri_Normal) = 0;

	virtual void Destroy() = 0;

	virtual void AddQueuedWork(IQueuedWork* InQueuedWork) = 0;

	virtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) = 0;

	virtual int32 GetNumThreads() const = 0;

	//……

public:

	static FQueuedThreadPool* Allocate();

	static uint32 OverrideStackSize;
};

该类中只有一个变量:OverrideStackSize,表示线程的堆栈大小,如果为0表示使用创建线程时使用的Create方法中传递的值,表示定义了一些线程池通用的接口:

Create——创建具有指定数量线程的线程池;

Destroy——线程池清理所有后台线程;

AddQueuedWork——添加任务到线程池,如果有空闲的线程就把任务给它执行,没有就放入等待队列;

RetractQueuedWork——尝试撤销在等待队列里等待的任务;

GetNumThreads——获取线程池中线程的数量;

Allocate——分配线程池;

注:EngineSourceRuntimeCorePublicMiscQueuedThreadPool.h

FQueuedThreadPool定义的下方,定义了几个引擎使用的全局线程池:

extern CORE_API FQueuedThreadPool* GThreadPool;

extern CORE_API FQueuedThreadPool* GIOThreadPool;

extern CORE_API FQueuedThreadPool* GBackgroundPriorityThreadPool;

#if WITH_EDITOR
extern CORE_API FQueuedThreadPool* GLargeThreadPool;
#endif

其中GLargeThreadPool在编辑器模式下才会生成。

再来是FQueuedThreadPool子类FQueuedThreadPoolBase

class FQueuedThreadPoolBase : public FQueuedThreadPool
{
protected:

	TArray<IQueuedWork*> QueuedWork;

	TArray<FQueuedThread*> QueuedThreads;

	TArray<FQueuedThread*> AllThreads;

	FCriticalSection* SynchQueue;

	bool TimeToDie;

public:

	//……

	virtual bool Create(uint32 InNumQueuedThreads, uint32 StackSize = (32 * 1024), EThreadPriority ThreadPriority = TPri_Normal) override
	{
		//……
	}

	virtual void Destroy() override
	{
		//……
	}

	int32 GetNumQueuedJobs() const
	{
		return QueuedWork.Num();
	}
	virtual int32 GetNumThreads() const
	{
		return AllThreads.Num();
	}
	void AddQueuedWork(IQueuedWork* InQueuedWork) override
	{
		//……
	}

	virtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) override
	{
		//……
	}

	IQueuedWork* ReturnToPoolOrGetNextJob(FQueuedThread * InQueuedThread)
	{
		//……
	}
};

类中定义了线程池的必要数据结构:

QueuedWork——等待的任务队列;

QueuedThreads——正在执行任务的线程;

AllThreads——线程吃中所有的线程;

SynchQueue——用于保护等待任务队列的同步对象;

TimeToDie——标志是否已经销毁;

接口方面大多都是实现了一些父类的接口,新加入的ReturnToPoolOrGetNextJob用来在线程结束此次任务的时候查看任务队列是否还有任务,若有则取出其中一个任务执行,没有就将线程翻入到闲置队列,实现细节我们在后面的小节介绍。

下面附一张图片:
在这里插入图片描述

b.任务:

线程池的任务(IQueuedWork)其实是个接口类,写了几个需要我们继承实现的接口,源码如下:

class IQueuedWork
{
public:

	virtual void DoThreadedWork() = 0;

	virtual void Abandon() = 0;

public:

	virtual ~IQueuedWork() { }
};

DoThreadedWork——真正完成任务的地方;

Abandon——用来放弃队列中的任务,以便它可以根据需要对每个对象进行清理。只有当任务在完成之前被放弃才会调用这个接口(注意:这要求对象使用它被分配到的堆来删除自己);

c.线程池中的线程

前面我说异步任务系统和FRunnable有关,而异步任务系统基于线程池,那么说了这么久FRunnable在哪呢?哪呢?现在来揭晓答案——线程池中的线程(FQueuedThread)。

老规矩,先贴源码:

class FQueuedThread
	: public FRunnable
{
protected:

	FEvent* DoWorkEvent = nullptr;

	TAtomic<bool> TimeToDie{ false };

	IQueuedWork* volatile QueuedWork = nullptr;

	class FQueuedThreadPoolBase* OwningThreadPool = nullptr;

	FRunnableThread* Thread = nullptr;

	virtual uint32 Run() override;

public:

	FQueuedThread() = default;

	virtual bool Create(class FQueuedThreadPoolBase* InPool, uint32 InStackSize = 0, EThreadPriority ThreadPriority = TPri_Normal)
	{
		//……
	}

	bool KillThread()
	{
		//……
	}

	void DoWork(IQueuedWork* InQueuedWork)
	{
		//……
	}
};

成员变量如下:

DoWorkEvent——这个时间用来通知线程有任务要做;

TimeToDie——标志线程时候应该退出;

QueuedWork——指向该线程要做的任务;

OwningThreadPool——指向该线程所属的线程池;

Thread——注意这是一个FRunnableThread,很熟悉吧,就是之前讲过的“真正的线程”;

成员函数如下:

Run——这个就是我们重写的FRunnable的Run函数;

Create——创建具有指定堆栈大小的线程,并创建能够与之通信的各种事件;

KillThread——退出中止线程;

DoWork——通知线程有任务可以做;

2.线程池的初始化:

在引擎启动的时候,虚幻的线程池会在FEngineLoop::PreInitPreStartupScreen中初始化,简略代码如下:

注:引擎的启动流程等我后面写文章再仔细讲讲
注:源码文件路径:EngineSourceRuntimeLaunchPrivateLaunchEngineLoop.cpp

{
	//……
	GThreadPool = FQueuedThreadPool::Allocate();
	int32 NumThreadsInThreadPool = FPlatformMisc::NumberOfWorkerThreadsToSpawn();

	if (FPlatformProperties::IsServerOnly())
	{
		NumThreadsInThreadPool = 1;
	}
	verify(GThreadPool->Create(NumThreadsInThreadPool, StackSize * 1024, TPri_SlightlyBelowNormal));
}

{
	//……
	GBackgroundPriorityThreadPool = FQueuedThreadPool::Allocate();
	int32 NumThreadsInThreadPool = 2;
	if (FPlatformProperties::IsServerOnly())
	{
		NumThreadsInThreadPool = 1;
	}

	verify(GBackgroundPriorityThreadPool->Create(NumThreadsInThreadPool, StackSize * 1024, TPri_Lowest));
}

if (FPlatformProcess::SupportsMultithreading())
{
	{
		//……
		GIOThreadPool = FQueuedThreadPool::Allocate();
		int32 NumThreadsInThreadPool = FPlatformMisc::NumberOfIOWorkerThreadsToSpawn();
		if (FPlatformProperties::IsServerOnly())
		{
			NumThreadsInThreadPool = 2;
		}
		verify(GIOThreadPool->Create(NumThreadsInThreadPool, 96 * 1024, TPri_AboveNormal));
	}
}

函数中前后三段代码分别初始化了GThreadPool 、GBackgroundPriorityThreadPool 、GIOThreadPool 。三个线程池实例都调用FQueuedThreadPool::Allocate创建。

其中GThreadPool中的线程数量由FPlatformMisc::NumberOfWorkerThreadsToSpawn来确定。GBackgroundPriorityThreadPool中的线程数客户端为2,服务端为1。GIOThreadPool的线程数量由FPlatformMisc::NumberOfIOWorkerThreadsToSpawn来确定,默认返回4,因此客户端为4,服务端为2。

3.实现细节:

在这一小节,我们会从线程池的创建,添加任务到线程池以及线程池中的线程是如何获取任务等几个方面了解虚幻线程池的运转。

a.线程池的创建:

在前文我们知道线程池的创建是通过调用FQueuedThreadPool::Allocate,源码如下:

FQueuedThreadPool* FQueuedThreadPool::Allocate()
{
	return new FQueuedThreadPoolBase;
}

代码很简单,只是new一个FQueuedThreadPoolBase对象实例返回去。接下来调用FQueuedThreadPool::Create初始化线程池指定数量的的线程,核心源码:

virtual bool Create(uint32 InNumQueuedThreads, uint32 StackSize = (32 * 1024), EThreadPriority ThreadPriority = TPri_Normal) override
{
	bool bWasSuccessful = true;

	//……

	for (uint32 Count = 0; Count < InNumQueuedThreads && bWasSuccessful == true; Count++)
	{
		FQueuedThread* pThread = new FQueuedThread();
		if (pThread->Create(this, StackSize, ThreadPriority) == true)
		{
			QueuedThreads.Add(pThread);
			AllThreads.Add(pThread);
		}
		else
		{
			bWasSuccessful = false;
			delete pThread;
		}
	}

	if (bWasSuccessful == false)
	{
		Destroy();
	}
	return bWasSuccessful;
}

bWasSuccessful用来指示本次初始化是否成功,函数中创建传入数量(InNumQueuedThreads)的线程。还记得FQueuedThread吗,就是FRunnable的子类,每次循环都new一个FQueuedThread对象,调用它的Create接口,如果线程创建成功就加入线程列表(QueuedThreads、AllThreads),失败就把bWasSuccessful置为falsedelete该线程,出循环后会判断bWasSuccessful是否为false,是的话就调用Destroy,最后返回bWasSuccessful

FQueuedThread::Create源码如下:

virtual bool Create(class FQueuedThreadPoolBase* InPool, uint32 InStackSize = 0, EThreadPriority ThreadPriority = TPri_Normal)
{
	static int32 PoolThreadIndex = 0;
	const FString PoolThreadName = FString::Printf(TEXT("PoolThread %d"), PoolThreadIndex);
	PoolThreadIndex++;

	OwningThreadPool = InPool;
	DoWorkEvent = FPlatformProcess::GetSynchEventFromPool();
	Thread = FRunnableThread::Create(this, *PoolThreadName, InStackSize, ThreadPriority, FPlatformAffinity::GetPoolThreadMask());
	check(Thread);
	return true;
}

该函数先生成一个线程名(PoolThreadName ),设置所属的线程池,从事件池获取一个事件用来做线程同步。然后调用我们熟悉的FRunnableThread::Create。

b.添加任务:

想要添加任务到线程池,我们需要调用到线程池提供的AddQueuedWork,这个接口可以说是虚幻线程池最重要的接口了,先看看它的实现:

void AddQueuedWork(IQueuedWork* InQueuedWork) override
{
	check(InQueuedWork != nullptr);

	if (TimeToDie)
	{
		InQueuedWork->Abandon();
		return;
	}

	check(SynchQueue);

	FQueuedThread* Thread = nullptr;

	{
		FScopeLock sl(SynchQueue);
		const int32 AvailableThreadCount = QueuedThreads.Num();
		if (AvailableThreadCount == 0)
		{
			QueuedWork.Add(InQueuedWork);
			return;
		}

		const int32 ThreadIndex = AvailableThreadCount - 1;

		Thread = QueuedThreads[ThreadIndex];
		QueuedThreads.RemoveAt(ThreadIndex, 1, /* do not allow shrinking */ false);
	}

	Thread->DoWork(InQueuedWork);
}

函数先检查一下是否需要放弃任务,不放弃就使用一个作用域锁保证同时只有一个线程对线程池进行操作,然后查看是否有空闲的线程,若没有就先把任务添加到等待队列然后返回,若有就取出最后一个线程(为什么是最后一个下面会解释),通过调用DoWork来通知该线程执行传入的任务(InQueuedWork)。

注:我们从数组的最后面选择一个线程,该线程是最近使用的线程,因此最有可能拥有堆栈的“热”缓存等(类似于Windows IOCP调度策略)。从最后面取可能性能更高,因为不需要移动内存

最后是DoWork

void DoWork(IQueuedWork* InQueuedWork)
{
	//……

	QueuedWork = InQueuedWork;
	FPlatformMisc::MemoryBarrier();

	DoWorkEvent->Trigger();
}

保存一下任务,唤醒一下事件,执行任务,再进去就是调用windows平台的接口了。

c.线程获取任务:

需要世道线程获取任务的方式,我们只需要了解一下线程池的这个接口——ReturnToPoolOrGetNextJob

IQueuedWork* ReturnToPoolOrGetNextJob(FQueuedThread* InQueuedThread)
{
	//……
	IQueuedWork * Work = nullptr;
	
	//……

	if (QueuedWork.Num() > 0)
	{
		Work = QueuedWork[0];
		QueuedWork.RemoveAt(0, 1, /* do not allow shrinking */ false);
	}
	if (!Work)
	{
		QueuedThreads.Add(InQueuedThread);
	}
	return Work;
}

在本次任务结束的时候,线程会调用一下这个接口,判断任务队列是否还有任务,如果有就获取最早的任务然后从队列中删除它。如果没有工作要做,就把线程添加回空闲队列。

注:获取最早的任务可以防止线程池产生饥饿现象,饥饿现象指任务一直在队列中排队并且永远不会完成

至此,虚幻线程池部分介绍完毕,我们花费了大量笔墨,介绍了虚幻线程池的几个重要概念,还有它的运行原理以及引擎在何时初始化线程池,我觉得了解异步任务系统前先了解虚幻线程池机制是很有好处的。话不多说,下面我们就进入本章的主题——AsyncTask

二.AsyncTask:

1.异步任务:

在**《UE4 多线程的使用》一文中我们了解到,异步任务的启动离不开FAsyncTaskFAutoDeleteAsyncTask**,这两类都是IQueuedWork的子类,我们逐个来看,先是FAutoDeleteAsyncTask

template<typename TTask>
class FAutoDeleteAsyncTask
	: private IQueuedWork
{
	TTask Task;
	
	//……

	void Start(bool bForceSynchronous, FQueuedThreadPool* InQueuedPool)
	{
		//……
	}

	void DoWork()
	{
		//……
	}

	virtual void DoThreadedWork()
	{
		DoWork();
	}

	virtual void Abandon(void)
	{
		//……
	}

public:
	//……

	void StartSynchronousTask()
	{
		Start(true, GThreadPool);
	}

	void StartBackgroundTask(FQueuedThreadPool* InQueuedPool = GThreadPool)
	{
		Start(false, InQueuedPool);
	}
};

主要的变量就有一个Task它是用户传过来的任务实例,剩下的都是接口:

Start——通用的开始执行任务的函数,只提供给内部调用;

DoWork——通知线程执行任务,会调用任务的DoWork接口;

DoThreadedWork——只是DoWork的一层封装;

Abandon——尝试放弃此次任务;

StartSynchronousTask——取GThreadPool中的线程并开始执行任务,调用Start

StartBackgroundTask——取传入的线程池(InQueuedPool)中的线程并开始执行任务,调用Start

下面是FAsyncTask的源码,省略了一些和FAutoDeleteAsyncTask一样的代码:

template<typename TTask>
class FAsyncTask
	: private IQueuedWork
{
	//……

	FThreadSafeCounter	WorkNotFinishedCounter;
	FEvent* DoneEvent;
	FQueuedThreadPool* QueuedPool;
	
	//……

	void DestroyEvent()
	{
		//……
	}

	//……

	void FinishThreadedWork()
	{
		//……
	}

	virtual void DoThreadedWork() override
	{
		//……
	}

	//……

	void SyncCompletion()
	{
		//……
	}

	//……

public:
	//……

	~FAsyncTask()
	{
		//……
	}

	//……

	void EnsureCompletion(bool bDoWorkOnThisThreadIfNotStarted = true)
	{
		//……
	}

	bool Cancel()
	{
		//……
	}

	bool WaitCompletionWithTimeout(float TimeLimitSeconds)
	{
		//……
	}

	bool IsDone()
	{
		//……
	}

	//……
};

变量如下:

WorkNotFinishedCounter——线程安全计数器;

DoneEvent——同步事件;

QueuedPool——指向该任务使用的线程池;

接口如下:

DestroyEvent——是一个内部函数,用来销毁该任务;

FinishThreadedWork——任务完成时被调用;

DoThreadedWork——执行任务,是DoWork的一层封装,相比FAutoDeleteAsyncTask多调用了FinishThreadedWork

SyncCompletion——内部函数,任务完成是触发;

~FAsyncTask——析构函数,会调用DestroyEvent自动删除任务;

EnsureCompletion——等待任务完成,若任务完成后我们想要执行一些东西可以用这个接口触发;

Cancel——取消任务;

WaitCompletionWithTimeout——等待直到任务被完成;

IsDone——查看任务是否完成,若完成返回true,未完成返回false

2.实现细节:

从使用篇我们知道,我们new了一个异步任务的实例,并使用StartBackgroundTaskStartSynchronousTask执行异步任务。

void StartBackgroundTask(FQueuedThreadPool* InQueuedPool = GThreadPool)
{
	Start(false, InQueuedPool);
}

void StartSynchronousTask()
{
	Start(true, GThreadPool);
}

void Start(bool bForceSynchronous, FQueuedThreadPool* InQueuedPool)
{
	//……

	FQueuedThreadPool * QueuedPool = InQueuedPool;
	if (bForceSynchronous)
	{
		QueuedPool = 0;
	}
	if (QueuedPool)
	{
		QueuedPool->AddQueuedWork(this);
	}
	else
	{
		DoWork();
	}
}

两个启动接口调用的都是Start,区别只是传入不同的线程池而已。Start接口先保存传入的线程池,根据是否需要在当前线程上执行异步任务(bForceSynchronous),来选择性的置空线程池变量(QueuedPool),最后在判断是否需要直接调用DoWork,不需要就调用线程池(QueuedPool)的AddQueuedWork

最后附图片一张:
在这里插入图片描述

分割线右边就是本章讲的异步任务和线程池了。

三.小结:

写到这里,本章总算是结束了,我也没想到介绍一些异步任务和线程池需要写这么多,本篇可以说是我写过最长的一章了。下篇我们将介绍虚幻多线程的最后一部分——TaskGraph。感谢观看,我们下一篇再见。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>