本文转载自微信公众号「老王Plus」,作者老王Plus的老王 。转载本文请联系老王Plus公众号。
今天用一个简单例子说说异步的多路径终止。我尽可能写得容易理解吧,但今天的内容需要有一定的编程能力。
今天这个话题,来自于最近对gRPC的一些技术研究。
话题本身跟gRPC没有太大关系。应用中,我用到了全双工数据管道这样一个相对复杂的概念。
我们知道,全双工连接是两个节点之间的连接,但不是简单的“请求-响应”连接。任何一个节点都可以在任何时间发送消息。概念上,还是有客户端和服务端的区分,但这仅仅是概念上,只是为了区分谁在监听连接尝试,谁在建立连接。实际上,做一个双工的API比做一个“请求-响应”式的API要复杂得多。
由此,延伸出了另一个想法:做个类库,在库内部构建双工管道,供给消费者时,只暴露简单的内容和熟悉的方式。
一、开始
假设我们有这样一个API:
接口代码可以写成这样:
- interface ITransport<TRequest, TResponse> : IAsyncDisposable
- {
- ValueTask SendAsync(TRequest request, CancellationToken cancellationToken);
- ValueTask<(bool Success, TResponse Message)> TryReceiveAsync(CancellationToken cancellationToken);
- }
忽略连接的部分,代码看起来并不复杂。
下面,我们创建两个循环,并通过枚举器公开数据:
- ITransport<TRequest, TResponse> transport;
- public async IAsyncEnumerable<TResponse> ReceiveAsync([EnumeratorCancellation] CancellationToken cancellationToken)
- {
- while (true)
- {
- var (success, message) =
- await transport.TryReceiveAsync(cancellationToken);
- if (!success) break;
- yield return message;
- }
- }
- public async ValueTask SendAsync(IAsyncEnumerable<TRequest> data, CancellationToken cancellationToken)
- {
- await foreach (var message in data.WithCancellation(cancellationToken))
- {
- await transport.SendAsync(message, cancellationToken);
- }
- }
这里面用到了异步迭代器相关的概念。如果不明白,可以去看我的另一篇专门讨论异步迭代器的文章,【传送门】。
二、解决终止标志
好像做好了,我们用循环接收和发送,并传递了外部的终止标志给这两个方法。
真的做好了吗?
还没有。问题出在终止标志上。我们没有考虑到这两个流是相互依赖的,特别是,我们不希望生产者(使用SendAsync的代码)在任何连接失败的场景中仍然运行。
实际上,会有比我们想像中更多的终止路径:
这只是一些可能的例子,但实际的可能会更多。
本质上,这些都表示连接终止,因此我们需要以某种方式包含所有这些场景,进而允许发送和接收路径之间传达问题。换句话说,我们需要自己的CancellationTokenSource。
显然,这种需求,用库来解决是比较完美的。我们可以把这些复杂的内容放在一个消费者可以访问的单一API中:
- public IAsyncEnumerable<TResponse> Duplex(IAsyncEnumerable<TRequest> request, CancellationToken cancellationToken = default);
这个方法:
使用时,我们可以这样做:
- await foreach (MyResponse item in client.Duplex(ProducerAsync()))
- {
- // ... todo
- }
- async IAsyncEnumerable<MyRequest> ProducerAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- for (int i = 0; i < 100; i++)
- {
- yield return new MyRequest(i);
- await Task.Delay(100, cancellationToken);
- }
- }
上面这段代码中,我们ProducerAsync还没有实现太多内容,目前只是传递了一个占位符。稍后我们可以枚举它,而枚举行为实际上调用了代码。
回到Duplex。这个方法,至少需要考虑两种不同的终止方式:
这儿,为什么不是之前列出的更多种终止方式呢?这儿要考虑到编译器的组合方式。我们需要的不是一个CancellationToken,而是一个CancellationTokenSource。
- public IAsyncEnumerable<TResponse> Duplex(IAsyncEnumerable<TRequest> request, CancellationToken cancellationToken = default) => DuplexImpl(transport, request, cancellationToken);
- private async static IAsyncEnumerable<TResponse> DuplexImpl(ITransport<TRequest, TResponse> transport, IAsyncEnumerable<TRequest> request, CancellationToken externalToken, [EnumeratorCancellation] CancellationToken enumeratorToken = default)
- {
- using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);
- // ... todo
- }
这里,DuplexImpl方法允许枚举终止,但又与外部终止标记保持分离。这样,在编译器层面不会被合并。在里面,CreateLinkedTokenSource反倒像编译器的处理。
现在,我们有一个CancellationTokenSource,需要时,我们可能通过它来终止循环的运行。
- using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);
- try
- {
- // ... todo
- }
- finally
- {
- allDone.Cancel();
- }
通过这种方式,我们可以处理这样的场景:消费者没有获取所有数据,而我们想要触发allDone,但是我们退出了DuplexImpl。这时候,迭代器的作用就很大了,它让程序变得更简单,因为用了using,最终里面的任何内容都会定位到Dispose/DisposeAsync。
下一个是生产者,也就是SendAsync。它也是双工的,对传入的消息没有影响,所以可以用Task.Run作为一个独立的代码路径开始运行,而如果生产者出现错误,则终止发送。上边的todo部分,可以加入:
- var send = Task.Run(async () =>
- {
- try
- {
- await foreach (var message in request.WithCancellation(allDone.Token))
- {
- await transport.SendAsync(message, allDone.Token);
- }
- }
- catch
- {
- allDone.Cancel();
- throw;
- }
- }, allDone.Token);
- // ... todo: receive
- await send;
这里启动了一个生产者的并行操作SendAsync。注意,这里我们用标记allDone.Token把组合的终止标记传递给生产者。延迟await是为了允许ProducerAsync方法里可以使用终止令牌,以满足复合双工操作的生命周期要求。
这样,接收代码就变成了:
- while (true)
- {
- var (success, message) = await transport.TryReceiveAsync(allDone.Token);
- if (!success) break;
- yield return message;
- }
- allDone.Cancel();
最后,把这部分代码合在一起看看:
- private async static IAsyncEnumerable<TResponse> DuplexImpl(ITransport<TRequest, TResponse> transport, IAsyncEnumerable<TRequest> request, CancellationToken externalToken, [EnumeratorCancellation] CancellationToken enumeratorToken = default)
- {
- using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);
- try
- {
- var send = Task.Run(async () =>
- {
- try
- {
- await foreach (var message in request.WithCancellation(allDone.Token))
- {
- await transport.SendAsync(message, allDone.Token);
- }
- }
- catch
- {
- allDone.Cancel();
- throw;
- }
- }, allDone.Token);
- while (true)
- {
- var (success, message) = await transport.TryReceiveAsync(allDone.Token);
- if (!success) break;
- yield return message;
- }
- allDone.Cancel();
- await send;
- }
- finally
- {
- allDone.Cancel();
- }
- }
三、总结
相关的处理就这么多。这里实现的关键点是:
最后多说一点,关于ConfigureAwait(false):
默认情况下,await包含一个对SynchronizationContext.Current的检查。除了表示额外的上下文切换之外,在UI应用程序的情况下,它也意味着在UI线程上运行不需要在UI线程上运行的代码。库代码通常不需要这样做。因此,在库代码中,通常应该在所有用到await的地方使用. configureawait (false)来绕过这个检查。而在一般应用程序的代码中,应该默认只使用await而不使用ConfigureAwait,除非你知道你在做什么。
2021年3月24日,主题为《数据的世界,世界的数据》的星环科技2021春季新品发布会...
在Python语言中有如下3种方法: 成员方法 类方法(classmethod) 静态方法(staticm...
摘要 元旦期间 订单业务线 告知 推送系统 无法正常收发消息,作为推送系统维护者...
从 10.0.0 版开始,异步迭代器就出现在 Node 中了,在本文中,我们将讨论异步迭...
建站 什么 虚拟主机 够用?这要看搭建的是什么类型的网站。比如个人博客类型的网...
本文整理自直播《Hologres 数据导入/导出实践-王华峰(继儒)》 视频链接: https:/...
【51CTO.com快译】 数据可视化工具不断发展,提供更强大的功能,同时改善可访问...
Docker生成新镜像版本的两种方式 There are two ways Docker can generate new m...
信息化2.0时代提出开展智慧教育创新发展行动。2019年2月,中共中央、国务院印发...
前提条件 请您在购买前确保已完成注册和充值。详细操作请参见 如何注册公有云管...