引言

异步编程,历史太深,只能非常浅的讲述一些理解

比如以下有趣的代码,Start方法会在await处”被暂停”,但不会阻塞线程,直到我们点击一次Button,才会继续执行

(当然这既不是原生C#,也不是原生Unity,需要我们额外扩展一些方法)

public class Test : MonoBehaviour
{
    public Button btn;
        
    async void Start()
    {
        Debug.Log("Before Click Button...");
        await btn;
        Debug.Log("After Click Button...");
    }

    private void Update()
    {
        if (Input.GetMouseButtonUp(0)) 
        {
            Debug.Log("On Mouse Button Up");
        }
    }
}

再比如另一个常见的需求,打开一个确定/取消弹窗,然后根据结果进行不同的行为,不过一般来说,我们都是通过注册回调函数进行的,但现在我们可以这样

public class Test : MonoBehaviour
{
    public TestWnd wnd;
    public Button openWnd;
    
    private void Start()
    {
        openWnd.onClick.AddListener(Open);
    }

    private async void Open()
    {
        var flag = await OpenWnd();
        Debug.Log(flag ? "确定" : "取消");
    }
}

想要彻底了解这个有趣的机制,需要从很远的地方开始说起

ThreadPool

ThreadPool(线程池)是对于Thread的封装,关于Thread我用的不多,感觉API非常厚重,仅从实际体验来说被协程吊起来打,就不再赘述了,原生的Thread的一个明显缺点是线程不可重用,每次使用时都需要创建,代价很大

而线程池不一样,.NET在初始化的时候,会为每一个进程都创建一个线程池,此后的操作都会在这个池子里进行,而且简化了很多API,使用起来非常简单

ThreadPool.QueueUserWorkItem

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine(1);
        
        // 把这个匿名函数,成为工作项
        ThreadPool.QueueUserWorkItem(e =>
        {
            Console.WriteLine($"当前线程:{Thread.CurrentThread.ManagedThreadId}");
            Console.WriteLine(2);
            Thread.Sleep(1000);
            Console.WriteLine(3);
        });
        
        Console.WriteLine($"当前线程:{Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine(4);
        Console.ReadKey();
    }
}

image-20230303132209857

原理

线程池内部,主要有两块:一个全局队列多个工作线程

image-20230303132837106

当我们调用ThreadPool.QueueUserWorkItem时,会把工作项添加到全局队列中,每一个工作线程都处在一个while(true)循中,会不断的从队列里取工作项执行

每一个工作线程,都是一条独立的线程,但所有的工作线程,都需要访问全局队列,那很明显了,全局队列是一个多线程共享资源,肯定是需要加锁的,那会有一个问题,如果有一些工作项,很小,执行的很快,那会导致工作线程频繁访问全局队列,加锁解锁

因此,实际上,每一个工作线程,内部还包含一个局部队列

image-20230303133011033

ThreadPool.QueueUserWorkItem有一个重载方法:

ThreadPool.QueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal)

注意最后这个bool参数,当调用不带preferLocalQueueUserWorkItem方法时,工作项会默认被放到全局队列中,等待某一个工作线程捡走他

preferLocalture时,如果调用线程是线程池内的某一个工作线程,那么会把工作项放到工作线程局部队列中,否则也会丢到全局队列里去

image-20230303133458667

WorkItem

工作项,是线程池最基本的调度单元,底层类型是object,实际任务类型一般分为两类

IThreadPoolWorkItem

继承此接口的类型

public interface IThreadPoolWorkItem
{
    void Execute();
}

例如通过 ThreadPool.QueueUserWorkItem(WaitCallback callBack) 传入的 callBack 委托实例会被包装到一个QueueUserWorkItemCallback 实例里。QueueUserWorkItemCallbackIThreadPoolWorkItem 的实现类

Task

执行 InnerInvoke 会执行 Task 所包含的委托

class Task
{
    internal void InnerInvoke();
}

全局队列

全局队列由 ThreadPoolWorkQueue 维护的,它是整个系统的入口,直接被ThreadPool引用。

public static class ThreadPool
{
    internal static readonly ThreadPoolWorkQueue s_workQueue = new ThreadPoolWorkQueue();

    public static bool QueueUserWorkItem(WaitCallback callBack, object state)
    {
        object tpcallBack = new QueueUserWorkItemCallback(callBack!, state);

        s_workQueue.Enqueue(tpcallBack, forceGlobal: true);

        return true;
    }
}

internal sealed class ThreadPoolWorkQueue
{
    // 全局队列
    internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();

    // forceGlobal 为 true 时,push 到全局队列,否则就放到本地队列
    public void Enqueue(object callback, bool forceGlobal);
}

本地队列

线程池中的每一个线程都会绑定一个 ThreadPoolWorkQueueThreadLocals 实例,在 workStealingQueue 这个字段上保存着本地队列

internal sealed class ThreadPoolWorkQueueThreadLocals
{
    // 绑定在线程池
    [ThreadStatic]
    public static ThreadPoolWorkQueueThreadLocals threadLocals;
    // 持有全局队列的引用,以便能在需要的时候将任务转移到全局队列上
    public readonly ThreadPoolWorkQueue workQueue;
}

任务偷窃机制

在上面的本地队列结构中,其实还没有看到真正管理本地队列的地方,其实是在这里

internal sealed class ThreadPoolWorkQueueThreadLocals
{
    // 偷窃队列,本地队列的真正维护者
    public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
    public readonly Thread currentThread;

    public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
    {
    	...
        // WorkStealingQueueList集中管理所有的偷窃队列
        ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue);
    }

    // 提供将本地队列中的任务转移到全局队列中去的功能
    public void TransferLocalWork()
    {
        while (workStealingQueue.LocalPop() is object cb)
        {
            workQueue.Enqueue(cb, forceGlobal: true);
        }
    }

    ~ThreadPoolWorkQueueThreadLocals()
    {
        ...
        ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
    }
}

根据代码可以知道,每个任务线程的本地队列,由各自的workStealingQueue统一管理,而每一个workStealingQueue,又都会在构造的时候被添加到线程池统一的列表内

换句话说,工作线程A中的工作项,实际上对于其他所有工作线程,都是可见的,这只不过是一个嵌套列表

这导致的好处就是,实际上,工作线程不仅可以从全局队列中获取任务项,甚至还可以从其他工作线程中窃取任务项

如图,Thread1优先从全局队列中拿到了一个任务项,此时全局队列里没有任务了,而Thread3也没事干了,那Thread3就会从Thread2的本地队列里,窃取一个任务过来

image-20230303140547219

生命周期

线程池的底层,最终肯定还是主动创建了Thread,那么它到底是在什么时候创建的呢?

通过断点,可以观察到代码执行路径

image-20230303155304412

QueueUserWorkItem

我们的匿名函数会被封装到一个类QueueUserWorkItemCallback中,然后把这个类入队

public static bool QueueUserWorkItem(WaitCallback callBack, object state)
{
    object tpcallBack = new QueueUserWorkItemCallback(callBack!, state);

    s_workQueue.Enqueue(tpcallBack, forceGlobal: true);

    return true;
}

Enqueue

入队操作

public void Enqueue(object callback, bool forceGlobal)
{
    // 线程池中执行的任务只有两种:IThreadPoolWorkItem或者Task
    Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task));

    ThreadPoolWorkQueueThreadLocals? tl = null;
    if (!forceGlobal)
        // 获取本地队列,如果执行改代码的线程不是线程池线程,
        // 那这边是获取不到的,就算 forceGlobal 是 false,
        // 也会把任务放到全局队列
        tl = ThreadPoolWorkQueueThreadLocals.threadLocals;

    if (null != tl)
    {
        // 放到本地队列
        tl.workStealingQueue.LocalPush(callback);
    }
    else
    {
        // 当道全局队列
        workItems.Enqueue(callback);
    }

    EnsureThreadRequested();
}

EnsureThreadRequested

internal void EnsureThreadRequested()
{
    // 当前需要执行的任务数量
    int count = _separated.numOutstandingThreadRequests;
    // 最大数量是处理器的数量
    while (count < Environment.ProcessorCount)
    {
        // CompareExchange(location,value,compare)
        // 线程安全方法,比较location和compare,如果相等,则location=value,返回count
        // numOutstandingThreadRequests是多线程资源,可能在多个线程被修改
        // 目的是确保其他同时只有1个地方在申请线程
        int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count);
        if (prev == count)
        {
            ThreadPool.RequestWorkerThread();
            break;
        }
        count = prev;
    }
}

RequestWorkerThread

RequestWorker

// 申请一个新的工作项
internal static void RequestWorkerThread() => PortableThreadPool.ThreadPoolInstance.RequestWorker();

internal void RequestWorker()
{
    Interlocked.Increment(ref _separated.numRequestedWorkers);
    WorkerThread.MaybeAddWorkingWorker(this);
    // 初始化 GateThread
    GateThread.EnsureRunning(this);
}

MaybeAddWorkingWorker

用于计算需要创建的线程

  • NumProcessingWork:当前正在执行任务的 Worker Thread
  • NumExistingThreads:当前线程池中实际有的 Worker Thread
  • NumThreadsGoal:当前允许创建的最大 Worker Thread,算法更新
internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance)
{
    ThreadCounts counts = threadPoolInstance._separated.counts;
    short numExistingThreads, numProcessingWork, newNumExistingThreads, newNumProcessingWork;
    // 这个 while (true) 是确保计算出正确的待创建线程数
    while (true)
    {
        // 执行任务的线程数量,不能超过最大可以创建数量
        numProcessingWork = counts.NumProcessingWork;
        if (numProcessingWork >= counts.NumThreadsGoal)
        {
            return;
        }

        // 当前任务数+1
        newNumProcessingWork = (short) (numProcessingWork + 1);
        // 当前线程数
        numExistingThreads = counts.NumExistingThreads;
        // 当前线程数 = Max(当前任务数,当前线程数)
        newNumExistingThreads = Math.Max(numExistingThreads, newNumProcessingWork);

        ThreadCounts newCounts = counts;
        newCounts.NumProcessingWork = newNumProcessingWork;
        newCounts.NumExistingThreads = newNumExistingThreads;
        // 对比新旧线程数
       	ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);

        if (oldCounts == counts)
        {
             break;
        }

        counts = oldCounts;
    }

    int toCreate = newNumExistingThreads - numExistingThreads;
    int toRelease = newNumProcessingWork - numProcessingWork;

    if (toRelease > 0)
    {
        s_semaphore.Release(toRelease);
    }

    while (toCreate > 0)
    {
        if (TryCreateWorkerThread())
        {
            toCreate--;
            continue;
        }

        ...
    }
}

TryCreateWorkerThread

最底层,真正创建线程的地方

private static bool TryCreateWorkerThread()
{
    try
    {
        Thread workerThread = new Thread(s_workerThreadStart);
        // 通过这里可以知道,线程池的所有线程都是后台线程,一旦程序停止,所有线程都会停止
        workerThread.IsThreadPoolThread = true;
        workerThread.IsBackground = true;
        workerThread.UnsafeStart();
    }
    catch (ThreadStartException)
    {
        return false;
    }
    catch (OutOfMemoryException)
    {
        return false;
    }

    return true;
}

Task

Task是基于ThreadPool的再一次封装,ThreadPool的API非常之简单,创建一个工作项之后就可以丢给线程池去跑了

但是,线程池还是有一些不方便的地方

  • 不知道啥时候结束
  • 没有返回值

于是Task诞生了

简单使用

几种启动Task的方式

class Program
{
    static void Main(string[] args)
    {
        var task = new Task((() =>
        {
            Console.WriteLine($"第一个Task:{Thread.CurrentThread.ManagedThreadId}");
        }));
        task.Start();

        Task.Factory.StartNew((() =>
        {
            Console.WriteLine($"第二个Task:{Thread.CurrentThread.ManagedThreadId}");
        }));

        Task.Run((() =>
        {
            Console.WriteLine($"第三个Task:{Thread.CurrentThread.ManagedThreadId}");
        }));

        Console.ReadKey();
    }
}

image-20230303163801956

等待结束,并访问返回值

class Program
{
    static void Main(string[] args)
    {
        var task = Task.Run<string>(() => "Hello World!");
        task.ContinueWith(t => Console.WriteLine(t.Result));
        Console.ReadKey();
    }
}

image-20230303164541653

await关键字,几乎以同步的方式进行异步编程

class Program
{
    static async Task Main(string[] args)
    {
        var task = Task.Run<string>(() => "Hello World!");
        var str = await task;
        Console.WriteLine(str);
        Console.ReadKey();
    }
}

生命周期

Task本质还是在线程池里跑的,以Task.Run为例,断点看一下代码的执行过程

image-20230303165239321

Task.Run

public static Task Run(Action action)
{
    return Task.InternalStartNew(null, action, null, default, TaskScheduler.Default,
        TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None);
}

Task.InternalStartNew

这里可以发现我们的回调被包装成了一个Task

private static readonly TaskScheduler s_defaultTaskScheduler = new ThreadPoolTaskScheduler();

internal static Task InternalStartNew(
    Task? creatingTask, Delegate action, object? state, CancellationToken cancellationToken, TaskScheduler scheduler,
    TaskCreationOptions options, InternalTaskOptions internalOptions)
{
	
    Task t = new Task(action, state, creatingTask, cancellationToken, options, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler);

    t.ScheduleAndStart(false);
    return t;
}

Task.ScheduleAndStart

internal void ScheduleAndStart(bool needsProtection)
{
    ...
    try
    {
        // 入口
        m_taskScheduler.InternalQueueTask(this);
    }
    catch (Exception e)
    {
        ...
    }
}

TaskScheduler.InternalQueueTask

internal void InternalQueueTask(Task task)
{
	...
    this.QueueTask(task);
}

ThreadPoolTaskScheduler.QueueTask

protected internal override void QueueTask(Task task)
{
    TaskCreationOptions options = task.Options;
    if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
    {
        // 当 TaskCreationOptions.LongRunning 时
        // 此时会直接创建一个线程
        // 一般用于长线程(一个很耗时的同步操作)
        new Thread(s_longRunningThreadWork)
        {
            IsBackground = true,
            Name = ".NET Long Running Task"
        }.UnsafeStart(task);
    }
    else
    {
		// 正常走一个线程池
        ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
    }
}

针对长线程,可以用此方法创建:

new TaskFactory().StartNew(() =>
{
    Console.WriteLine("Hello World!");
}, TaskCreationOptions.LongRunning);
  • LongRunning 长线程
  • PreferFairness 全局/本地队列

Task回调

其实Task神奇的地方在于,为什么可以抛出结束事件

static async Task Main(string[] args)
{
    await Task.Run(() =>
    {
        Console.Write("Hello");
    }).ContinueWith((task =>
    {
        Console.WriteLine("Test");
    }));
}

Task.ContinueWith

private Task ContinueWith(Action<Task> continuationAction, TaskScheduler scheduler,
    CancellationToken cancellationToken, TaskContinuationOptions continuationOptions)
{
    CreationOptionsFromContinuationOptions(continuationOptions, out TaskCreationOptions creationOptions, out InternalTaskOptions internalOptions);

    // 我们的回调函数,被包装成了一个新的Task,ContinuationTaskFromTask
    Task continuationTask = new ContinuationTaskFromTask(
        this, continuationAction, null,
        creationOptions, internalOptions
    );

    // 注册Task
    ContinueWithCore(continuationTask, scheduler, cancellationToken, continuationOptions);

    return continuationTask;
}

Task.ContinueWithCore

internal void ContinueWithCore(Task continuationTask,
    TaskScheduler scheduler,
    CancellationToken cancellationToken,
    TaskContinuationOptions options)
{
    // ContinuationTaskFromTask 又被包装成了 ContinueWithTaskContinuation
    TaskContinuation continuation = new ContinueWithTaskContinuation(continuationTask, options, scheduler);

    ...

    if (!continuationTask.IsCompleted)
    {
       

        // 把回调添加到队里里
        bool continuationQueued = AddTaskContinuation(continuation, addBeforeOthers: false);

        // 如果添加失败,那说明任务已经同步完成了,此时直接直接执行回调
        if (!continuationQueued) 
            continuation.Run(this, canInlineContinuationTask: true);
    }
}

Task.AddTaskContinuation

private bool AddTaskContinuation(object tc, bool addBeforeOthers)
{

    if (IsCompleted) 
        return false;

    if ((m_continuationObject != null) || (Interlocked.CompareExchange(ref m_continuationObject, tc, null) != null))
    {
        return AddTaskContinuationComplex(tc, addBeforeOthers);
    }
    else return true;
}

Task.AddTaskContinuationComplex

此时可以发现Task里有一个object属性m_continuationObject

这个属性里存着所有的回调

private volatile object? m_continuationObject; 

private bool AddTaskContinuationComplex(object tc, bool addBeforeOthers)
{
    object? oldValue = m_continuationObject;

    ...

    List<object?>? list = m_continuationObject as List<object?>;
    if (list != null)
    {
        ...
        list.Add(tc);
    	...
    }

    return false;
}

那么这个属性什么时候会被调用呢?

参考.NET官方文档,可以发现这么一个方法

Task.Finish

void Finish(bool bUserDelegateExecuted)
{
    ...
    FinishContinuations()
}

Task.FinishContinuations

internal void FinishContinuations()
{
    // Atomically store the fact that this task is completing.  From this point on, the adding of continuations will
    // result in the continuations being run/launched directly rather than being added to the continuation list.
    // Then if we grabbed any continuations, run them.
    // 这个任务已经完成,从这个时间点开始,添加任何回调都会直接运行
    // 对于任何存在的回调,会依次执行他们
    // 这里使用了 m_continuationObject
    object? continuationObject = Interlocked.Exchange(ref m_continuationObject, s_taskCompletionSentinel);
    if (continuationObject != null)
    {
        RunContinuations(continuationObject);
    }
}

Finish又是啥地方调用的?

看一个简单的例子

image-20230512103752256

image-20230512103724386

简单看一下堆栈

image-20230512103354996

很明显这是线程池的代码,WorkItem执行完之后,抛出了一个事件,关键是ExecuteWithThreadLocal,进去看看

private void ExecuteWithThreadLocal(ref Task currentTaskSlot)
{
    this.Finish(true);
}

所以整个流程就很清楚了

  • Start Task : 把Action包装成一个Task,并由线程池执行
  • ContinueWith : 把Action包装成一个ContinuationTaskFromTask,一个Task的子类并添加到目标Task的m_continuationObject中

  • WorkItem结束时候会派发事件,Task调用m_continuationObject

Async/Await

基础用法

以一段简短的代码为例

class Program
{
    static async Task Main(string[] args)
    {
        Console.WriteLine("Start");
    
        await TestAsync();
        
        Console.Write(" World");
    }

    static Task TestAsync()
    {
        return Task.Run(() =>
        {
            Console.Write("Hello");
        });
    }
}

image-20230303170302141

  • 方法声明为async
  • 返回值可以是void Task Task

  • 参数随意,但不能添加ref和out

Await

await是一个语法题,他后接着的表达式,必须是可等待的

假设一个返回类型为T的表达式,他需要是可等待的,那么T必须满足:

  1. T必须具备(可以是扩展方法)无参方法GetAwaiter(),此方法返回一个类型A

    public struct T
    {
        // 一个返回等待器的实例方法
        public A GetAwaiter()
        {
            return new A();
        }
    }
    
  2. 类型A类型必须实现INotifyCompletion或者ICriticalNotifyCompletion接口

    public interface INotifyCompletion
    {
        void OnCompleted(Action continuation);
    }
       
    public interface ICriticalNotifyCompletion : INotifyCompletion
    {
        void UnsafeOnCompleted(Action continuation);
    }
    
  3. 类型A类型必须具有一个可读的实例属性IsCompleted,是bool

    // 对外仅可读
    public bool IsCompleted { get; private set;}
    
  4. 类型A类型必须具有一个非泛型的无参方法GetResult(),表达式的返回值需要和此方法保持一致

    public string GetResult()
    {
    	return "result";
    }
    

我们把一个可等待的对象,称为Job(为了区别于Task)

把他对应的awatier,称为等待器

结合以上信息,我们可以简单实现一个自定义任务

class TestTask
{
    private Action continuation;
    private string result;
    public bool IsCompleted { get; private set; }
    
    public TestTaskAwaiter GetAwaiter() => new TestTaskAwaiter(this);

    // 模拟Task
    public void Run(Func<string> func)
    {
        new Thread(() =>
        {
            var result = func();
            // 模拟Task,异步完成之后设置结果
            TrySetResult(result);
        })
        {
            IsBackground = true
        }.Start();
    }

    // 模拟Task添加回调
    public bool AddContinuation(Action action)
    {
        if (IsCompleted)
        {
            return false;
        }

        continuation += action;
        return true;
    }

    private void TrySetResult(string result)
    {
        this.result = result;
        IsCompleted = true;
        continuation?.Invoke();
    }
}

然后是对应的等待器,等待器Awatier需要满足上面写的几个要求

class TestTask
{
    ...
        
	// 1.继承接口
    public struct TestTaskAwaiter : INotifyCompletion
    {
        private readonly TestTask testTask;

        // 2. 实现 IsCompleted 属性
        public bool IsCompleted => testTask.IsCompleted;

        public TestTaskAwaiter(TestTask testTask)
        {
            this.testTask = testTask;
        }

        public void OnCompleted(Action continuation)
        {
            if (testTask.AddContinuation(continuation))
            {
                Console.WriteLine($"TestTaskAwaiter.OnCompleted1:{DateTime.Now.Second}");
            }
            else
            {
                Console.WriteLine($"TestTaskAwaiter.OnCompleted2:{DateTime.Now.Second}");
                continuation();
            }
        }

        // 3. 实现 GetResult 方法
        public string GetResult()
        {
            Console.WriteLine("TestTaskAwaiter.GetResult");
            return testTask.result;
        }
    }
}

测试1

class Program
{
    private static Action action;
    
    static async Task Main(string[] args)
    {
        var testTask = new TestTask();
        
        testTask.Run((() =>
        {
            Console.WriteLine($"Task Start:{DateTime.Now.Second}");
            Thread.Sleep(1000);
            Console.WriteLine($"Task End:{DateTime.Now.Second}");
            return "Result";
        }));

        testTask.AddContinuation((() =>
        {
            Console.WriteLine("After Task");
        }));
        
        Console.WriteLine($"Main Thread:{DateTime.Now.Second}");

        var res = await testTask;

        Console.WriteLine($"Main Thread End:{res}");

        Console.ReadKey();
    }
}

image-20230310200239290

测试2

class Program
{
    private static Action action;
    
    static async Task Main(string[] args)
    {
        var testTask = new TestTask();
        
        testTask.Run((() =>
        {
            Console.WriteLine($"Task Start:{DateTime.Now.Second}");
            return "Result";
        }));

        testTask.AddContinuation((() =>
        {
            Console.WriteLine("After Task");
        }));
        
        Console.WriteLine($"Main Thread:{DateTime.Now.Second}");

        var res = await testTask;

        Console.WriteLine($"Main Thread End:{res}");

        Console.ReadKey();
    }
}

image-20230310201338834

INotifyCompletion VS ICriticalNotifyCompletion

既然这两个接口只要实现其中一个,那么他们有什么区别呢?(也可以两个都实现,但是会优先调用前者)

可以直接参考TaskAwaiter的内部代码

public readonly struct TaskAwaiter<TResult> : ICriticalNotifyCompletion, INotifyCompletion
{
    private readonly Task<TResult> m_task;

    internal TaskAwaiter(Task<TResult> task)
    {
        m_task = task;
    }

    public void OnCompleted(Action continuation)
    {
        TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true, flowExecutionContext: true);
    }

    public void UnsafeOnCompleted(Action continuation)
    {
        TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true, flowExecutionContext: false);
    }
    
    internal static void OnCompletedInternal(
        Task task,
        Action continuation,
        bool continueOnCapturedContext,
        bool flowExecutionContext)
    {
        m_task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext);
    }
}

可以发现只有一个参数不一样

  • flowExecutionContext 是否向后传播ExcutionContext(线程上下文)

语法糖

await语法糖,和Task、Thread、ThreadPool其实并没有直接的关系,await后面可以跟着任何Job,Job >= Task

await等待的是Awaiter,当等待器结束时,会派发结束事件,await等待的就是这个事件

简单考虑来说,await会把await表达式下一行开始的所有代码,都包装成一个类似于Action的东西,等Job收到结束消息时,再触发Job之后的内容

所谓收到结束消息,以上面的例子来说,即

public void Run(Func<string> func)
{
    new Thread(() =>
    {
        var result = func();
        // Job结束了,设置结果,触发回调
        TrySetResult(result);
    })
    {
        IsBackground = true
    }.Start();
}

C#原生的await,仅可以跟着void,或者Task

可以等待Task是因为C#封装好了TaskAwaiter类

如果你想等待一个void类型,那么大多数时候需要自己额外封装

Await Task

以一个简单的例子来说

class Program
{
    static async Task Main(string[] args)
    {
        Console.WriteLine("Start");
    
        await TestAsync();
        
        Console.Write(" World!");
    }

    static Task TestAsync()
    {
        return Task.Run(() =>
        {
            Console.Write("Hello");
        });
    }
}

用ILSpy反编译Dll后,查看编译器处理过的代码

// 入口函数1
private static void <Main>(string[] args)
{
    Main(args).GetAwaiter().GetResult();
}

// 入口函数2
private static Task Main(string[] args)
{
    <Main>d__0 stateMachine = new <Main>d__0 
    {
        // 核心
        <>t__builder = AsyncTaskMethodBuilder.Create(),
        args = args,
        <>1__state = -1
    };
    stateMachine.<>t__builder.Start<<Main>d__0>(ref stateMachine);
    return stateMachine.<>t__builder.Task;
}

// 编译器自动生成的类型
private sealed class <Main>d__0 : IAsyncStateMachine
{
    // Fields
    public int <>1__state;
    public AsyncTaskMethodBuilder <>t__builder;
    public string[] args;
    private TaskAwaiter <>u__1;

    // Methods
    private void MoveNext() { }
    [DebuggerHidden]
    private void SetStateMachine(IAsyncStateMachine stateMachine) { }
}

首先,编译器创建了2个Main函数,入口函数被修改了

入口函数1是真正的入口,会调用入口函数2

在入口2,我们会创建一个编译器自动生成的状态机,把函数参数拷贝到状态机内,重置状态机的状态为-1,然后启动状态机

当我们await一个Task时,我们的核心类是AsyncTaskMethodBuilder,他是整个状态机的入口

public struct AsyncTaskMethodBuilder
{
	public static void Start<TStateMachine>(ref TStateMachine stateMachine)
    {
        ...
        // 执行状态机
        stateMachine.MoveNext();
    }   
}

private struct <Main > d__0 : IAsyncStateMachine
{
    public int <>1__state;
    public AsyncTaskMethodBuilder<> t__builder;
    // await的部分,会变成TaskAwaiter
    private TaskAwaiter<> u__1;

    private void MoveNext()
    {
        int num = <>1__state;
        try
        {
            TaskAwaiter awaiter;
            // 初始化的时候,我们的状态被重置为-1,所以肯定会进入if
            if (num != 0)
            {
                // 执行await之前的部分
                Console.WriteLine("Start");
                // 获取awaiter
                awaiter = TestAsync().GetAwaiter();
                // 假设这个任务米有立刻完成(一般情况),那么会进入if
                if (!awaiter.IsCompleted)
                {
                    // 切换状态为0
                    num = (<>1__state = 0);
                    <>u__1 = awaiter;
                    <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
                    // 直接返回了,不会执行await之后的内容
                    return;
                }
            }
            else
            {
                awaiter =  <>u__1;
                    <>u__1 = default(TaskAwaiter);
                num = (<>1__state = -1);
            }

            awaiter.GetResult();
            Console.Write(" World");
        }
        catch (Exception exception)
        {
            <>1__state = -2;
            <>t__builder.SetException(exception);
            return;
        }
        <>

        1__state = -2;
        // 结束通知
        <>t__builder.SetResult();
    }
}

很明显关键部分是<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);

查看.NET官方文档

  • AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted Schedules the state machine to proceed to the next action when the specified awaiter completes. This method can be called from partially trusted code.

    当指定的等待程序完成时,调度状态机继续执行下一个操作,也就是当awaiter执行完成之后,再次调用MoveNext

我们继续往里反编译,看看他是怎么等待awaiter执行完成的

// =================
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>() 
{
	AsyncTaskMethodBuilder<VoidTaskResult>.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine, ref m_task);
}

// =================
internal static void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>()
{
	...
	AwaitUnsafeOnCompleted(ref awaiter, stateMachineBox);
}

// =================
internal static void AwaitUnsafeOnCompleted<TAwaiter>() 
{
	if (default(TAwaiter) != null && awaiter is ITaskAwaiter)
	{
        // 1.
		TaskAwaiter.UnsafeOnCompletedInternal(...);
		return;
	}
	if (default(TAwaiter) != null && awaiter is IConfiguredTaskAwaiter)
	{
        // 2.
		TaskAwaiter.UnsafeOnCompletedInternal(...);
		return;
	}
	if (default(TAwaiter) != null && awaiter is IStateMachineBoxAwareAwaiter)
	{
		// 3.
		((IStateMachineBoxAwareAwaiter)(object)awaiter).AwaitUnsafeOnCompleted(box);
		return;
	}
	// 4.自定义的awaiter会在这儿
	awaiter.UnsafeOnCompleted(box.MoveNextAction);
}

很明显了,会根据传入的awaiter类型,调用他们自身的AwaitUnsafeOnCompleted方法

此时我们是一个TaskAwatier

class TaskAwaiter
{
    public readonly struct TaskAwaiter : ICriticalNotifyCompletion, INotifyCompletion, ITaskAwaiter
    {
        internal static void UnsafeOnCompletedInternal(...)
        {
            ...
            task.UnsafeSetContinuationForAwait(...);
        }
    }
}

最后还是进入了Task,添加成了Task的一个回调方法

class Task
{
    internal void UnsafeSetContinuationForAwait(IAsyncStateMachineBox stateMachineBox, bool continueOnCapturedContext)
	{
        ...
        // 默认fasle
		if (continueOnCapturedContext)
		{
			AddTaskContinuation(...)
		}
		if (!AddTaskContinuation(stateMachineBox, addBeforeOthers: false))
		{
            // 最终如果实在没法添加为Task的回调方法,那么会调用线程池,并且放到局部线程中(preferLocal)
			ThreadPool.UnsafeQueueUserWorkItemInternal(stateMachineBox, preferLocal: true);
		}
	}
}

所以总体的流程大概如图

image-20230306173711302

Await Void

回到本文章最开始的例子,如何await一个Button?

需要进行如下扩展

  1. Button类型需要有一个方法(可以是扩展方法)GetAwaiter(),该方法返回一个Button等待器

    public static class ButtonEx
    {
        public static ButtonAwaiter GetAwaiter(this Button self) => new ButtonAwaiter(self);
    }
    
  2. Button等待器需要实现一些必须的属性和方法

    这是一个干净的Button等待器

    public class ButtonAwaiter : ICriticalNotifyCompletion
    {
        public bool IsCompleted { get; private set; }
       
        public ButtonAwaiter(){}
       
        public void GetResult(){}
           
        public void OnCompleted(Action continuation){}
       
        public void UnsafeOnCompleted(Action continuation){}
    }
    

    进一步封装之后,应该如下

    public class ButtonAwaiter : ICriticalNotifyCompletion
    {
        public bool IsCompleted { get; private set; }
       
        private Button m_Btn;
        private Action m_Continuation;
       
        public ButtonAwaiter(Button btn)
        {
            m_Btn = btn;
        }
       
        public void GetResult()
        {
               
        }
           
        public void OnCompleted(Action continuation)
        {
            UnsafeOnCompleted(continuation);
        }
       
        public void UnsafeOnCompleted(Action continuation)
        {
            m_Continuation = continuation;
            m_Btn.onClick.AddListener(OnClickInternal);
        }
       
        private void OnClickInternal()
        {
            m_Btn.onClick.RemoveListener(OnClickInternal);
            IsCompleted = true;
            m_Continuation?.Invoke();
            m_Continuation = null;
        }
    }
    

那么这个等待器,究竟做了什么呢?

以开头的简单代码为例

async void Start()
{
    Debug.Log("Before Click Button...");
    await btn;
    Debug.Log($"After Click Button...");
}

反编译的结果如下(进行了一定整理

// 入口1
private void Start()
{
    <Start>d__1 stateMachine;
    stateMachine = new <Start>d__1();
    stateMachine.<>t__builder = AsyncVoidMethodBuilder.Create();
    stateMachine.<>4__this = this;
    stateMachine.<>1__state = -1;
    stateMachine.<>t__builder.Start(ref stateMachine);
}

// 对应的状态机
private sealed class <Start > d__1 : IAsyncStateMachine
{
    public int <>1__state;
    public AsyncVoidMethodBuilder<> t__builder;
    public test<>4__this;
    private object <>u__1;

    private void MoveNext()
    {
        int num;
        num = this.<>1__state;
        try
        {
            ButtonAwaiter awaiter;
            if (num != 0)
            {
                Debug.Log((object) "Before Click Button...");
                awaiter = this.<>4__this.btn.GetAwaiter();
                if (!awaiter.IsCompleted)
                {
                    num = (this.<>1__state = 0);
                    this.<>u__1 = awaiter;
                        <Start > d__1 stateMachine;
                    stateMachine = this;
                    this.<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
                    return;
                }
            }
            else
            {
                awaiter = (ButtonAwaiter) this.<>u__1;
                this.<>u__1 = null;
                num = (this.<>1__state = -1);
            }

            awaiter.GetResult();
            Debug.Log((object) "After Click Button...");
        }
        catch (Exception exception)
        {
            this.<>1__state = -2;
            this.<>t__builder.SetException(exception);
            return;
        }

        this.<>1__state = -2;
        this.<>t__builder.SetResult();
    }
}

可以发现,大部分结构和Task的反编译代码是很接近的

但是,有一个很关键的地方不一样,此时的builder是AsyncVoidMethodBuilder

那么自然builder.AwaitUnsafeOnCompleted方法也肯定产生了改变,我们进一步反编译

class AsyncVoidMethodBuilder
{
    public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>() 
    {
        try
        {
            AsyncMethodBuilderCore.MoveNextRunner runnerToInitialize;
            runnerToInitialize = null;
            Action completionAction;
            // 1.获取结束时的action
            completionAction = this.m_coreState.GetCompletionAction();
            ...
            // 2.添加结束委托
            awaiter.UnsafeOnCompleted(completionAction);
        }
        catch (Exception exception)
        {
            AsyncMethodBuilderCore.ThrowAsync(exception, null);
        }
    }
}

我们一步一步来

  1. GetCompletionAction

    有两个敌方比较关键,首先是捕获了当前线程的当前上下文环境

    其次能很明显的发现,返回值defaultContextAction,其实就是moveNextRunner.Run(初始化时传入了捕获的当前上下文)

    class AsyncMethodBuilderCore
    {
        internal Action GetCompletionAction()
        {
            // 首先获取当前运行上下文
            ExecutionContext executionContext = ExecutionContext.FastCapture();
            MoveNextRunner moveNextRunner;
            Action defaultContextAction;
            if (executionContext != null && executionContext.IsPreAllocatedDefault)
            {
                var defaultContextAction = this.m_defaultContextAction;
                if (defaultContextAction != null)
                {
                    return defaultContextAction;
                }
       
                moveNextRunner = new MoveNextRunner(executionContext);
                // 1.
                defaultContextAction = moveNextRunner.Run;
                this.m_defaultContextAction = defaultContextAction;
            }
            else
            {
                moveNextRunner = new MoveNextRunner(executionContext);
                // 2.
                defaultContextAction = moveNextRunner.Run;
                ...
            }
       
    		...
            return defaultContextAction;
        }
    }
    
  2. MoveNextRunner.Run

    那我们进去观察一下这个方法,其核心就是用ExecutionContext去跑了一个InvokeMoveNext方法

    关于ExecutionContext.Run方法,水平优先,很难展开讨论这个

    总的来说,这个方法可以用指定时刻的一个上下文环境(程序状态),去执行某一个方法

    具体观察这个InvokeMoveNext方法,是不是很眼熟?

    没错,他就是编译器自动生成的状态机的MoveNext方法!

    internal sealed class MoveNextRunner
    {
        internal void Run()
        {
            try
    		{
                // 这里使用的上下文,就是上一步传入进来的
    			ExecutionContext.Run(InvokeMoveNext,executionContext);
    			return;
    		}
    		finally
    		{
            	this.m_context.Dispose();
            }
        }
       
        private static void InvokeMoveNext(object stateMachine)
        {
            ((IAsyncStateMachine)stateMachine).MoveNext();
        }
    }
    
  3. awaiter.UnsafeOnCompleted

    在上一步,我们拿到了一个执行MoveNext的回调函数,那么这个回调函数又注册到了哪儿里呢?

    这里的awaiter,在编译器生成的状态机里,我们可以知道他其实就是ButtonAwaiter

    awaiter = this.<>4__this.btn.GetAwaiter();
    

    也就是说,其实这里调用的方法就是我们上面的一个实现类

    而且,这里传入的action,能够让状态机执行MoveNext

    那么这个action什么时候会被执行呢?

    public class ButtonAwaiter : ICriticalNotifyCompletion
    {
        private Action m_Continuation;
           
        // 这个continuation,会让状态机执行MoveNext
        public void UnsafeOnCompleted(Action continuation)
        {
            // 把MoveNext方法保存下来
            m_Continuation = continuation;
            // 为按钮注册一个点击事件
            m_Btn.onClick.AddListener(OnClickInternal);
        }
    }
    

    我们再具体看这个点击事件

    public class ButtonAwaiter : ICriticalNotifyCompletion
    {
        private Action m_Continuation;
       
        // 点击按钮时
        private void OnClickInternal()
        {
            // 移除监听本身(说明只能执行一次
            m_Btn.onClick.RemoveListener(OnClickInternal);
            // 象征性的设置为完成,其实在这个等待器中,这个属性没有意义
            IsCompleted = true;
            // 调用状态机的MoveNext方法
            m_Continuation?.Invoke();
            m_Continuation = null;
        }
    }
    

    此刻,MoveNext的内容应该是

    // 第二次执行MoveNext
    private sealed class <Start > d__1 : IAsyncStateMachine
    {
        private void MoveNext()
        {
            int num;
            num = this.<>1__state;
            try
            {
                ...
                awaiter.GetResult();
                Debug.Log((object) "After Click Button...");
            }
            catch (Exception exception)
            {
                ...
            }
        }
    }
    

    我们可以添加一些Log,来验证这个顺序

    async void Start()
    {
        Debug.Log($"Current Thread:{Thread.CurrentThread.ManagedThreadId}");
        Debug.Log("Before Click Button...");
        await btn;
        Debug.Log($"After Click Button...");
        Debug.Log($"Current Thread:{Thread.CurrentThread.ManagedThreadId}");
    }
       
    public class ButtonAwaiter : ICriticalNotifyCompletion
    {
        ...
               
        public void GetResult()
        {
            Debug.Log("When GetResult");
            Debug.Log($"Current Thread:{Thread.CurrentThread.ManagedThreadId}");
        }
       
        private void OnClickInternal()
        {
            Debug.Log($"When Click Internal");
            Debug.Log($"Current Thread:{Thread.CurrentThread.ManagedThreadId}");
           ...
        }
    }
    

​ 很明显了,并没有开启多线程(一直都在主线程跑),并且顺序也符合我们的预期

TaskCompleteSource

当然如果每一个类型都需要这么扩展一个Awaiter,那就太麻烦了,当充分理解上述内容后,.NET提供了这么一个偷懒类

比如,我们可以修改上述的Button扩展为

public class test : MonoBehaviour
{
    public Button btn;

    async void Start()
    {
        Debug.Log("Before Click Button...");
        await btn;
        Debug.Log($"After Click Button...");
    }

    private Task<bool> AsyncClick(Button btn)
    {
        var tcs = new TaskCompletionSource<bool>();

        btn.onClick.AddListener((() =>
        {
            tcs.SetResult(true);
        }));

        return tcs.Task;
    }
}

非常之简单、清晰

TaskCompletionSource内部包含了一个Task,我们可以直接返回这个Task,Task是可以直接直接await的,所以我们无须扩展了

但是这个Task什么时候结束呢?

同样的,我们给Button添加了一个点击事件,这个时间会主动结束一个Task

  • SetResult

    class TaskCompletionSource<TResult>
    {
        public void SetResult(TResult result)
        {
            if (!this.TrySetResult(result))
                throw new InvalidOperationException(Environment.GetResourceString("TaskT_TransitionToFinal_AlreadyCompleted"));
        }
          
        public bool TrySetResult(TResult result)
        {
            // 最终还是调用了Task的结束方法
            bool flag = this.m_task.TrySetResult(result);
            return flag;
        }
    }
    

对于弹窗的案例,其实也很简单

public class Test : MonoBehaviour
{
    public TestWnd wnd;
    public Button openWnd;
    
    private void Start()
    {
        openWnd.onClick.AddListener(Open);
    }

    private async void Open()
    {
        var flag = await OpenWnd();
        Debug.Log(flag ? "确定" : "取消");
    }

    private Task<bool> OpenWnd()
    {
        var tcs = new TaskCompletionSource<bool>();
        wnd.Open(tcs);
        return tcs.Task;
    }
}

public class TestWnd : MonoBehaviour
{
    public Button confirm;
    public Button cancel;
    
    private TaskCompletionSource<bool> tcs;

    private void Start()
    {
        confirm.onClick.AddListener((() =>
        {
            tcs.SetResult(true);
            gameObject.SetActive(false);
        }));
        
        cancel.onClick.AddListener((() =>
        {
            tcs.SetResult(false);
            gameObject.SetActive(false);
        }));
    }

    public void Open(TaskCompletionSource<bool> tcs)
    {
        this.tcs = tcs;
        gameObject.SetActive(true);
    }
}

基于TaskCompletionSource,我们可以很简单的扩展一些异步方法,比如

/// <summary>
/// 加载资源(可等待)
/// </summary>
public static Task<T> LoadAssetAsync<T>(string assetName)
{
    var loadAssetTcs = new TaskCompletionSource<T>();
    assetManager.LoadAssetAsync<T>(assetName, (asset) =>
        {
            var source = loadAssetTcs;
            loadAssetTcs = null;
            var tAsset = asset as T;
            if (tAsset != null)
            {
                source.SetResult(tAsset);
            }
            else
            {
                Debug.LogError($"Load asset failure...");
                source.SetException("Error");
            }
        }
    );

    return loadAssetTcs.Task;
}

然后我们就可以很爽的加载资源了,比如

async void Start()
{
    var asset = await LoadAssetAsync<GameObject>("Cube");
    Instantiate(asset);
}

问题分析

整个Await的流程其实比较简单,概括起来就两句话:

  • 生成状态机
  • 把状态机的MoveNext函数注册为回调函数

生成状态机

很明显,C#编译器会为我们new一个状态机,那么会不会导致额外的GC开销呢?

[AsyncStateMachine(typeof(<Main>d__0))]
private static Task Main(string[] args)
{
    <Main>d__0 stateMachine = default(<Main>d__0);
    stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
    stateMachine.<>1__state = -1;
    stateMachine.<>t__builder.Start(ref stateMachine);
    return stateMachine.<>t__builder.Task;
}

答案是在Debug模式下会,在Release模式下不会,因为Debug模式下,生成的状态机是一个Class,而Release模式下,生成的状态机是一个Sturct

// Release模式下自动生成的代码
private struct <Main>d__0 : IAsyncStateMachine
    
// Debug模式下自动生成的代码
private sealed class <Main>d__0 : IAsyncStateMachine

注册回调函数

回调函数,是一个Struct结构体里的MoveNext

private struct <Main > d__0 : IAsyncStateMachine
{
    private void MoveNext()
}

观察回调函数的注册过程

第一步,发生在状态机中

private struct <Main > d__0 : IAsyncStateMachine
{
    public AsyncTaskMethodBuilder<> t__builder;

    private void MoveNext()
    {
        awaiter = Program.TestAsync().GetAwaiter();
        // 这里转入了this,状态机是一个Struct
        this.<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
    }
}

第二步,在具体的MehtodBuilder中

public struct AsyncTaskMethodBuilder
{
	private Task<VoidTaskResult> m_task;

	public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
	{
        // 中转方法,往下传给了具体的MehtodBuilder
        // 这里传入了我们的状态机,以及一个空的,对应的Task
		AsyncTaskMethodBuilder<VoidTaskResult>.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine, ref this.m_task);
	}
}

第三步

public struct AsyncTaskMethodBuilder<TResult>
{
    static void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter,ref TStateMachine, ref Task<TResult>)
        where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
    {
        // 中转方法,但是注意第二个参数,调用了一个方法GetStateMachineBox
        AsyncTaskMethodBuilder<TResult>.AwaitUnsafeOnCompleted(
            ref awaiter, AsyncTaskMethodBuilder<TResult>.GetStateMachineBox(ref stateMachine, ref taskField));
    }

}

简单看一下这个方法(省略了很多内容)

最终是把stateMachine传给了Task,StateMachine是一个Struct,而Task是一个Class,所以这一步是一定会发生装箱的

所以这个方法叫做GetBox(狗头)

private static IAsyncStateMachineBox GetStateMachineBox<TStateMachine>(ref TStateMachine, ref Task<TResult>) 
    where TStateMachine : IAsyncStateMachine
{
    if (taskField is AsyncStateMachineBox<IAsyncStateMachine> asyncStateMachineBox2)
    {
        if (asyncStateMachineBox2.StateMachine == null)
        {
            asyncStateMachineBox2.StateMachine = stateMachine;
        }
        return asyncStateMachineBox2;
    }
}

UniTask

Add Package From Git Url:https://github.com/Cysharp/UniTask.git?path=src/UniTask/Assets/Plugins/UniTask

  • 高性能,基于结构体的UniTask,使用对象池缓存委托,没有GC
  • 自定义AsyncMethodBuilder,使得Unity中的所有异步操作均可以await
  • 不使用Thread,基于PlayerLoop,支持UniTask.Dealy,UniTask.Yield,UniTask.DelayFrame
  • 支持Addressable、DOTween

性能测试

public class Test : MonoBehaviour
{
    public Button testCoroutine;
    public Button testUniTask;
    public int testNum;

    private void Start()
    {
        testCoroutine.onClick.AddListener((() => { StartCoroutine(CoroutineTest()); }));

        testUniTask.onClick.AddListener((UniTaskTest));
    }

    private IEnumerator CoroutineTest()
    {
        float totalTime = 0;

        for (int i = 0; i < testNum; i++)
        {
            var time = Time.realtimeSinceStartup;
            var coroutine = StartCoroutine(EmptyCoroutine());
            totalTime += Time.realtimeSinceStartup - time;
            yield return coroutine;
        }

        Debug.Log($"开启 {testNum} 次协程的耗时:{totalTime * 1000}");
    }

    private IEnumerator EmptyCoroutine()
    {
        yield return null;
    }

    private async void UniTaskTest()
    {
        float totalTime = 0f;
        for (int i = 0; i < testNum; i++)
        {
            var time = Time.realtimeSinceStartup;
            var task = EmptyUniTask();
            totalTime += Time.realtimeSinceStartup - time;
            await task;
        }

        Debug.Log($"开启 {testNum} 次UniTask的耗时:{totalTime * 1000}");
    }

    private async UniTask EmptyUniTask()
    {
        await UniTask.Yield(PlayerLoopTiming.Update);
    }
}

切换到Release测试环境

image-20230307132733220

测试3000次,结果如图

image-20230307132826274

首次测试时,UniTask比协程快几乎4倍,当我们第二次测试时,UniTask比协程快了接近10倍,推测UniTask应该是有一些懒加载的设定,所以第二次会比第一次更快

基础使用

异步资源加载

var handle = Resources.LoadAsync<GameObject>("Cube");
await handle;
Debug.Log("After Load Cube");

异步网络请求

var request = UnityWebRequestTexture.GetTexture(@"...");
await request.SendWebRequest();
Debug.Log("After Downloading");

Delay

await UniTask.Delay(1000, DelayType.Realtime, PlayerLoopTiming.LastUpdate);
await UniTask.DelayFrame(10);
  • DelayType

    Realtime:真实时间,不受帧率影响

    DeltaTime: 受帧率影响

    UnscaledDeltaTime:忽略时间缩放

  • PlayerLoopTiming

    更新时间的位置

帧等待

等待一帧

await UniTask.NextFrame();
await UniTask.WaitForEndOfFrame(this);
await UniTask.Yield();

条件等待

public class Test : MonoBehaviour
{
    private bool isClickL;

    private bool isClickR;
    
    private async void Start()
    {
        var taskL = UniTask.WaitUntil((() => isClickL));
        var taskR = UniTask.WaitUntil((() => isClickR));
        await UniTask.WhenAll(taskL, taskR);
        Debug.Log("Click All");
    }

    private void Update()
    {
        if (Input.GetMouseButtonDown(0))
        {
            Debug.Log("Click L");
            isClickL = true;
        }

        if (Input.GetMouseButtonDown(1))
        {
            Debug.Log("Click R");
            isClickR = true;
        }
    }
}

image-20230307205050471

对于串联资源加载是很方便的

var task1 = Resources.LoadAsync<GameObject>("Task1").ToUniTask();
var task2 = Resources.LoadAsync<GameObject>("Task2").ToUniTask();
var task3 = Resources.LoadAsync<GameObject>("Task3").ToUniTask();

await UniTask.WhenAll(task1, task2, task3);
Debug.Log("After Load All...");

类似的接口还有:

  • UniTask.WaitWhile
  • UniTask.WhenAny
  • UniTask.WaitUntilValueChanged

进度显示

任何异步操作都可以ToUniTask,然后显示进度

基础用法

var handle = SceneManager.LoadSceneAsync("Test");
handle.ToUniTask(Progress.Create<float>(f =>
{
	Debug.Log($"进度:{f}");
}));
await handle;
Debug.Log("After Loading...");

或者调用方可以实现IProgress接口,则可以省去一部分匿名表达式的GC

public class Test : MonoBehaviour, IProgress<float>
{
    public void Report(float value)
    {
        UnityEngine.Debug.Log(value);
    }

    public async UniTaskVoid WebRequest()
    {
        var request = await UnityWebRequest.Get("https://xn--vckuc.xn--jh1al63br2c.com/auth/login")
            .SendWebRequest()
            .ToUniTask(progress: this);
    }
}

取消

和Task类似,需要一个Token,Token只可以使用一次,使用完成之后就可以Dispose了,再次使用需要重新生成

对于嵌套Task,可以在入口函数传入一个Token,然后所有Task都使用这个Token,就可以做到同时停止

public class Test : MonoBehaviour
{
    // 需要一个Token,在创建任务时传入
    private CancellationTokenSource tokenSource = new CancellationTokenSource();
    
    private async void Start()
    {
        // 需要以异常的形式抛出
        try
        {
            var num = 1;
            while (true)
            {
                Debug.Log(num);
                num++;
                // 传入Token
                await UniTask.Delay(1000, cancellationToken: tokenSource.Token);
            }
        }
        catch (Exception e)
        {
            Debug.LogError("Task停止");
        }
    }

    private void Update()
    {
        if (Input.GetMouseButton(0))
        {
            // 停止任务
            tokenSource.Cancel();
        }
    }
}

image-20230308133421056

针对部分异步操作,有更加简洁的写法

await Resources.LoadAsync<GameObject>("Cube").WithCancellation(tokenSource.Token);

如果是在Mono中的脚本,那还有一个写好的扩展方法,可以直接返回一个Token

await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: this.GetCancellationTokenOnDestroy());

超时处理

超时取消,其实是取消的一个子集,所以可以通过接口CancellationTokenSouce.CancelAfterSlim(TimeSpan),获得超时Token

var cts = new CancellationTokenSource();
// 1秒超时
cts.CancelAfterSlim(TimeSpan.FromSeconds(1));

try
{
    await UnityWebRequest.Get("https://xn--vckuc.xn--jh1al63br2c.com/auth/login").SendWebRequest().WithCancellation(cts.Token);
}
catch (OperationCanceledException ex)
{
    if (ex.CancellationToken == cts.Token)
    {
        Debug.Log("Timeout...");
    }
}

image-20230309132244432

或者使用TimeoutController,可以优化GC分配

TimeoutController timeoutController = new TimeoutController();

try
{
    await UnityWebRequest.Get("https://xn--vckuc.xn--jh1al63br2c.com/auth/login").SendWebRequest()
        .WithCancellation(timeoutController.Timeout(TimeSpan.FromSeconds(5)));
    //成功时需要调用Reset重置计时器
    timeoutController.Reset(); 
}
catch (OperationCanceledException ex)
{
    if (timeoutController.IsTimeout())
    {
        UnityEngine.Debug.Log("Timeout...");
    }
}

串联取消

多个取消条件,可以串联成到一起

var cancelToken = new CancellationTokenSource();
// 点击按钮取消
cancelButton.onClick.AddListener(()=>
{
    cancelToken.Cancel(); 
});

var timeoutToken = new CancellationTokenSource();
// 5秒超时
timeoutToken.CancelAfterSlim(TimeSpan.FromSeconds(5)); 

try
{
    // 组合token
    var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancelToken.Token, timeoutToken.Token);
    await UnityWebRequest.Get("https://xn--vckuc.xn--jh1al63br2c.com/auth/login")
        .SendWebRequest().WithCancellation(linkedTokenSource.Token);
}
catch (OperationCanceledException ex)
{
    if (timeoutToken.IsCancellationRequested)
    {
        Debug.Log("Timeout...");
    }
    else if (cancelToken.IsCancellationRequested)
    {
        Debug.Log("Cancel Clicked...");
    }
}

关于PlayerLoop

Unity默认一般有下面这几个阶段

public enum PlayerLoopTiming
{
    Initialization = 0,
    LastInitialization = 1,

    EarlyUpdate = 2,
    LastEarlyUpdate = 3,

    FixedUpdate = 4,
    LastFixedUpdate = 5,

    PreUpdate = 6,
    LastPreUpdate = 7,

    Update = 8,
    LastUpdate = 9,

    PreLateUpdate = 10,
    LastPreLateUpdate = 11,

    PostLateUpdate = 12,
    LastPostLateUpdate = 13
    
#if UNITY_2020_2_OR_NEWER
    TimeUpdate = 14,
    LastTimeUpdate = 15,
#endif
}

UniTask的大多数等待任务中,都可以传入PlayerLoopTiming,指任务具体调用的时机

通过PlayerLoop来观察UniTask的执行时机

public class Test : MonoBehaviour
{
    private void Start()
    {
        Register();
    }

    private void Register()
    {
        var playerLoop = PlayerLoop.GetCurrentPlayerLoop();
        var subSys = playerLoop.subSystemList;
        for (int i = 0; i < subSys.Length; i++)
        {
            var index = i;
            subSys[i].updateDelegate += () =>
            {
                Debug.Log($"{subSys[index]} 当前帧{Time.frameCount}");
            };
        }
        PlayerLoop.SetPlayerLoop(playerLoop);
    }
}

image-20230307200119117

NextFrame

Debug.Log("执行NextFrame");
await UniTask.NextFrame();
Debug.Log("结束执行NextFrame");

image-20230307195127480

WaitForEndOfFrame

Debug.Log("执行EndOfFrame");
await UniTask.WaitForEndOfFrame(this);
Debug.Log("结束执行EndOfFrame");

image-20230307195556417

Yield

Debug.Log("执行Yield");
await UniTask.Yield(PlayerLoopTiming.PreUpdate);
Debug.Log("结束执行Yield");

image-20230307200609263

修改为

await UniTask.Yield(PlayerLoopTiming.Update);

image-20230308132339403

部分原理

等待器

如果我们想等待一个Job,那么这个Job肯定是需要一个等待器的,可以看一下部分UniTask为我们提供的等待器

ResourceRequest

// 扩展方法
public static ResourceRequestAwaiter GetAwaiter(this ResourceRequest asyncOperation)
{
    Error.ThrowArgumentNullException(asyncOperation, nameof(asyncOperation));
    return new ResourceRequestAwaiter(asyncOperation);
}

// 等待器,注意这里是struct
public struct ResourceRequestAwaiter : ICriticalNotifyCompletion
{
    ResourceRequest asyncOperation;
    Action<AsyncOperation> continuationAction;

    public ResourceRequestAwaiter(ResourceRequest asyncOperation)
    {
        this.asyncOperation = asyncOperation;
        this.continuationAction = null;
    }

    public bool IsCompleted => asyncOperation.isDone;

    public UnityEngine.Object GetResult()
    {
        if (continuationAction != null)
        {
            asyncOperation.completed -= continuationAction;
            continuationAction = null;
            var result = asyncOperation.asset;
            asyncOperation = null;
            return result;
        }
        else
        {
            var result = asyncOperation.asset;
            asyncOperation = null;
            return result;
        }
    }

    public void OnCompleted(Action continuation)
    {
        UnsafeOnCompleted(continuation);
    }

    public void UnsafeOnCompleted(Action continuation)
    {
        Error.ThrowWhenContinuationIsAlreadyRegistered(continuationAction);
        // 为委托方法设计了一个池对象
        continuationAction = PooledDelegate<AsyncOperation>.Create(continuation);
        // 添加结束事件(当资源加载结束时候,执行MoveNext)
        asyncOperation.completed += continuationAction;
    }
}

UnityWebRequestAsyncOperation

// 扩展方法
public static UnityWebRequestAsyncOperationAwaiter GetAwaiter(this UnityWebRequestAsyncOperation asyncOperation)
{
    Error.ThrowArgumentNullException(asyncOperation, nameof(asyncOperation));
    return new UnityWebRequestAsyncOperationAwaiter(asyncOperation);
}

// 等待器,原理几乎是一样的
public struct UnityWebRequestAsyncOperationAwaiter : ICriticalNotifyCompletion
{
    UnityWebRequestAsyncOperation asyncOperation;
    Action<AsyncOperation> continuationAction;

    public UnityWebRequestAsyncOperationAwaiter(UnityWebRequestAsyncOperation asyncOperation)
    {
        this.asyncOperation = asyncOperation;
        this.continuationAction = null;
    }

    public bool IsCompleted => asyncOperation.isDone;

    public UnityWebRequest GetResult()
    {
        if (continuationAction != null)
        {
            asyncOperation.completed -= continuationAction;
            continuationAction = null;
            var result = asyncOperation.webRequest;
            asyncOperation = null;
            if (result.IsError())
            {
                throw new UnityWebRequestException(result);
            }

            return result;
        }
        else
        {
            var result = asyncOperation.webRequest;
            asyncOperation = null;
            if (result.IsError())
            {
                throw new UnityWebRequestException(result);
            }

            return result;
        }
    }

    public void OnCompleted(Action continuation)
    {
        UnsafeOnCompleted(continuation);
    }

    public void UnsafeOnCompleted(Action continuation)
    {
        Error.ThrowWhenContinuationIsAlreadyRegistered(continuationAction);
        continuationAction = PooledDelegate<AsyncOperation>.Create(continuation);
        asyncOperation.completed += continuationAction;
    }
}

AsyncOperation

AssetBundleRequest

AssetBundleCreateRequest

Delay

以一下代码为例

async void Start()
{
    await UniTask.Delay(1000);
}

看一下底层代码,会根据Delay的参数,返回不同的UniTask

public static UniTask Delay()
{
    ...

    switch (delayType)
    {
        case DelayType.UnscaledDeltaTime:
        {
            return new UniTask(DelayIgnoreTimeScalePromise.Create(), token);
        }
        case DelayType.Realtime:
        {
            return new UniTask(DelayRealtimePromise.Create(), token);
        }
        case DelayType.DeltaTime:
        default:
        {
            return new UniTask(DelayPromise.Create(), token);
        }
    }
}

等待器部分

既然返回的是一个UniTask,那这个类型必然也要求满足await的规定

// Job
public readonly partial struct UniTask
{
    ...
    public Awaiter GetAwaiter()
    {
        // 传入了UniTask自身
    	return new Awaiter(this);
    }
}

对应的等待器

public readonly struct Awaiter : ICriticalNotifyCompletion
{
    readonly UniTask task;

    public Awaiter(in UniTask task)
    {
        this.task = task;
    }

    public bool IsCompleted => task.Status.IsCompleted();
    

    public void GetResult()
    {
        if (task.source == null)
            return;
        task.source.GetResult(task.token);
    }

    public void OnCompleted(Action continuation)
    {
        if (task.source == null)
        {
            continuation();
        }
        else
        {
            task.source.OnCompleted(AwaiterActions.InvokeContinuationDelegate, continuation, task.token);
        }
    }

    public void UnsafeOnCompleted(Action continuation)
    {
        if (task.source == null)
        {
            continuation();
        }
        else
        {
            task.source.OnCompleted(AwaiterActions.InvokeContinuationDelegate, continuation, task.token);
        }
    }
	...
}

和我们自己写的很像,但也有一点区别,从代码可以看出来,主要逻辑其实还是在UniTask里(task)

IUniTaskSource

在上面,我们创建UniTask时候,会借助几个辅助方法,生成一个IUniTaskSource对象

DelayRealtimePromise.Create()为例

public interface IUniTaskSource
{
    ...
    UniTaskStatus GetStatus(short token);
    void OnCompleted(Action<object> continuation, object state, short token);
    void GetResult(short token);
}
class DelayRealtimePromise
{
    public static IUniTaskSource Create(TimeSpan delayTimeSpan, PlayerLoopTiming timing...)
    {
        ...
        // 池对象优化
        if (!pool.TryPop(out var result))
        {
            result = new DelayRealtimePromise();
        }
        // 开启了一个计时器,很关键
        result.stopwatch = ValueStopwatch.StartNew();
        // 设置等待时间
        result.delayTimeSpanTicks = delayTimeSpan.Ticks;
        ...
        // 添加到PlayerLoop里
        PlayerLoopHelper.AddAction(timing, result);
        return result;
    }
}

其实这里的关键代码就只有一行,就是PlayerLoopHelper

PlayerLoopHelper

初始化入口添加了标签RuntimeInitializeOnLoadMethod

说明这个类型一运行代码就会初始化,不需要我们主动调用

public static class PlayerLoopHelper
{
    static PlayerLoopRunner[] runners;
    
    [RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.BeforeSceneLoad)]
    static void Init()
    {
        ...
        var playerLoop = PlayerLoop.GetCurrentPlayerLoop();
        Initialize(ref playerLoop);
    }
    
    public static void Initialize(ref PlayerLoopSystem playerLoop)
    {
		...
        // 初始化PlayerLoopRunner
        runners = new PlayerLoopRunner[16];

        var copyList = playerLoop.subSystemList.ToArray();

        // Update
        InsertLoop(copyList, injectTimings, typeof(PlayerLoopType.Update),
            InjectPlayerLoopTimings.Update, 8, true,
            typeof(UniTaskLoopRunners.UniTaskLoopRunnerYieldUpdate), typeof(UniTaskLoopRunners.UniTaskLoopRunnerUpdate),
            PlayerLoopTiming.Update);
        
        ...

        // Insert UniTaskSynchronizationContext to Update loop
        var i = FindLoopSystemIndex(copyList, typeof(PlayerLoopType.Update));
        copyList[i].subSystemList = InsertUniTaskSynchronizationContext(copyList[i]);

        playerLoop.subSystemList = copyList;
        PlayerLoop.SetPlayerLoop(playerLoop);
    }
}

关键是在这一步InserLoop

static void InsertLoop()
{
    var i = FindLoopSystemIndex(copyList, loopType);
    copyList[i].subSystemList = InsertRunner(copyList[i], injectOnFirst,
            loopRunnerYieldType, yielders[index] = new ContinuationQueue(playerLoopTiming),
            loopRunnerType, runners[index] = new PlayerLoopRunner(playerLoopTiming));
}

这里的copyList[i].subSystemList,其实对应的就是Unity里各个类型的系统列表(Update等)

然后我们会往某一个类型的系统列表里,添加UniTask自定义的系统

static PlayerLoopSystem[] InsertRunner(PlayerLoopSystem loopSystem,Type loopRunnerType, PlayerLoopRunner runner)
{
    ...
    var runnerLoop = new PlayerLoopSystem
    {
        type = loopRunnerType,
        updateDelegate = runner.Run
    };
    ...
}

这里主动new了一个系统,并且这个系统的类型和update方法,都是我们主动传入的

PlayerLoopRunner

上面我们的AddAction,其实也是往这个Runner里添加的Action

class PlayerLoopRunner 
{
    // 所有的任务
    IPlayerLoopItem[] loopItems = new IPlayerLoopItem[InitialSize];
    // 一个任务队列
    MinimumQueue<IPlayerLoopItem> waitQueue = new MinimumQueue<IPlayerLoopItem>(InitialSize);
    
    public void AddAction(IPlayerLoopItem item)
    {
        ...
        if (running)
        {
            waitQueue.Enqueue(item);
            return;
        }
        loopItems[tail++] = item;
    }
}

如果当前系统正在运行,那么先把任务添加到队列中,否则直接添加到任务列表末尾

(系统并不是每时每刻都在运行的,不然直接卡死了不是,就和Update方法一样,是每一段时间调用一次的)

然后在看核心运行方法,先看上半部分run

void RunCore()
{
    running = true;

    var j = tail - 1;

    for (int i = 0; i < loopItems.Length; i++)
    {
        var action = loopItems[i];
        if (action != null)
        {
            if (!action.MoveNext())
            {
            	loopItems[i] = null;
            }
            else
            {
            	continue; // next i 
            }
        }
    }

	// 到这里就认为运行结束了
    running = false;
    ...
}

这里的代码很简单,就是遍历所有任务,然后尝试调用MoveNext方法,如果失败说明任务已完成,置NULL

(这个思路是参照协程的思路写的,基本一样)

那么这个MoveNext方法在哪儿呢,其实在我们的IUniTaskSource里,以DelayRealtimePromise为例

sealed class DelayRealtimePromise : IUniTaskSource, IPlayerLoopItem
{
    public bool MoveNext()
    {
        // 如果取消了
        if (cancellationToken.IsCancellationRequested)
        {
            core.TrySetCanceled(cancellationToken);
            return false;
        }

        // 如果异常了
        if (stopwatch.IsInvalid)
        {
            core.TrySetResult(AsyncUnit.Default);
            return false;
        }

        // 如果结束了,这个计时器,是在我们创建方法的时候主动生成的
        if (stopwatch.ElapsedTicks >= delayTimeSpanTicks)
        {
            core.TrySetResult(AsyncUnit.Default);
            return false;
        }

        return true;
    }
}

最终,如果时间到了,那还是运行了我们熟悉的一个老方法TrySetResult表示结束,通知Job执行回调

再接着看RunCore方法的下半部分

void RunCore()
{
    ...
	// 标志运行结束
    running = false;
    while (waitQueue.Count != 0)
    {
        if (loopItems.Length == tail)
        {
            Array.Resize(ref loopItems, checked(tail * 2));
        }

        loopItems[tail++] = waitQueue.Dequeue();
    }
}

很简单,就是把所有任务队列里的人物,添加到任务列表里

自定义的MethodBuilder

上面已经分析过了,C#原生的await,是存在装箱操作的,所以存在GC,而UniTask解决了这个问题

可以发现UniTask是自定义了MethodBuilder的

[AsyncMethodBuilder(typeof(AsyncUniTaskMethodBuilder))]
public readonly partial struct UniTask

观察一下他自己写的这个AsyncUniTaskMethodBuilder

关键部分

public struct AsyncUniTaskMethodBuilder
{
    IStateMachineRunnerPromise runnerPromise;

    public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
    {
        if (runnerPromise == null)
        {
            // 这里的StateMachine仍然是一个结构体,但是添加了关键字ref
            AsyncUniTask<TStateMachine>.SetStateMachine(ref stateMachine, ref runnerPromise);
        }

        awaiter.UnsafeOnCompleted(runnerPromise.MoveNext);
    }
}

观察方法SetStateMachine

internal sealed class AsyncUniTask<TStateMachine> : IStateMachineRunnerPromise, IUniTaskSource,
{
    static TaskPool<AsyncUniTask<TStateMachine>> pool;

    public static void SetStateMachine(ref TStateMachine stateMachine,
    {
        if (!pool.TryPop(out var result))
        {
            result = new AsyncUniTask<TStateMachine>();
        }

        runnerPromiseFieldRef = result; 
        result.stateMachine = stateMachine; // 通过ref关键字拷贝stateMachine
    }
}

这个方法有两个地方比较重要

  1. 通过池对象缓存Task反复利用
  2. 通过ref关键字拷贝结构体,避免GC