Task与异步编程|01 async/await
引言
异步编程,历史太深,只能非常浅的讲述一些理解
比如以下有趣的代码,Start方法会在await处”被暂停”,但不会阻塞线程,直到我们点击一次Button,才会继续执行
(当然这既不是原生C#,也不是原生Unity,需要我们额外扩展一些方法)
public class Test : MonoBehaviour
{
public Button btn;
async void Start()
{
Debug.Log("Before Click Button...");
await btn;
Debug.Log("After Click Button...");
}
private void Update()
{
if (Input.GetMouseButtonUp(0))
{
Debug.Log("On Mouse Button Up");
}
}
}
再比如另一个常见的需求,打开一个确定/取消弹窗,然后根据结果进行不同的行为,不过一般来说,我们都是通过注册回调函数进行的,但现在我们可以这样
public class Test : MonoBehaviour
{
public TestWnd wnd;
public Button openWnd;
private void Start()
{
openWnd.onClick.AddListener(Open);
}
private async void Open()
{
var flag = await OpenWnd();
Debug.Log(flag ? "确定" : "取消");
}
}
想要彻底了解这个有趣的机制,需要从很远的地方开始说起
ThreadPool
ThreadPool(线程池)是对于Thread的封装,关于Thread我用的不多,感觉API非常厚重,仅从实际体验来说被协程吊起来打,就不再赘述了,原生的Thread的一个明显缺点是线程不可重用,每次使用时都需要创建,代价很大
而线程池不一样,.NET在初始化的时候,会为每一个进程都创建一个线程池,此后的操作都会在这个池子里进行,而且简化了很多API,使用起来非常简单
ThreadPool.QueueUserWorkItem
class Program
{
static void Main(string[] args)
{
Console.WriteLine(1);
// 把这个匿名函数,成为工作项
ThreadPool.QueueUserWorkItem(e =>
{
Console.WriteLine($"当前线程:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine(2);
Thread.Sleep(1000);
Console.WriteLine(3);
});
Console.WriteLine($"当前线程:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine(4);
Console.ReadKey();
}
}
原理
线程池内部,主要有两块:一个全局队列、多个工作线程
当我们调用ThreadPool.QueueUserWorkItem
时,会把工作项添加到全局队列中,每一个工作线程都处在一个while(true)
循中,会不断的从队列里取工作项执行
每一个工作线程,都是一条独立的线程,但所有的工作线程,都需要访问全局队列,那很明显了,全局队列是一个多线程共享资源,肯定是需要加锁的,那会有一个问题,如果有一些工作项,很小,执行的很快,那会导致工作线程频繁访问全局队列,加锁解锁
因此,实际上,每一个工作线程,内部还包含一个局部队列
ThreadPool.QueueUserWorkItem
有一个重载方法:
ThreadPool.QueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal)
注意最后这个bool参数,当调用不带preferLocal
的QueueUserWorkItem
方法时,工作项会默认被放到全局队列中,等待某一个工作线程捡走他
当preferLocal
为ture
时,如果调用线程是线程池内的某一个工作线程,那么会把工作项放到工作线程的局部队列中,否则也会丢到全局队列里去
WorkItem
工作项,是线程池最基本的调度单元,底层类型是object
,实际任务类型一般分为两类
IThreadPoolWorkItem
继承此接口的类型
public interface IThreadPoolWorkItem
{
void Execute();
}
例如通过 ThreadPool.QueueUserWorkItem(WaitCallback callBack)
传入的 callBack 委托实例会被包装到一个QueueUserWorkItemCallback
实例里。QueueUserWorkItemCallback
是 IThreadPoolWorkItem
的实现类
Task
执行 InnerInvoke 会执行 Task 所包含的委托
class Task
{
internal void InnerInvoke();
}
全局队列
全局队列由 ThreadPoolWorkQueue
维护的,它是整个系统的入口,直接被ThreadPool
引用。
public static class ThreadPool
{
internal static readonly ThreadPoolWorkQueue s_workQueue = new ThreadPoolWorkQueue();
public static bool QueueUserWorkItem(WaitCallback callBack, object state)
{
object tpcallBack = new QueueUserWorkItemCallback(callBack!, state);
s_workQueue.Enqueue(tpcallBack, forceGlobal: true);
return true;
}
}
internal sealed class ThreadPoolWorkQueue
{
// 全局队列
internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();
// forceGlobal 为 true 时,push 到全局队列,否则就放到本地队列
public void Enqueue(object callback, bool forceGlobal);
}
本地队列
线程池中的每一个线程都会绑定一个 ThreadPoolWorkQueueThreadLocals
实例,在 workStealingQueue 这个字段上保存着本地队列
internal sealed class ThreadPoolWorkQueueThreadLocals
{
// 绑定在线程池
[ThreadStatic]
public static ThreadPoolWorkQueueThreadLocals threadLocals;
// 持有全局队列的引用,以便能在需要的时候将任务转移到全局队列上
public readonly ThreadPoolWorkQueue workQueue;
}
任务偷窃机制
在上面的本地队列结构中,其实还没有看到真正管理本地队列的地方,其实是在这里
internal sealed class ThreadPoolWorkQueueThreadLocals
{
// 偷窃队列,本地队列的真正维护者
public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
public readonly Thread currentThread;
public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
{
...
// WorkStealingQueueList集中管理所有的偷窃队列
ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue);
}
// 提供将本地队列中的任务转移到全局队列中去的功能
public void TransferLocalWork()
{
while (workStealingQueue.LocalPop() is object cb)
{
workQueue.Enqueue(cb, forceGlobal: true);
}
}
~ThreadPoolWorkQueueThreadLocals()
{
...
ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
}
}
根据代码可以知道,每个任务线程的本地队列,由各自的workStealingQueue
统一管理,而每一个workStealingQueue
,又都会在构造的时候被添加到线程池统一的列表内
换句话说,工作线程A中的工作项,实际上对于其他所有工作线程,都是可见的,这只不过是一个嵌套列表
这导致的好处就是,实际上,工作线程不仅可以从全局队列中获取任务项,甚至还可以从其他工作线程中窃取任务项
如图,Thread1优先从全局队列中拿到了一个任务项,此时全局队列里没有任务了,而Thread3也没事干了,那Thread3就会从Thread2的本地队列里,窃取一个任务过来
生命周期
线程池的底层,最终肯定还是主动创建了Thread,那么它到底是在什么时候创建的呢?
通过断点,可以观察到代码执行路径
QueueUserWorkItem
我们的匿名函数会被封装到一个类QueueUserWorkItemCallback中,然后把这个类入队
public static bool QueueUserWorkItem(WaitCallback callBack, object state)
{
object tpcallBack = new QueueUserWorkItemCallback(callBack!, state);
s_workQueue.Enqueue(tpcallBack, forceGlobal: true);
return true;
}
Enqueue
入队操作
public void Enqueue(object callback, bool forceGlobal)
{
// 线程池中执行的任务只有两种:IThreadPoolWorkItem或者Task
Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task));
ThreadPoolWorkQueueThreadLocals? tl = null;
if (!forceGlobal)
// 获取本地队列,如果执行改代码的线程不是线程池线程,
// 那这边是获取不到的,就算 forceGlobal 是 false,
// 也会把任务放到全局队列
tl = ThreadPoolWorkQueueThreadLocals.threadLocals;
if (null != tl)
{
// 放到本地队列
tl.workStealingQueue.LocalPush(callback);
}
else
{
// 当道全局队列
workItems.Enqueue(callback);
}
EnsureThreadRequested();
}
EnsureThreadRequested
internal void EnsureThreadRequested()
{
// 当前需要执行的任务数量
int count = _separated.numOutstandingThreadRequests;
// 最大数量是处理器的数量
while (count < Environment.ProcessorCount)
{
// CompareExchange(location,value,compare)
// 线程安全方法,比较location和compare,如果相等,则location=value,返回count
// numOutstandingThreadRequests是多线程资源,可能在多个线程被修改
// 目的是确保其他同时只有1个地方在申请线程
int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count);
if (prev == count)
{
ThreadPool.RequestWorkerThread();
break;
}
count = prev;
}
}
RequestWorkerThread
RequestWorker
// 申请一个新的工作项
internal static void RequestWorkerThread() => PortableThreadPool.ThreadPoolInstance.RequestWorker();
internal void RequestWorker()
{
Interlocked.Increment(ref _separated.numRequestedWorkers);
WorkerThread.MaybeAddWorkingWorker(this);
// 初始化 GateThread
GateThread.EnsureRunning(this);
}
MaybeAddWorkingWorker
用于计算需要创建的线程
- NumProcessingWork:当前正在执行任务的 Worker Thread
- NumExistingThreads:当前线程池中实际有的 Worker Thread
- NumThreadsGoal:当前允许创建的最大 Worker Thread,算法更新
internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance)
{
ThreadCounts counts = threadPoolInstance._separated.counts;
short numExistingThreads, numProcessingWork, newNumExistingThreads, newNumProcessingWork;
// 这个 while (true) 是确保计算出正确的待创建线程数
while (true)
{
// 执行任务的线程数量,不能超过最大可以创建数量
numProcessingWork = counts.NumProcessingWork;
if (numProcessingWork >= counts.NumThreadsGoal)
{
return;
}
// 当前任务数+1
newNumProcessingWork = (short) (numProcessingWork + 1);
// 当前线程数
numExistingThreads = counts.NumExistingThreads;
// 当前线程数 = Max(当前任务数,当前线程数)
newNumExistingThreads = Math.Max(numExistingThreads, newNumProcessingWork);
ThreadCounts newCounts = counts;
newCounts.NumProcessingWork = newNumProcessingWork;
newCounts.NumExistingThreads = newNumExistingThreads;
// 对比新旧线程数
ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
break;
}
counts = oldCounts;
}
int toCreate = newNumExistingThreads - numExistingThreads;
int toRelease = newNumProcessingWork - numProcessingWork;
if (toRelease > 0)
{
s_semaphore.Release(toRelease);
}
while (toCreate > 0)
{
if (TryCreateWorkerThread())
{
toCreate--;
continue;
}
...
}
}
TryCreateWorkerThread
最底层,真正创建线程的地方
private static bool TryCreateWorkerThread()
{
try
{
Thread workerThread = new Thread(s_workerThreadStart);
// 通过这里可以知道,线程池的所有线程都是后台线程,一旦程序停止,所有线程都会停止
workerThread.IsThreadPoolThread = true;
workerThread.IsBackground = true;
workerThread.UnsafeStart();
}
catch (ThreadStartException)
{
return false;
}
catch (OutOfMemoryException)
{
return false;
}
return true;
}
Task
Task是基于ThreadPool的再一次封装,ThreadPool的API非常之简单,创建一个工作项之后就可以丢给线程池去跑了
但是,线程池还是有一些不方便的地方
- 不知道啥时候结束
- 没有返回值
于是Task诞生了
简单使用
几种启动Task的方式
class Program
{
static void Main(string[] args)
{
var task = new Task((() =>
{
Console.WriteLine($"第一个Task:{Thread.CurrentThread.ManagedThreadId}");
}));
task.Start();
Task.Factory.StartNew((() =>
{
Console.WriteLine($"第二个Task:{Thread.CurrentThread.ManagedThreadId}");
}));
Task.Run((() =>
{
Console.WriteLine($"第三个Task:{Thread.CurrentThread.ManagedThreadId}");
}));
Console.ReadKey();
}
}
等待结束,并访问返回值
class Program
{
static void Main(string[] args)
{
var task = Task.Run<string>(() => "Hello World!");
task.ContinueWith(t => Console.WriteLine(t.Result));
Console.ReadKey();
}
}
await关键字,几乎以同步的方式进行异步编程
class Program
{
static async Task Main(string[] args)
{
var task = Task.Run<string>(() => "Hello World!");
var str = await task;
Console.WriteLine(str);
Console.ReadKey();
}
}
生命周期
Task本质还是在线程池里跑的,以Task.Run为例,断点看一下代码的执行过程
Task.Run
public static Task Run(Action action)
{
return Task.InternalStartNew(null, action, null, default, TaskScheduler.Default,
TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None);
}
Task.InternalStartNew
这里可以发现我们的回调被包装成了一个Task
private static readonly TaskScheduler s_defaultTaskScheduler = new ThreadPoolTaskScheduler();
internal static Task InternalStartNew(
Task? creatingTask, Delegate action, object? state, CancellationToken cancellationToken, TaskScheduler scheduler,
TaskCreationOptions options, InternalTaskOptions internalOptions)
{
Task t = new Task(action, state, creatingTask, cancellationToken, options, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler);
t.ScheduleAndStart(false);
return t;
}
Task.ScheduleAndStart
internal void ScheduleAndStart(bool needsProtection)
{
...
try
{
// 入口
m_taskScheduler.InternalQueueTask(this);
}
catch (Exception e)
{
...
}
}
TaskScheduler.InternalQueueTask
internal void InternalQueueTask(Task task)
{
...
this.QueueTask(task);
}
ThreadPoolTaskScheduler.QueueTask
protected internal override void QueueTask(Task task)
{
TaskCreationOptions options = task.Options;
if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
{
// 当 TaskCreationOptions.LongRunning 时
// 此时会直接创建一个线程
// 一般用于长线程(一个很耗时的同步操作)
new Thread(s_longRunningThreadWork)
{
IsBackground = true,
Name = ".NET Long Running Task"
}.UnsafeStart(task);
}
else
{
// 正常走一个线程池
ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
}
}
针对长线程,可以用此方法创建:
new TaskFactory().StartNew(() =>
{
Console.WriteLine("Hello World!");
}, TaskCreationOptions.LongRunning);
- LongRunning 长线程
- PreferFairness 全局/本地队列
Task回调
其实Task神奇的地方在于,为什么可以抛出结束事件
static async Task Main(string[] args)
{
await Task.Run(() =>
{
Console.Write("Hello");
}).ContinueWith((task =>
{
Console.WriteLine("Test");
}));
}
Task.ContinueWith
private Task ContinueWith(Action<Task> continuationAction, TaskScheduler scheduler,
CancellationToken cancellationToken, TaskContinuationOptions continuationOptions)
{
CreationOptionsFromContinuationOptions(continuationOptions, out TaskCreationOptions creationOptions, out InternalTaskOptions internalOptions);
// 我们的回调函数,被包装成了一个新的Task,ContinuationTaskFromTask
Task continuationTask = new ContinuationTaskFromTask(
this, continuationAction, null,
creationOptions, internalOptions
);
// 注册Task
ContinueWithCore(continuationTask, scheduler, cancellationToken, continuationOptions);
return continuationTask;
}
Task.ContinueWithCore
internal void ContinueWithCore(Task continuationTask,
TaskScheduler scheduler,
CancellationToken cancellationToken,
TaskContinuationOptions options)
{
// ContinuationTaskFromTask 又被包装成了 ContinueWithTaskContinuation
TaskContinuation continuation = new ContinueWithTaskContinuation(continuationTask, options, scheduler);
...
if (!continuationTask.IsCompleted)
{
// 把回调添加到队里里
bool continuationQueued = AddTaskContinuation(continuation, addBeforeOthers: false);
// 如果添加失败,那说明任务已经同步完成了,此时直接直接执行回调
if (!continuationQueued)
continuation.Run(this, canInlineContinuationTask: true);
}
}
Task.AddTaskContinuation
private bool AddTaskContinuation(object tc, bool addBeforeOthers)
{
if (IsCompleted)
return false;
if ((m_continuationObject != null) || (Interlocked.CompareExchange(ref m_continuationObject, tc, null) != null))
{
return AddTaskContinuationComplex(tc, addBeforeOthers);
}
else return true;
}
Task.AddTaskContinuationComplex
此时可以发现Task里有一个object属性m_continuationObject
这个属性里存着所有的回调
private volatile object? m_continuationObject;
private bool AddTaskContinuationComplex(object tc, bool addBeforeOthers)
{
object? oldValue = m_continuationObject;
...
List<object?>? list = m_continuationObject as List<object?>;
if (list != null)
{
...
list.Add(tc);
...
}
return false;
}
那么这个属性什么时候会被调用呢?
参考.NET官方文档,可以发现这么一个方法
Task.Finish
void Finish(bool bUserDelegateExecuted)
{
...
FinishContinuations()
}
Task.FinishContinuations
internal void FinishContinuations()
{
// Atomically store the fact that this task is completing. From this point on, the adding of continuations will
// result in the continuations being run/launched directly rather than being added to the continuation list.
// Then if we grabbed any continuations, run them.
// 这个任务已经完成,从这个时间点开始,添加任何回调都会直接运行
// 对于任何存在的回调,会依次执行他们
// 这里使用了 m_continuationObject
object? continuationObject = Interlocked.Exchange(ref m_continuationObject, s_taskCompletionSentinel);
if (continuationObject != null)
{
RunContinuations(continuationObject);
}
}
那Finish又是啥地方调用的?
看一个简单的例子
简单看一下堆栈
很明显这是线程池的代码,WorkItem执行完之后,抛出了一个事件,关键是ExecuteWithThreadLocal,进去看看
private void ExecuteWithThreadLocal(ref Task currentTaskSlot)
{
this.Finish(true);
}
所以整个流程就很清楚了
- Start Task : 把Action包装成一个Task,并由线程池执行
-
ContinueWith : 把Action包装成一个ContinuationTaskFromTask,一个Task的子类并添加到目标Task的m_continuationObject中
- WorkItem结束时候会派发事件,Task调用m_continuationObject
Async/Await
基础用法
以一段简短的代码为例
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Start");
await TestAsync();
Console.Write(" World");
}
static Task TestAsync()
{
return Task.Run(() =>
{
Console.Write("Hello");
});
}
}
- 方法声明为async
-
返回值可以是void Task Task
- 参数随意,但不能添加ref和out
Await
await是一个语法题,他后接着的表达式,必须是可等待的
假设一个返回类型为T的表达式,他需要是可等待的,那么T必须满足:
-
T必须具备(可以是扩展方法)无参方法GetAwaiter(),此方法返回一个类型A
public struct T { // 一个返回等待器的实例方法 public A GetAwaiter() { return new A(); } }
-
类型A类型必须实现INotifyCompletion或者ICriticalNotifyCompletion接口
public interface INotifyCompletion { void OnCompleted(Action continuation); } public interface ICriticalNotifyCompletion : INotifyCompletion { void UnsafeOnCompleted(Action continuation); }
-
类型A类型必须具有一个可读的实例属性IsCompleted,是bool
// 对外仅可读 public bool IsCompleted { get; private set;}
-
类型A类型必须具有一个非泛型的无参方法GetResult(),表达式的返回值需要和此方法保持一致
public string GetResult() { return "result"; }
我们把一个可等待的对象,称为Job(为了区别于Task)
把他对应的awatier,称为等待器
结合以上信息,我们可以简单实现一个自定义任务
class TestTask
{
private Action continuation;
private string result;
public bool IsCompleted { get; private set; }
public TestTaskAwaiter GetAwaiter() => new TestTaskAwaiter(this);
// 模拟Task
public void Run(Func<string> func)
{
new Thread(() =>
{
var result = func();
// 模拟Task,异步完成之后设置结果
TrySetResult(result);
})
{
IsBackground = true
}.Start();
}
// 模拟Task添加回调
public bool AddContinuation(Action action)
{
if (IsCompleted)
{
return false;
}
continuation += action;
return true;
}
private void TrySetResult(string result)
{
this.result = result;
IsCompleted = true;
continuation?.Invoke();
}
}
然后是对应的等待器,等待器Awatier需要满足上面写的几个要求
class TestTask
{
...
// 1.继承接口
public struct TestTaskAwaiter : INotifyCompletion
{
private readonly TestTask testTask;
// 2. 实现 IsCompleted 属性
public bool IsCompleted => testTask.IsCompleted;
public TestTaskAwaiter(TestTask testTask)
{
this.testTask = testTask;
}
public void OnCompleted(Action continuation)
{
if (testTask.AddContinuation(continuation))
{
Console.WriteLine($"TestTaskAwaiter.OnCompleted1:{DateTime.Now.Second}");
}
else
{
Console.WriteLine($"TestTaskAwaiter.OnCompleted2:{DateTime.Now.Second}");
continuation();
}
}
// 3. 实现 GetResult 方法
public string GetResult()
{
Console.WriteLine("TestTaskAwaiter.GetResult");
return testTask.result;
}
}
}
测试1
class Program
{
private static Action action;
static async Task Main(string[] args)
{
var testTask = new TestTask();
testTask.Run((() =>
{
Console.WriteLine($"Task Start:{DateTime.Now.Second}");
Thread.Sleep(1000);
Console.WriteLine($"Task End:{DateTime.Now.Second}");
return "Result";
}));
testTask.AddContinuation((() =>
{
Console.WriteLine("After Task");
}));
Console.WriteLine($"Main Thread:{DateTime.Now.Second}");
var res = await testTask;
Console.WriteLine($"Main Thread End:{res}");
Console.ReadKey();
}
}
测试2
class Program
{
private static Action action;
static async Task Main(string[] args)
{
var testTask = new TestTask();
testTask.Run((() =>
{
Console.WriteLine($"Task Start:{DateTime.Now.Second}");
return "Result";
}));
testTask.AddContinuation((() =>
{
Console.WriteLine("After Task");
}));
Console.WriteLine($"Main Thread:{DateTime.Now.Second}");
var res = await testTask;
Console.WriteLine($"Main Thread End:{res}");
Console.ReadKey();
}
}
INotifyCompletion VS ICriticalNotifyCompletion
既然这两个接口只要实现其中一个,那么他们有什么区别呢?(也可以两个都实现,但是会优先调用前者)
可以直接参考TaskAwaiter的内部代码
public readonly struct TaskAwaiter<TResult> : ICriticalNotifyCompletion, INotifyCompletion
{
private readonly Task<TResult> m_task;
internal TaskAwaiter(Task<TResult> task)
{
m_task = task;
}
public void OnCompleted(Action continuation)
{
TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true, flowExecutionContext: true);
}
public void UnsafeOnCompleted(Action continuation)
{
TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true, flowExecutionContext: false);
}
internal static void OnCompletedInternal(
Task task,
Action continuation,
bool continueOnCapturedContext,
bool flowExecutionContext)
{
m_task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext);
}
}
可以发现只有一个参数不一样
- flowExecutionContext 是否向后传播ExcutionContext(线程上下文)
语法糖
await语法糖,和Task、Thread、ThreadPool其实并没有直接的关系,await后面可以跟着任何Job,Job >= Task
await等待的是Awaiter,当等待器结束时,会派发结束事件,await等待的就是这个事件
简单考虑来说,await会把await表达式下一行开始的所有代码,都包装成一个类似于Action的东西,等Job收到结束消息时,再触发Job之后的内容
所谓收到结束消息,以上面的例子来说,即
public void Run(Func<string> func)
{
new Thread(() =>
{
var result = func();
// Job结束了,设置结果,触发回调
TrySetResult(result);
})
{
IsBackground = true
}.Start();
}
C#原生的await,仅可以跟着void,或者Task
可以等待Task是因为C#封装好了TaskAwaiter类
如果你想等待一个void类型,那么大多数时候需要自己额外封装
Await Task
以一个简单的例子来说
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Start");
await TestAsync();
Console.Write(" World!");
}
static Task TestAsync()
{
return Task.Run(() =>
{
Console.Write("Hello");
});
}
}
用ILSpy反编译Dll后,查看编译器处理过的代码
// 入口函数1
private static void <Main>(string[] args)
{
Main(args).GetAwaiter().GetResult();
}
// 入口函数2
private static Task Main(string[] args)
{
<Main>d__0 stateMachine = new <Main>d__0
{
// 核心
<>t__builder = AsyncTaskMethodBuilder.Create(),
args = args,
<>1__state = -1
};
stateMachine.<>t__builder.Start<<Main>d__0>(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
// 编译器自动生成的类型
private sealed class <Main>d__0 : IAsyncStateMachine
{
// Fields
public int <>1__state;
public AsyncTaskMethodBuilder <>t__builder;
public string[] args;
private TaskAwaiter <>u__1;
// Methods
private void MoveNext() { }
[DebuggerHidden]
private void SetStateMachine(IAsyncStateMachine stateMachine) { }
}
首先,编译器创建了2个Main函数,入口函数被修改了
入口函数1是真正的入口,会调用入口函数2
在入口2,我们会创建一个编译器自动生成的状态机,把函数参数拷贝到状态机内,重置状态机的状态为-1,然后启动状态机
当我们await一个Task时,我们的核心类是AsyncTaskMethodBuilder
,他是整个状态机的入口
public struct AsyncTaskMethodBuilder
{
public static void Start<TStateMachine>(ref TStateMachine stateMachine)
{
...
// 执行状态机
stateMachine.MoveNext();
}
}
private struct <Main > d__0 : IAsyncStateMachine
{
public int <>1__state;
public AsyncTaskMethodBuilder<> t__builder;
// await的部分,会变成TaskAwaiter
private TaskAwaiter<> u__1;
private void MoveNext()
{
int num = <>1__state;
try
{
TaskAwaiter awaiter;
// 初始化的时候,我们的状态被重置为-1,所以肯定会进入if
if (num != 0)
{
// 执行await之前的部分
Console.WriteLine("Start");
// 获取awaiter
awaiter = TestAsync().GetAwaiter();
// 假设这个任务米有立刻完成(一般情况),那么会进入if
if (!awaiter.IsCompleted)
{
// 切换状态为0
num = (<>1__state = 0);
<>u__1 = awaiter;
<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
// 直接返回了,不会执行await之后的内容
return;
}
}
else
{
awaiter = <>u__1;
<>u__1 = default(TaskAwaiter);
num = (<>1__state = -1);
}
awaiter.GetResult();
Console.Write(" World");
}
catch (Exception exception)
{
<>1__state = -2;
<>t__builder.SetException(exception);
return;
}
<>
1__state = -2;
// 结束通知
<>t__builder.SetResult();
}
}
很明显关键部分是<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
查看.NET官方文档
-
AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted Schedules the state machine to proceed to the next action when the specified awaiter completes. This method can be called from partially trusted code.
当指定的等待程序完成时,调度状态机继续执行下一个操作,也就是当awaiter执行完成之后,再次调用MoveNext
我们继续往里反编译,看看他是怎么等待awaiter执行完成的
// =================
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>()
{
AsyncTaskMethodBuilder<VoidTaskResult>.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine, ref m_task);
}
// =================
internal static void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>()
{
...
AwaitUnsafeOnCompleted(ref awaiter, stateMachineBox);
}
// =================
internal static void AwaitUnsafeOnCompleted<TAwaiter>()
{
if (default(TAwaiter) != null && awaiter is ITaskAwaiter)
{
// 1.
TaskAwaiter.UnsafeOnCompletedInternal(...);
return;
}
if (default(TAwaiter) != null && awaiter is IConfiguredTaskAwaiter)
{
// 2.
TaskAwaiter.UnsafeOnCompletedInternal(...);
return;
}
if (default(TAwaiter) != null && awaiter is IStateMachineBoxAwareAwaiter)
{
// 3.
((IStateMachineBoxAwareAwaiter)(object)awaiter).AwaitUnsafeOnCompleted(box);
return;
}
// 4.自定义的awaiter会在这儿
awaiter.UnsafeOnCompleted(box.MoveNextAction);
}
很明显了,会根据传入的awaiter类型,调用他们自身的AwaitUnsafeOnCompleted方法
此时我们是一个TaskAwatier
class TaskAwaiter
{
public readonly struct TaskAwaiter : ICriticalNotifyCompletion, INotifyCompletion, ITaskAwaiter
{
internal static void UnsafeOnCompletedInternal(...)
{
...
task.UnsafeSetContinuationForAwait(...);
}
}
}
最后还是进入了Task,添加成了Task的一个回调方法
class Task
{
internal void UnsafeSetContinuationForAwait(IAsyncStateMachineBox stateMachineBox, bool continueOnCapturedContext)
{
...
// 默认fasle
if (continueOnCapturedContext)
{
AddTaskContinuation(...)
}
if (!AddTaskContinuation(stateMachineBox, addBeforeOthers: false))
{
// 最终如果实在没法添加为Task的回调方法,那么会调用线程池,并且放到局部线程中(preferLocal)
ThreadPool.UnsafeQueueUserWorkItemInternal(stateMachineBox, preferLocal: true);
}
}
}
所以总体的流程大概如图
Await Void
回到本文章最开始的例子,如何await一个Button?
需要进行如下扩展
-
Button类型需要有一个方法(可以是扩展方法)GetAwaiter(),该方法返回一个Button等待器
public static class ButtonEx { public static ButtonAwaiter GetAwaiter(this Button self) => new ButtonAwaiter(self); }
-
Button等待器需要实现一些必须的属性和方法
这是一个干净的Button等待器
public class ButtonAwaiter : ICriticalNotifyCompletion { public bool IsCompleted { get; private set; } public ButtonAwaiter(){} public void GetResult(){} public void OnCompleted(Action continuation){} public void UnsafeOnCompleted(Action continuation){} }
进一步封装之后,应该如下
public class ButtonAwaiter : ICriticalNotifyCompletion { public bool IsCompleted { get; private set; } private Button m_Btn; private Action m_Continuation; public ButtonAwaiter(Button btn) { m_Btn = btn; } public void GetResult() { } public void OnCompleted(Action continuation) { UnsafeOnCompleted(continuation); } public void UnsafeOnCompleted(Action continuation) { m_Continuation = continuation; m_Btn.onClick.AddListener(OnClickInternal); } private void OnClickInternal() { m_Btn.onClick.RemoveListener(OnClickInternal); IsCompleted = true; m_Continuation?.Invoke(); m_Continuation = null; } }
那么这个等待器,究竟做了什么呢?
以开头的简单代码为例
async void Start()
{
Debug.Log("Before Click Button...");
await btn;
Debug.Log($"After Click Button...");
}
反编译的结果如下(进行了一定整理
// 入口1
private void Start()
{
<Start>d__1 stateMachine;
stateMachine = new <Start>d__1();
stateMachine.<>t__builder = AsyncVoidMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
}
// 对应的状态机
private sealed class <Start > d__1 : IAsyncStateMachine
{
public int <>1__state;
public AsyncVoidMethodBuilder<> t__builder;
public test<>4__this;
private object <>u__1;
private void MoveNext()
{
int num;
num = this.<>1__state;
try
{
ButtonAwaiter awaiter;
if (num != 0)
{
Debug.Log((object) "Before Click Button...");
awaiter = this.<>4__this.btn.GetAwaiter();
if (!awaiter.IsCompleted)
{
num = (this.<>1__state = 0);
this.<>u__1 = awaiter;
<Start > d__1 stateMachine;
stateMachine = this;
this.<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
return;
}
}
else
{
awaiter = (ButtonAwaiter) this.<>u__1;
this.<>u__1 = null;
num = (this.<>1__state = -1);
}
awaiter.GetResult();
Debug.Log((object) "After Click Button...");
}
catch (Exception exception)
{
this.<>1__state = -2;
this.<>t__builder.SetException(exception);
return;
}
this.<>1__state = -2;
this.<>t__builder.SetResult();
}
}
可以发现,大部分结构和Task的反编译代码是很接近的
但是,有一个很关键的地方不一样,此时的builder是AsyncVoidMethodBuilder
那么自然builder.AwaitUnsafeOnCompleted
方法也肯定产生了改变,我们进一步反编译
class AsyncVoidMethodBuilder
{
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>()
{
try
{
AsyncMethodBuilderCore.MoveNextRunner runnerToInitialize;
runnerToInitialize = null;
Action completionAction;
// 1.获取结束时的action
completionAction = this.m_coreState.GetCompletionAction();
...
// 2.添加结束委托
awaiter.UnsafeOnCompleted(completionAction);
}
catch (Exception exception)
{
AsyncMethodBuilderCore.ThrowAsync(exception, null);
}
}
}
我们一步一步来
-
GetCompletionAction
有两个敌方比较关键,首先是捕获了当前线程的当前上下文环境
其次能很明显的发现,返回值defaultContextAction,其实就是moveNextRunner.Run(初始化时传入了捕获的当前上下文)
class AsyncMethodBuilderCore { internal Action GetCompletionAction() { // 首先获取当前运行上下文 ExecutionContext executionContext = ExecutionContext.FastCapture(); MoveNextRunner moveNextRunner; Action defaultContextAction; if (executionContext != null && executionContext.IsPreAllocatedDefault) { var defaultContextAction = this.m_defaultContextAction; if (defaultContextAction != null) { return defaultContextAction; } moveNextRunner = new MoveNextRunner(executionContext); // 1. defaultContextAction = moveNextRunner.Run; this.m_defaultContextAction = defaultContextAction; } else { moveNextRunner = new MoveNextRunner(executionContext); // 2. defaultContextAction = moveNextRunner.Run; ... } ... return defaultContextAction; } }
-
MoveNextRunner.Run
那我们进去观察一下这个方法,其核心就是用ExecutionContext去跑了一个InvokeMoveNext方法
关于
ExecutionContext.Run
方法,水平优先,很难展开讨论这个总的来说,这个方法可以用指定时刻的一个上下文环境(程序状态),去执行某一个方法
具体观察这个
InvokeMoveNext
方法,是不是很眼熟?没错,他就是编译器自动生成的状态机的MoveNext方法!
internal sealed class MoveNextRunner { internal void Run() { try { // 这里使用的上下文,就是上一步传入进来的 ExecutionContext.Run(InvokeMoveNext,executionContext); return; } finally { this.m_context.Dispose(); } } private static void InvokeMoveNext(object stateMachine) { ((IAsyncStateMachine)stateMachine).MoveNext(); } }
-
awaiter.UnsafeOnCompleted
在上一步,我们拿到了一个执行MoveNext的回调函数,那么这个回调函数又注册到了哪儿里呢?
这里的awaiter,在编译器生成的状态机里,我们可以知道他其实就是ButtonAwaiter
awaiter = this.<>4__this.btn.GetAwaiter();
也就是说,其实这里调用的方法就是我们上面的一个实现类
而且,这里传入的action,能够让状态机执行MoveNext
那么这个action什么时候会被执行呢?
public class ButtonAwaiter : ICriticalNotifyCompletion { private Action m_Continuation; // 这个continuation,会让状态机执行MoveNext public void UnsafeOnCompleted(Action continuation) { // 把MoveNext方法保存下来 m_Continuation = continuation; // 为按钮注册一个点击事件 m_Btn.onClick.AddListener(OnClickInternal); } }
我们再具体看这个点击事件
public class ButtonAwaiter : ICriticalNotifyCompletion { private Action m_Continuation; // 点击按钮时 private void OnClickInternal() { // 移除监听本身(说明只能执行一次 m_Btn.onClick.RemoveListener(OnClickInternal); // 象征性的设置为完成,其实在这个等待器中,这个属性没有意义 IsCompleted = true; // 调用状态机的MoveNext方法 m_Continuation?.Invoke(); m_Continuation = null; } }
此刻,MoveNext的内容应该是
// 第二次执行MoveNext private sealed class <Start > d__1 : IAsyncStateMachine { private void MoveNext() { int num; num = this.<>1__state; try { ... awaiter.GetResult(); Debug.Log((object) "After Click Button..."); } catch (Exception exception) { ... } } }
我们可以添加一些Log,来验证这个顺序
async void Start() { Debug.Log($"Current Thread:{Thread.CurrentThread.ManagedThreadId}"); Debug.Log("Before Click Button..."); await btn; Debug.Log($"After Click Button..."); Debug.Log($"Current Thread:{Thread.CurrentThread.ManagedThreadId}"); } public class ButtonAwaiter : ICriticalNotifyCompletion { ... public void GetResult() { Debug.Log("When GetResult"); Debug.Log($"Current Thread:{Thread.CurrentThread.ManagedThreadId}"); } private void OnClickInternal() { Debug.Log($"When Click Internal"); Debug.Log($"Current Thread:{Thread.CurrentThread.ManagedThreadId}"); ... } }
很明显了,并没有开启多线程(一直都在主线程跑),并且顺序也符合我们的预期
TaskCompleteSource
当然如果每一个类型都需要这么扩展一个Awaiter,那就太麻烦了,当充分理解上述内容后,.NET提供了这么一个偷懒类
比如,我们可以修改上述的Button扩展为
public class test : MonoBehaviour
{
public Button btn;
async void Start()
{
Debug.Log("Before Click Button...");
await btn;
Debug.Log($"After Click Button...");
}
private Task<bool> AsyncClick(Button btn)
{
var tcs = new TaskCompletionSource<bool>();
btn.onClick.AddListener((() =>
{
tcs.SetResult(true);
}));
return tcs.Task;
}
}
非常之简单、清晰
TaskCompletionSource内部包含了一个Task,我们可以直接返回这个Task,Task是可以直接直接await的,所以我们无须扩展了
但是这个Task什么时候结束呢?
同样的,我们给Button添加了一个点击事件,这个时间会主动结束一个Task
-
SetResult
class TaskCompletionSource<TResult> { public void SetResult(TResult result) { if (!this.TrySetResult(result)) throw new InvalidOperationException(Environment.GetResourceString("TaskT_TransitionToFinal_AlreadyCompleted")); } public bool TrySetResult(TResult result) { // 最终还是调用了Task的结束方法 bool flag = this.m_task.TrySetResult(result); return flag; } }
对于弹窗的案例,其实也很简单
public class Test : MonoBehaviour
{
public TestWnd wnd;
public Button openWnd;
private void Start()
{
openWnd.onClick.AddListener(Open);
}
private async void Open()
{
var flag = await OpenWnd();
Debug.Log(flag ? "确定" : "取消");
}
private Task<bool> OpenWnd()
{
var tcs = new TaskCompletionSource<bool>();
wnd.Open(tcs);
return tcs.Task;
}
}
public class TestWnd : MonoBehaviour
{
public Button confirm;
public Button cancel;
private TaskCompletionSource<bool> tcs;
private void Start()
{
confirm.onClick.AddListener((() =>
{
tcs.SetResult(true);
gameObject.SetActive(false);
}));
cancel.onClick.AddListener((() =>
{
tcs.SetResult(false);
gameObject.SetActive(false);
}));
}
public void Open(TaskCompletionSource<bool> tcs)
{
this.tcs = tcs;
gameObject.SetActive(true);
}
}
基于TaskCompletionSource,我们可以很简单的扩展一些异步方法,比如
/// <summary>
/// 加载资源(可等待)
/// </summary>
public static Task<T> LoadAssetAsync<T>(string assetName)
{
var loadAssetTcs = new TaskCompletionSource<T>();
assetManager.LoadAssetAsync<T>(assetName, (asset) =>
{
var source = loadAssetTcs;
loadAssetTcs = null;
var tAsset = asset as T;
if (tAsset != null)
{
source.SetResult(tAsset);
}
else
{
Debug.LogError($"Load asset failure...");
source.SetException("Error");
}
}
);
return loadAssetTcs.Task;
}
然后我们就可以很爽的加载资源了,比如
async void Start()
{
var asset = await LoadAssetAsync<GameObject>("Cube");
Instantiate(asset);
}
问题分析
整个Await的流程其实比较简单,概括起来就两句话:
- 生成状态机
- 把状态机的MoveNext函数注册为回调函数
生成状态机
很明显,C#编译器会为我们new一个状态机,那么会不会导致额外的GC开销呢?
[AsyncStateMachine(typeof(<Main>d__0))]
private static Task Main(string[] args)
{
<Main>d__0 stateMachine = default(<Main>d__0);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
答案是在Debug模式下会,在Release模式下不会,因为Debug模式下,生成的状态机是一个Class,而Release模式下,生成的状态机是一个Sturct
// Release模式下自动生成的代码
private struct <Main>d__0 : IAsyncStateMachine
// Debug模式下自动生成的代码
private sealed class <Main>d__0 : IAsyncStateMachine
注册回调函数
回调函数,是一个Struct结构体里的MoveNext
private struct <Main > d__0 : IAsyncStateMachine
{
private void MoveNext()
}
观察回调函数的注册过程
第一步,发生在状态机中
private struct <Main > d__0 : IAsyncStateMachine
{
public AsyncTaskMethodBuilder<> t__builder;
private void MoveNext()
{
awaiter = Program.TestAsync().GetAwaiter();
// 这里转入了this,状态机是一个Struct
this.<>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
}
}
第二步,在具体的MehtodBuilder中
public struct AsyncTaskMethodBuilder
{
private Task<VoidTaskResult> m_task;
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
{
// 中转方法,往下传给了具体的MehtodBuilder
// 这里传入了我们的状态机,以及一个空的,对应的Task
AsyncTaskMethodBuilder<VoidTaskResult>.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine, ref this.m_task);
}
}
第三步
public struct AsyncTaskMethodBuilder<TResult>
{
static void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter,ref TStateMachine, ref Task<TResult>)
where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
{
// 中转方法,但是注意第二个参数,调用了一个方法GetStateMachineBox
AsyncTaskMethodBuilder<TResult>.AwaitUnsafeOnCompleted(
ref awaiter, AsyncTaskMethodBuilder<TResult>.GetStateMachineBox(ref stateMachine, ref taskField));
}
}
简单看一下这个方法(省略了很多内容)
最终是把stateMachine传给了Task,StateMachine是一个Struct,而Task是一个Class,所以这一步是一定会发生装箱的
所以这个方法叫做GetBox(狗头)
private static IAsyncStateMachineBox GetStateMachineBox<TStateMachine>(ref TStateMachine, ref Task<TResult>)
where TStateMachine : IAsyncStateMachine
{
if (taskField is AsyncStateMachineBox<IAsyncStateMachine> asyncStateMachineBox2)
{
if (asyncStateMachineBox2.StateMachine == null)
{
asyncStateMachineBox2.StateMachine = stateMachine;
}
return asyncStateMachineBox2;
}
}
UniTask
Add Package From Git Url:https://github.com/Cysharp/UniTask.git?path=src/UniTask/Assets/Plugins/UniTask
- 高性能,基于结构体的UniTask,使用对象池缓存委托,没有GC
- 自定义AsyncMethodBuilder,使得Unity中的所有异步操作均可以await
- 不使用Thread,基于PlayerLoop,支持UniTask.Dealy,UniTask.Yield,UniTask.DelayFrame
- 支持Addressable、DOTween
性能测试
public class Test : MonoBehaviour
{
public Button testCoroutine;
public Button testUniTask;
public int testNum;
private void Start()
{
testCoroutine.onClick.AddListener((() => { StartCoroutine(CoroutineTest()); }));
testUniTask.onClick.AddListener((UniTaskTest));
}
private IEnumerator CoroutineTest()
{
float totalTime = 0;
for (int i = 0; i < testNum; i++)
{
var time = Time.realtimeSinceStartup;
var coroutine = StartCoroutine(EmptyCoroutine());
totalTime += Time.realtimeSinceStartup - time;
yield return coroutine;
}
Debug.Log($"开启 {testNum} 次协程的耗时:{totalTime * 1000}");
}
private IEnumerator EmptyCoroutine()
{
yield return null;
}
private async void UniTaskTest()
{
float totalTime = 0f;
for (int i = 0; i < testNum; i++)
{
var time = Time.realtimeSinceStartup;
var task = EmptyUniTask();
totalTime += Time.realtimeSinceStartup - time;
await task;
}
Debug.Log($"开启 {testNum} 次UniTask的耗时:{totalTime * 1000}");
}
private async UniTask EmptyUniTask()
{
await UniTask.Yield(PlayerLoopTiming.Update);
}
}
切换到Release测试环境
测试3000次,结果如图
首次测试时,UniTask比协程快几乎4倍,当我们第二次测试时,UniTask比协程快了接近10倍,推测UniTask应该是有一些懒加载的设定,所以第二次会比第一次更快
基础使用
异步资源加载
var handle = Resources.LoadAsync<GameObject>("Cube");
await handle;
Debug.Log("After Load Cube");
异步网络请求
var request = UnityWebRequestTexture.GetTexture(@"...");
await request.SendWebRequest();
Debug.Log("After Downloading");
Delay
await UniTask.Delay(1000, DelayType.Realtime, PlayerLoopTiming.LastUpdate);
await UniTask.DelayFrame(10);
-
DelayType
Realtime:真实时间,不受帧率影响
DeltaTime: 受帧率影响
UnscaledDeltaTime:忽略时间缩放
-
PlayerLoopTiming
更新时间的位置
帧等待
等待一帧
await UniTask.NextFrame();
await UniTask.WaitForEndOfFrame(this);
await UniTask.Yield();
条件等待
public class Test : MonoBehaviour
{
private bool isClickL;
private bool isClickR;
private async void Start()
{
var taskL = UniTask.WaitUntil((() => isClickL));
var taskR = UniTask.WaitUntil((() => isClickR));
await UniTask.WhenAll(taskL, taskR);
Debug.Log("Click All");
}
private void Update()
{
if (Input.GetMouseButtonDown(0))
{
Debug.Log("Click L");
isClickL = true;
}
if (Input.GetMouseButtonDown(1))
{
Debug.Log("Click R");
isClickR = true;
}
}
}
对于串联资源加载是很方便的
var task1 = Resources.LoadAsync<GameObject>("Task1").ToUniTask();
var task2 = Resources.LoadAsync<GameObject>("Task2").ToUniTask();
var task3 = Resources.LoadAsync<GameObject>("Task3").ToUniTask();
await UniTask.WhenAll(task1, task2, task3);
Debug.Log("After Load All...");
类似的接口还有:
- UniTask.WaitWhile
- UniTask.WhenAny
- UniTask.WaitUntilValueChanged
进度显示
任何异步操作都可以ToUniTask,然后显示进度
基础用法
var handle = SceneManager.LoadSceneAsync("Test");
handle.ToUniTask(Progress.Create<float>(f =>
{
Debug.Log($"进度:{f}");
}));
await handle;
Debug.Log("After Loading...");
或者调用方可以实现IProgress接口,则可以省去一部分匿名表达式的GC
public class Test : MonoBehaviour, IProgress<float>
{
public void Report(float value)
{
UnityEngine.Debug.Log(value);
}
public async UniTaskVoid WebRequest()
{
var request = await UnityWebRequest.Get("https://xn--vckuc.xn--jh1al63br2c.com/auth/login")
.SendWebRequest()
.ToUniTask(progress: this);
}
}
取消
和Task类似,需要一个Token,Token只可以使用一次,使用完成之后就可以Dispose了,再次使用需要重新生成
对于嵌套Task,可以在入口函数传入一个Token,然后所有Task都使用这个Token,就可以做到同时停止
public class Test : MonoBehaviour
{
// 需要一个Token,在创建任务时传入
private CancellationTokenSource tokenSource = new CancellationTokenSource();
private async void Start()
{
// 需要以异常的形式抛出
try
{
var num = 1;
while (true)
{
Debug.Log(num);
num++;
// 传入Token
await UniTask.Delay(1000, cancellationToken: tokenSource.Token);
}
}
catch (Exception e)
{
Debug.LogError("Task停止");
}
}
private void Update()
{
if (Input.GetMouseButton(0))
{
// 停止任务
tokenSource.Cancel();
}
}
}
针对部分异步操作,有更加简洁的写法
await Resources.LoadAsync<GameObject>("Cube").WithCancellation(tokenSource.Token);
如果是在Mono中的脚本,那还有一个写好的扩展方法,可以直接返回一个Token
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: this.GetCancellationTokenOnDestroy());
超时处理
超时取消,其实是取消的一个子集,所以可以通过接口CancellationTokenSouce.CancelAfterSlim(TimeSpan),获得超时Token
var cts = new CancellationTokenSource();
// 1秒超时
cts.CancelAfterSlim(TimeSpan.FromSeconds(1));
try
{
await UnityWebRequest.Get("https://xn--vckuc.xn--jh1al63br2c.com/auth/login").SendWebRequest().WithCancellation(cts.Token);
}
catch (OperationCanceledException ex)
{
if (ex.CancellationToken == cts.Token)
{
Debug.Log("Timeout...");
}
}
或者使用TimeoutController,可以优化GC分配
TimeoutController timeoutController = new TimeoutController();
try
{
await UnityWebRequest.Get("https://xn--vckuc.xn--jh1al63br2c.com/auth/login").SendWebRequest()
.WithCancellation(timeoutController.Timeout(TimeSpan.FromSeconds(5)));
//成功时需要调用Reset重置计时器
timeoutController.Reset();
}
catch (OperationCanceledException ex)
{
if (timeoutController.IsTimeout())
{
UnityEngine.Debug.Log("Timeout...");
}
}
串联取消
多个取消条件,可以串联成到一起
var cancelToken = new CancellationTokenSource();
// 点击按钮取消
cancelButton.onClick.AddListener(()=>
{
cancelToken.Cancel();
});
var timeoutToken = new CancellationTokenSource();
// 5秒超时
timeoutToken.CancelAfterSlim(TimeSpan.FromSeconds(5));
try
{
// 组合token
var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancelToken.Token, timeoutToken.Token);
await UnityWebRequest.Get("https://xn--vckuc.xn--jh1al63br2c.com/auth/login")
.SendWebRequest().WithCancellation(linkedTokenSource.Token);
}
catch (OperationCanceledException ex)
{
if (timeoutToken.IsCancellationRequested)
{
Debug.Log("Timeout...");
}
else if (cancelToken.IsCancellationRequested)
{
Debug.Log("Cancel Clicked...");
}
}
关于PlayerLoop
Unity默认一般有下面这几个阶段
public enum PlayerLoopTiming
{
Initialization = 0,
LastInitialization = 1,
EarlyUpdate = 2,
LastEarlyUpdate = 3,
FixedUpdate = 4,
LastFixedUpdate = 5,
PreUpdate = 6,
LastPreUpdate = 7,
Update = 8,
LastUpdate = 9,
PreLateUpdate = 10,
LastPreLateUpdate = 11,
PostLateUpdate = 12,
LastPostLateUpdate = 13
#if UNITY_2020_2_OR_NEWER
TimeUpdate = 14,
LastTimeUpdate = 15,
#endif
}
UniTask的大多数等待任务中,都可以传入PlayerLoopTiming,指任务具体调用的时机
通过PlayerLoop来观察UniTask的执行时机
public class Test : MonoBehaviour
{
private void Start()
{
Register();
}
private void Register()
{
var playerLoop = PlayerLoop.GetCurrentPlayerLoop();
var subSys = playerLoop.subSystemList;
for (int i = 0; i < subSys.Length; i++)
{
var index = i;
subSys[i].updateDelegate += () =>
{
Debug.Log($"{subSys[index]} 当前帧{Time.frameCount}");
};
}
PlayerLoop.SetPlayerLoop(playerLoop);
}
}
NextFrame
Debug.Log("执行NextFrame");
await UniTask.NextFrame();
Debug.Log("结束执行NextFrame");
WaitForEndOfFrame
Debug.Log("执行EndOfFrame");
await UniTask.WaitForEndOfFrame(this);
Debug.Log("结束执行EndOfFrame");
Yield
Debug.Log("执行Yield");
await UniTask.Yield(PlayerLoopTiming.PreUpdate);
Debug.Log("结束执行Yield");
修改为
await UniTask.Yield(PlayerLoopTiming.Update);
部分原理
等待器
如果我们想等待一个Job,那么这个Job肯定是需要一个等待器的,可以看一下部分UniTask为我们提供的等待器
ResourceRequest
// 扩展方法
public static ResourceRequestAwaiter GetAwaiter(this ResourceRequest asyncOperation)
{
Error.ThrowArgumentNullException(asyncOperation, nameof(asyncOperation));
return new ResourceRequestAwaiter(asyncOperation);
}
// 等待器,注意这里是struct
public struct ResourceRequestAwaiter : ICriticalNotifyCompletion
{
ResourceRequest asyncOperation;
Action<AsyncOperation> continuationAction;
public ResourceRequestAwaiter(ResourceRequest asyncOperation)
{
this.asyncOperation = asyncOperation;
this.continuationAction = null;
}
public bool IsCompleted => asyncOperation.isDone;
public UnityEngine.Object GetResult()
{
if (continuationAction != null)
{
asyncOperation.completed -= continuationAction;
continuationAction = null;
var result = asyncOperation.asset;
asyncOperation = null;
return result;
}
else
{
var result = asyncOperation.asset;
asyncOperation = null;
return result;
}
}
public void OnCompleted(Action continuation)
{
UnsafeOnCompleted(continuation);
}
public void UnsafeOnCompleted(Action continuation)
{
Error.ThrowWhenContinuationIsAlreadyRegistered(continuationAction);
// 为委托方法设计了一个池对象
continuationAction = PooledDelegate<AsyncOperation>.Create(continuation);
// 添加结束事件(当资源加载结束时候,执行MoveNext)
asyncOperation.completed += continuationAction;
}
}
UnityWebRequestAsyncOperation
// 扩展方法
public static UnityWebRequestAsyncOperationAwaiter GetAwaiter(this UnityWebRequestAsyncOperation asyncOperation)
{
Error.ThrowArgumentNullException(asyncOperation, nameof(asyncOperation));
return new UnityWebRequestAsyncOperationAwaiter(asyncOperation);
}
// 等待器,原理几乎是一样的
public struct UnityWebRequestAsyncOperationAwaiter : ICriticalNotifyCompletion
{
UnityWebRequestAsyncOperation asyncOperation;
Action<AsyncOperation> continuationAction;
public UnityWebRequestAsyncOperationAwaiter(UnityWebRequestAsyncOperation asyncOperation)
{
this.asyncOperation = asyncOperation;
this.continuationAction = null;
}
public bool IsCompleted => asyncOperation.isDone;
public UnityWebRequest GetResult()
{
if (continuationAction != null)
{
asyncOperation.completed -= continuationAction;
continuationAction = null;
var result = asyncOperation.webRequest;
asyncOperation = null;
if (result.IsError())
{
throw new UnityWebRequestException(result);
}
return result;
}
else
{
var result = asyncOperation.webRequest;
asyncOperation = null;
if (result.IsError())
{
throw new UnityWebRequestException(result);
}
return result;
}
}
public void OnCompleted(Action continuation)
{
UnsafeOnCompleted(continuation);
}
public void UnsafeOnCompleted(Action continuation)
{
Error.ThrowWhenContinuationIsAlreadyRegistered(continuationAction);
continuationAction = PooledDelegate<AsyncOperation>.Create(continuation);
asyncOperation.completed += continuationAction;
}
}
AsyncOperation
略
AssetBundleRequest
略
AssetBundleCreateRequest
略
Delay
以一下代码为例
async void Start()
{
await UniTask.Delay(1000);
}
看一下底层代码,会根据Delay的参数,返回不同的UniTask
public static UniTask Delay()
{
...
switch (delayType)
{
case DelayType.UnscaledDeltaTime:
{
return new UniTask(DelayIgnoreTimeScalePromise.Create(), token);
}
case DelayType.Realtime:
{
return new UniTask(DelayRealtimePromise.Create(), token);
}
case DelayType.DeltaTime:
default:
{
return new UniTask(DelayPromise.Create(), token);
}
}
}
等待器部分
既然返回的是一个UniTask,那这个类型必然也要求满足await的规定
// Job
public readonly partial struct UniTask
{
...
public Awaiter GetAwaiter()
{
// 传入了UniTask自身
return new Awaiter(this);
}
}
对应的等待器
public readonly struct Awaiter : ICriticalNotifyCompletion
{
readonly UniTask task;
public Awaiter(in UniTask task)
{
this.task = task;
}
public bool IsCompleted => task.Status.IsCompleted();
public void GetResult()
{
if (task.source == null)
return;
task.source.GetResult(task.token);
}
public void OnCompleted(Action continuation)
{
if (task.source == null)
{
continuation();
}
else
{
task.source.OnCompleted(AwaiterActions.InvokeContinuationDelegate, continuation, task.token);
}
}
public void UnsafeOnCompleted(Action continuation)
{
if (task.source == null)
{
continuation();
}
else
{
task.source.OnCompleted(AwaiterActions.InvokeContinuationDelegate, continuation, task.token);
}
}
...
}
和我们自己写的很像,但也有一点区别,从代码可以看出来,主要逻辑其实还是在UniTask里(task)
IUniTaskSource
在上面,我们创建UniTask时候,会借助几个辅助方法,生成一个IUniTaskSource对象
以DelayRealtimePromise.Create()
为例
public interface IUniTaskSource
{
...
UniTaskStatus GetStatus(short token);
void OnCompleted(Action<object> continuation, object state, short token);
void GetResult(short token);
}
class DelayRealtimePromise
{
public static IUniTaskSource Create(TimeSpan delayTimeSpan, PlayerLoopTiming timing...)
{
...
// 池对象优化
if (!pool.TryPop(out var result))
{
result = new DelayRealtimePromise();
}
// 开启了一个计时器,很关键
result.stopwatch = ValueStopwatch.StartNew();
// 设置等待时间
result.delayTimeSpanTicks = delayTimeSpan.Ticks;
...
// 添加到PlayerLoop里
PlayerLoopHelper.AddAction(timing, result);
return result;
}
}
其实这里的关键代码就只有一行,就是PlayerLoopHelper
PlayerLoopHelper
初始化入口添加了标签RuntimeInitializeOnLoadMethod
说明这个类型一运行代码就会初始化,不需要我们主动调用
public static class PlayerLoopHelper
{
static PlayerLoopRunner[] runners;
[RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.BeforeSceneLoad)]
static void Init()
{
...
var playerLoop = PlayerLoop.GetCurrentPlayerLoop();
Initialize(ref playerLoop);
}
public static void Initialize(ref PlayerLoopSystem playerLoop)
{
...
// 初始化PlayerLoopRunner
runners = new PlayerLoopRunner[16];
var copyList = playerLoop.subSystemList.ToArray();
// Update
InsertLoop(copyList, injectTimings, typeof(PlayerLoopType.Update),
InjectPlayerLoopTimings.Update, 8, true,
typeof(UniTaskLoopRunners.UniTaskLoopRunnerYieldUpdate), typeof(UniTaskLoopRunners.UniTaskLoopRunnerUpdate),
PlayerLoopTiming.Update);
...
// Insert UniTaskSynchronizationContext to Update loop
var i = FindLoopSystemIndex(copyList, typeof(PlayerLoopType.Update));
copyList[i].subSystemList = InsertUniTaskSynchronizationContext(copyList[i]);
playerLoop.subSystemList = copyList;
PlayerLoop.SetPlayerLoop(playerLoop);
}
}
关键是在这一步InserLoop
static void InsertLoop()
{
var i = FindLoopSystemIndex(copyList, loopType);
copyList[i].subSystemList = InsertRunner(copyList[i], injectOnFirst,
loopRunnerYieldType, yielders[index] = new ContinuationQueue(playerLoopTiming),
loopRunnerType, runners[index] = new PlayerLoopRunner(playerLoopTiming));
}
这里的copyList[i].subSystemList
,其实对应的就是Unity里各个类型的系统列表(Update等)
然后我们会往某一个类型的系统列表里,添加UniTask自定义的系统
static PlayerLoopSystem[] InsertRunner(PlayerLoopSystem loopSystem,Type loopRunnerType, PlayerLoopRunner runner)
{
...
var runnerLoop = new PlayerLoopSystem
{
type = loopRunnerType,
updateDelegate = runner.Run
};
...
}
这里主动new了一个系统,并且这个系统的类型和update方法,都是我们主动传入的
PlayerLoopRunner
上面我们的AddAction,其实也是往这个Runner里添加的Action
class PlayerLoopRunner
{
// 所有的任务
IPlayerLoopItem[] loopItems = new IPlayerLoopItem[InitialSize];
// 一个任务队列
MinimumQueue<IPlayerLoopItem> waitQueue = new MinimumQueue<IPlayerLoopItem>(InitialSize);
public void AddAction(IPlayerLoopItem item)
{
...
if (running)
{
waitQueue.Enqueue(item);
return;
}
loopItems[tail++] = item;
}
}
如果当前系统正在运行,那么先把任务添加到队列中,否则直接添加到任务列表末尾
(系统并不是每时每刻都在运行的,不然直接卡死了不是,就和Update方法一样,是每一段时间调用一次的)
然后在看核心运行方法,先看上半部分run
void RunCore()
{
running = true;
var j = tail - 1;
for (int i = 0; i < loopItems.Length; i++)
{
var action = loopItems[i];
if (action != null)
{
if (!action.MoveNext())
{
loopItems[i] = null;
}
else
{
continue; // next i
}
}
}
// 到这里就认为运行结束了
running = false;
...
}
这里的代码很简单,就是遍历所有任务,然后尝试调用MoveNext方法,如果失败说明任务已完成,置NULL
(这个思路是参照协程的思路写的,基本一样)
那么这个MoveNext方法在哪儿呢,其实在我们的IUniTaskSource里,以DelayRealtimePromise
为例
sealed class DelayRealtimePromise : IUniTaskSource, IPlayerLoopItem
{
public bool MoveNext()
{
// 如果取消了
if (cancellationToken.IsCancellationRequested)
{
core.TrySetCanceled(cancellationToken);
return false;
}
// 如果异常了
if (stopwatch.IsInvalid)
{
core.TrySetResult(AsyncUnit.Default);
return false;
}
// 如果结束了,这个计时器,是在我们创建方法的时候主动生成的
if (stopwatch.ElapsedTicks >= delayTimeSpanTicks)
{
core.TrySetResult(AsyncUnit.Default);
return false;
}
return true;
}
}
最终,如果时间到了,那还是运行了我们熟悉的一个老方法TrySetResult
表示结束,通知Job执行回调
再接着看RunCore方法的下半部分
void RunCore()
{
...
// 标志运行结束
running = false;
while (waitQueue.Count != 0)
{
if (loopItems.Length == tail)
{
Array.Resize(ref loopItems, checked(tail * 2));
}
loopItems[tail++] = waitQueue.Dequeue();
}
}
很简单,就是把所有任务队列里的人物,添加到任务列表里
自定义的MethodBuilder
上面已经分析过了,C#原生的await,是存在装箱操作的,所以存在GC,而UniTask解决了这个问题
可以发现UniTask是自定义了MethodBuilder的
[AsyncMethodBuilder(typeof(AsyncUniTaskMethodBuilder))]
public readonly partial struct UniTask
观察一下他自己写的这个AsyncUniTaskMethodBuilder
关键部分
public struct AsyncUniTaskMethodBuilder
{
IStateMachineRunnerPromise runnerPromise;
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
{
if (runnerPromise == null)
{
// 这里的StateMachine仍然是一个结构体,但是添加了关键字ref
AsyncUniTask<TStateMachine>.SetStateMachine(ref stateMachine, ref runnerPromise);
}
awaiter.UnsafeOnCompleted(runnerPromise.MoveNext);
}
}
观察方法SetStateMachine
internal sealed class AsyncUniTask<TStateMachine> : IStateMachineRunnerPromise, IUniTaskSource,
{
static TaskPool<AsyncUniTask<TStateMachine>> pool;
public static void SetStateMachine(ref TStateMachine stateMachine,
{
if (!pool.TryPop(out var result))
{
result = new AsyncUniTask<TStateMachine>();
}
runnerPromiseFieldRef = result;
result.stateMachine = stateMachine; // 通过ref关键字拷贝stateMachine
}
}
这个方法有两个地方比较重要
- 通过池对象缓存Task反复利用
- 通过ref关键字拷贝结构体,避免GC