
.net core Task的学习理解(上)
Thread 线程的生命周期
计算机系统在某一个时刻,当只有一个CPU工作时,它只执行一个进程中的一个线程,但是用户在使用计算机时回打开多个进行,这样就涉及到了线程的不同状态,一个进程可以创建多个线程。
当一个cpu从一个线程调用另一个线程的时候,需要将Thread1线程的状态保存到PCB1中,然后加载PCB2中Thread2的状态并调用Thread2的线程,当时间片用尽的时候再保存状态到PCB2中。…
当线程有很多的时候,CPU会不停的切换上下文,此时就会非常消耗CPU性能。所以线程池就出现了。
ThreadPool 线程池
线程池里面通过设置好线程的数量,可以将空闲的线程进行回收利用。可以通过preferLocal:false
把任务放到线程池的(global queue)全局队列中,利用空闲的线程执行该任务;也可以通过preferLocal:true
将放入线程中本地队列进行执行。
线程池内部有两种队列,global queue和每个worker thread绑定的local queue。
Work item 的存储类型是Object,实际被 worker thread 执行的时候会判断其类型执行对应的入口方法。
work item 类型可能是Task也可能是IThreadPoolWorkItem。
当全局任务队列里面的任务也没有的时候,算法将会把其他有任务的线程中的任务,分配给这些空闲的线程去执行。其目的是每个线程都有事干。
ThreadPoolTaskScheduler
我们先举个例子。Task.Run是我们常用的调用方式,这里我们通过委托的方式去获取它默认的调度器与委托,可以看到它默认的调度器是ThreadPoolTaskScheduler。
static void Main(string[] args)
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
Task.Run(() => {
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
Console.WriteLine(TaskScheduler.Current.GetType());
});
Console.ReadLine();
}
ThreadPoolTaskScheduler是在ThreadPool之上的一层,它确定一般模式下会将该任务(Task)对象分配给线程池(ThreadPool)的全局队列中,然后将其分配到哪个线程(Thread)的本地队列中进行执行。如果设置为LongRunning模式将不会把任务分配到线程池中,执行由下图所示。
Task并不是线程,而是一个可以封装执行过程的一个对象实体。比如上一个案例中我们就是封装了一个委托。
线程与协程
并行计算的协程
可以多个任务通过不同的线程执行进行计算。
比如下列例子:通过task1与task2不同的线程计算最后求积。
static async Task Main(string[] args)
{
Task<int> task1 = Task.Run<int>(()=> { Console.WriteLine($"task1 finish [Thread]:{Thread.CurrentThread.ManagedThreadId}"); return 1; });
Task<int> task2 = Task.Run<int>(()=> { Console.WriteLine($"task2 finish [Thread]:{Thread.CurrentThread.ManagedThreadId}"); return 2; });
Text((await task1) * (await task2));
Console.ReadLine();
}
static void Text(int i)
{
Console.WriteLine(i);
}
# 这样也可以
await Task.WhenAll(task1, task2);
在实际运用场景中我们可以计算工资啊,账单结算啊….
如果不支持Main的异步,需要在配置文件中添加这几行。
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<LangVersion>7.1</LangVersion>
</PropertyGroup>
协程与异步IO
Task1发起请求然后自己的工作完成了,接着由IO等待线程接力,获取完所有IO后,由Task2进行处理。
我们通过请求百度之前,发现是由线程1处理该过程,IO获取完毕后是由线程8进行处理的。
static async Task Main(string[] args)
{
await downloadtask();
Console.ReadLine();
}
static async Task downloadtask()
{
using (var client = new HttpClient())
{
Console.WriteLine($"Before IO [Thread]:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"Before IO [Thread]:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"Before IO [Thread]:{Thread.CurrentThread.ManagedThreadId}");
var result = await client.GetStringAsync("https://www.baidu.com/");
Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}");
}
}
await
与线程的切换没有半点关系。
本地队列与全局队列
我们通过编写如下代码,将任务放到本地队列中,并且执行5次,看看执行结果。
for (int i = 0; i < 5; i++)
{
int b = i;
Task.Run(() => Console.WriteLine($"Task{b} ThreadId:{Thread.CurrentThread.ManagedThreadId}"))
.ContinueWith(_ => Console.WriteLine($"continuationAction{b} ThreadId:{Thread.CurrentThread.ManagedThreadId}"));
Task.Delay(2000);
}
我发现Task.Run与ContinueWith的线程Id是同一个线程,在运行Task4的时候由于线程7空闲了,直接将Task4的任务执行了。
接着我们修改部分代码看看结果。
Task.Run(() => Console.WriteLine($"Task1 ThreadId:{Thread.CurrentThread.ManagedThreadId}"))
.ContinueWith(_ => Console.WriteLine($"continuationAction1 ThreadId:{Thread.CurrentThread.ManagedThreadId}"));
Task.Run(() => Console.WriteLine($"Task2 ThreadId:{Thread.CurrentThread.ManagedThreadId}"))
.ContinueWith(_ => Console.WriteLine($"continuationAction2 ThreadId:{Thread.CurrentThread.ManagedThreadId}"), TaskContinuationOptions.RunContinuationsAsynchronously);
Console.ReadLine();
很奇怪为什么不一样?这是因为Task1刚执行完,看到Task2还在执行,线程4就没事做了就去task2后面的continuationAction2任务做了。线程6发现自己没事做然后就去task1里面的continuationAction1拿过来做了。
自定义TaskScheduler
并不是所有的Task运行时都会到线程池(ThreadPool)里面去,这取决于它的TaskScheduler是哪一个;要想指定哪一个TaskScheduler需要通过TaskFactory来进行指定。下面我们通过编写自定义的TaskSchedule来进行测试。
class CustomTaskScheduler : TaskScheduler
{
/// <summary>
/// 定义一个线程安全的Task集合
/// </summary>
private BlockingCollection<Task> _queue = new BlockingCollection<Task>();
public CustomTaskScheduler() =>
new Thread(() =>
{
while (true)
{
// 从队列中取出一个
var task = _queue.Take();
// 进行执行
TryExecuteTask(task);
Console.WriteLine($"task {task.Id} Executed");
}
})
{
// 定义为后台
IsBackground = true
}.Start();
/// <summary>
/// 获取所有Task
/// </summary>
/// <returns></returns>
protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray();
/// <summary>
/// 添加Task
/// </summary>
/// <param name="task"></param>
protected override void QueueTask(Task task) => _queue.Add(task);
/// <summary>
/// 确定提供的Task是否可以同步执行
/// </summary>
/// <param name="task"></param>
/// <param name="taskWasPreviouslyQueued"></param>
/// <returns></returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;
}
static void Main(string[] args)
{
var taskFactory = new TaskFactory(new CustomTaskScheduler());
taskFactory.StartNew(() => Console.WriteLine($"task {Task.CurrentId}" + $"threadId:{Thread.CurrentThread.ManagedThreadId}"));
taskFactory.StartNew(() => Console.WriteLine($"task {Task.CurrentId}" + $"threadId:{Thread.CurrentThread.ManagedThreadId}"));
Console.ReadLine();
}
有限元状态机(finite-state machine)
有限元状态机主要解决下列几个问题:
1.原来的线程去了哪里?
2.await表达式的返回值是怎么回事?
3.await之前的变量是如何被await之后的代码获取的?
4.await之后的任务在哪执行?
5.为什么有时候线程没有发生切换?
FooAsync里面的await Task.Delay(100);
会编译成如下代码。而FooStateMachine
就是状态机。(3)关于它的变量被编译成字段。并且初始化状态机状态为-1
,接着开始执行状态机里面的内容。而MoveNext方法里面的内容就是执行的主要部分,(4)其中需要执行的代码也在该方法里面。
接着我们执行MoveNext里面的内容。一开始状态是-1
所以我们执行_state!=0
这个判断下面的内容,然后通过需要执行的任务(Task)中获取GetAwaiter
去进行判断是否执行完成。
如果执行完成将直接到GetResult
方法获取结果然后返回该结果,执行结束。
如果taskAwaiter没有执行完成,状态将会改为0
,并且通过异步的方式等待完成,封装并且将再次回调MoveNext方法(想在什么地方回调就在什么地方回调)。是AwaitUnsafeOnCompleted
的主要工作。
当第二次进行执行的时候会执行else下面的内容,(2)通过taskAwaiter去获取结果,不管返回值需不需要结果都会去调用GetResult
方法。
最后将通过SetResult
放上结果值进行返回。
整个执行流程图
Task内部结构
其核心的接口有两个INotifyCompletion
与ICriticalNotifyCompletion
。其中OnCompleted方法用来传递执行上下文,当实现ICriticalNotifyCompletion
接口时就会去调用UnsafeOnCompleted这个方法。
自定义Awaiter
可以按照该结构去进行自定义一个Awaiter
public class CustomAwaiter<T> : ICriticalNotifyCompletion
{
private Action _continuation;
private T _result;
private Exception _exception;
public bool IsCompleted { get; private set; }
public void OnCompleted(Action continuation)
{
Console.WriteLine("CustomAwaiter.OnCompleted"+$"continuation:{continuation.Target.GetType().Name}.{continuation.Method}");
_continuation = continuation;
}
public void UnsafeOnCompleted(Action continuation)
{
Console.WriteLine("CustomAwaiter.UnsafeOnCompleted" + $"continuation:{continuation.Target.GetType().Name}.{continuation.Method}");
_continuation = continuation;
}
public T GetResult()
{
Console.WriteLine("CustomAwaiter.GetResult");
if (_exception != null) throw _exception;
return _result;
}
public void SetResult(T result)
{
Console.WriteLine("CustomAwaiter.SetResult");
IsCompleted = true;
_result = result;
_continuation.Invoke();
}
public void SetException(Exception exception)
{
Console.WriteLine("CustomAwaiter.SetException");
IsCompleted = true;
_exception = exception;
_continuation.Invoke();
}
}
public class CustomAwaitable<T>
{
private CustomAwaiter<T> _awaiter = new CustomAwaiter<T>();
public CustomAwaiter<T> GetAwaiter()
{
Console.WriteLine("CustomAwaitable.GetAwaiter");
return _awaiter;
}
}
测试执行代码
static async Task Main(string[] args)
{
var foo = await Foo();
Console.WriteLine(foo);
}
static CustomAwaitable<string> Foo()
{
Console.WriteLine("Foo");
var awaitable = new CustomAwaitable<string>();
var awaiter = awaitable.GetAwaiter();
new Thread(() =>
{
Thread.Sleep(100);
awaiter.SetResult("Hello World!");
})
{
IsBackground = false
}.Start();
return awaitable;
}
如果运气好,执行如下
运气不好,就会报错。报错原因是因为SetResult
在UnsafeOnCompleted
回调未完成的之前执行了,_continuation
字段的委托并没有MoveNext方法。(大佬建议不使用)
我做了一些修改可能会导致性能不太佳,但勉勉强强能避开这个问题。
public class CustomAwaiter<T> : ICriticalNotifyCompletion
{
private Action _continuation;
private T _result;
private Exception _exception;
AutoResetEvent autoEvent = new AutoResetEvent(false);
public bool IsCompleted { get; private set; }
public void OnCompleted(Action continuation)
{
Console.WriteLine("CustomAwaiter.OnCompleted"+$"continuation:{continuation.Target.GetType().Name}.{continuation.Method}");
_continuation = continuation;
}
public void UnsafeOnCompleted(Action continuation)
{
Console.WriteLine("CustomAwaiter.UnsafeOnCompleted" + $"continuation:{continuation.Target.GetType().Name}.{continuation.Method} {Thread.CurrentThread.ManagedThreadId}");
_continuation = continuation;
autoEvent.Set();
}
public T GetResult()
{
Console.WriteLine("CustomAwaiter.GetResult");
if (_exception != null) throw _exception;
return _result;
}
public void SetResult(T result)
{
Console.WriteLine($"CustomAwaiter.SetResult {Thread.CurrentThread.ManagedThreadId}");
IsCompleted = true;
_result = result;
if (_continuation == null)
{
autoEvent.WaitOne();
}
_continuation.Invoke();
}
public void SetException(Exception exception)
{
Console.WriteLine("CustomAwaiter.SetException");
IsCompleted = true;
_exception = exception;
_continuation.Invoke();
}
}
当我们运行的程序可能会报错时,Task的异常信息里不会有GetResult的StackFrame,这里我们自己的程序是会报错的。
因为在Task的GetResult的方法里面会添加StackTraceHidden
特性。
static async Task Main(string[] args)
{
var foo = await Foo();
Console.WriteLine(foo);
}
static CustomAwaitable<string> Foo()
{
Console.WriteLine("Foo");
var awaitable = new CustomAwaitable<string>();
var awaiter = awaitable.GetAwaiter();
new Thread(() =>
{
try
{
Thread.Sleep(100);
Console.WriteLine($"child Thread {Thread.CurrentThread.ManagedThreadId}");
throw new Exception("Hello Bug!");
}
catch (Exception ex)
{
awaiter.SetException(ex);
}
})
{
IsBackground = false
}.Start();
return awaitable;
}
async的支持
虽然实现了await的方法但是并没有实现async的支持,所以这里async并不会认识我们定义的CustomAwaiter
,相比async Task方法的编译结果,我们少了个AsyncTaskMethodBuilder
。
AsyncTaskMethodBuilder
关于AsyncMethodBuilder的结构如下图所示。
如果要我们去实现async的支持的话,需要我们去定义自己的AsyncMethodBuilder
。
public struct CustomAsyncMethodBuilder<T>
{
private CustomAwaiter<T> _awaiter;
private CustomAwaitable<T> _awaitable;
/// <summary>
/// 创建CustomAsyncMethodBuilder
/// </summary>
/// <returns></returns>
public static CustomAsyncMethodBuilder<T> Create()
{
Console.WriteLine("CustomAsyncMethodBuilder.Create");
var awaitable = new CustomAwaitable<T>();
var builder = new CustomAsyncMethodBuilder<T>
{
_awaitable = awaitable,
_awaiter = awaitable.GetAwaiter()
};
return builder;
}
/// <summary>
/// CustomAsyncMethodBuilder 的 任务
/// </summary>
public CustomAwaitable<T> Task
{
get
{
Console.WriteLine("CustomAsyncMethodBuilder.Task");
return _awaitable;
}
}
public void SetException(Exception exception)
{
Console.WriteLine("CustomAsyncMethodBuilder.SetException");
_awaiter.SetException(exception);
}
public void SetResult(T result)
{
Console.WriteLine("CustomAsyncMethodBuilder.SetResult");
_awaiter.SetResult(result);
}
/// <summary>
/// 调用OnCompleted方法
/// </summary>
/// <typeparam name="TAwaiter"></typeparam>
/// <typeparam name="TStateMachine"></typeparam>
/// <param name="awaiter"></param>
/// <param name="stateMachine"></param>
public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter,ref TStateMachine stateMachine)
where TAwaiter : INotifyCompletion
where TStateMachine : IAsyncStateMachine
{
Console.WriteLine("CustomAsyncMethodBuilder.AwaitOnCompleted");
_awaiter.OnCompleted(stateMachine.MoveNext);
}
/// <summary>
/// 调用AwaitUnsafeOnCompleted方法
/// </summary>
/// <typeparam name="TAwaiter"></typeparam>
/// <typeparam name="TStateMachine"></typeparam>
/// <param name="awaiter"></param>
/// <param name="stateMachine"></param>
/// 将类型或成员标识为安全关键型,并可由透明代码安全访问
[SecuritySafeCritical]
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
where TAwaiter : INotifyCompletion
where TStateMachine : IAsyncStateMachine
{
Console.WriteLine("CustomAsyncMethodBuilder.AwaitUnsafeOnCompleted");
// var iscompleted = bool.Parse(awaiter.GetType().GetProperty("IsCompleted").GetValue(awaiter).ToString());
_awaiter.UnsafeOnCompleted(stateMachine.MoveNext);
}
public void Start<TStateMachine>(ref TStateMachine stateMachine)
where TStateMachine:IAsyncStateMachine
{
Console.WriteLine("CustomAsyncMethodBuilder.Start");
stateMachine.MoveNext();
}
public void SetStateMachine(IAsyncStateMachine stateMachine)
{
Console.WriteLine("CustomAsyncMethodBuilder.SetStateMachine");
}
}
最后还需要在CustomAwaitable
上添加自定义的CustomAsyncMethodBuilder
特性
[AsyncMethodBuilder(typeof(CustomAsyncMethodBuilder<>))]
public class CustomAwaitable<T>
{
private CustomAwaiter<T> _awaiter = new CustomAwaiter<T>();
public CustomAwaiter<T> GetAwaiter()
{
Console.WriteLine("CustomAwaitable.GetAwaiter");
return _awaiter;
}
}
最后CustomAwaiter
也做了一些修改
public class CustomAwaiter<T> : ICriticalNotifyCompletion
{
private Action _continuation;
private T _result;
private Exception _exception;
// AutoResetEvent autoEvent = new AutoResetEvent(false);
public bool IsCompleted { get; private set; }
public void OnCompleted(Action continuation)
{
Console.WriteLine("CustomAwaiter.OnCompleted"+$"continuation:{continuation.Target.GetType().Name}.{continuation.Method}");
_continuation = continuation;
}
public void UnsafeOnCompleted(Action continuation)
{
Console.WriteLine("CustomAwaiter.UnsafeOnCompleted" + $"continuation:{continuation.Target.GetType().Name}.{continuation.Method} {Thread.CurrentThread.ManagedThreadId}");
_continuation = continuation;
_continuation.Invoke();
// autoEvent.Set();
}
public T GetResult()
{
Console.WriteLine("CustomAwaiter.GetResult");
if (_exception != null) throw _exception;
return _result;
}
public void SetResult(T result)
{
Console.WriteLine($"CustomAwaiter.SetResult {Thread.CurrentThread.ManagedThreadId}");
IsCompleted = true;
_result = result;
}
public void SetException(Exception exception)
{
Console.WriteLine("CustomAwaiter.SetException");
IsCompleted = true;
_exception = exception;
}
}
接着我们测试一下
static async Task Main(string[] args)
{
var boo = await Boo();
Console.WriteLine(boo);
Console.ReadLine();
}
static async CustomAwaitable<string> Boo()
{
await Task.Run(() => Console.WriteLine("Finish"));
string str = "bob";
return str;
}
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739


剑轩
这......么.....长