笔者在上周参加阿里云开发者大会时 特别注意到一个现象就是Serverless这个概念被反复提及 其受关注程度提升明显 笔者仔细看了一下 Serverless的核心理念就是函数式计算 开发者不需要再关注具体的模块 云上部署的粒度变成了程序函数 自动伸缩、扩容等工作完全由云服务负责 能够想象Serverless必将在未来引领时代潮流。
Serverless?Computing 即”无服务器计算” 其实这一概念在刚刚提出的时候并没有获得太多的关注 直到2014年AWS Lambda这一里程碑式的产品出现。Serverless算是正式走进了云计算的舞台。2018年5月 Google在KubeCon CloudNative 2018期间开源了gVisor容器沙箱运行时并分享了它的设计理念和原则。随后2018年的Google Next大会上Google推出了自己的 Google Serverless平台 —— gVisor。同年AWS又放了颗大炮仗-Firecracker 这是一款基于Rust语言编写的安全沙箱基础组件 用于函数计算服务Lambda和托管的容器服务。而值得注意的是Google也并没有死守自己一手缔造的Go语言平台 而是选择了Go与Rust的模式 据说Google在Rust方面也开始招兵买马 也要用Rust重写之前基于Go编写的Serverless平台。
笔者写本文的初衷 其实就是要回答为什么在这个高并发大行其道的时代 以性能著称的C语言和以安全高效闻名的Java都不香了呢
高并发模式初探在这个高并发时代最重要的设计模式无疑是生产者、消费者模式 比如著名的消息队列kafka其实就是一个生产者消费者模式的典型实现。其实生产者消费者问题 也就是有限缓冲问题 可以用以下场景进行简要描述 生产者生成一定量的产品放到库房 并不断重复此过程 与此同时 消费者也在缓冲区消耗这些数据 但由于库房大小有限 所以生产者和消费者之间步调协调 生产者不会在库房满的情况放入端口 消费者也不会在库房空时消耗数据。详见下图
而如果在生产者与消费者之间完美协调并保持高效 这就是高并发要解决的本质问题。
? C语言的高并发案例? ? ?笔者在前文《这位创造了Github冠军项目的老男人 堪称10倍程序员本尊》曾经介绍过TDEngine的相关代码 其中Sheduler模块的相关调度算法就使用了生产、消费者模式进行消息传递功能的实现 也就是有多个生产者(producer)生成并不断向队列中传递消息 也有多个消费者 consumer 不断从队列中取消息。
后面我们也会说明类型功能在Go、Java等高级语言中类似的功能已经被封装好了 但是在C语言中你就必须要用好互斥体 mutex 和信号量 semaphore 并协调他们之间的关系。由于C语言的实现是最复杂的 先来看结构体设计和他的注释
??
? ?再来看Shceduler初始化函数 这里需要特别说明的是 两个信号量的创建 其中emptySem是队列的可写状态 初始化时其值为queueSize 即初始时队列可写 可接受消息长度为队列长度 fullSem是队列的可读状态 初始化时其值为0 即初始时队列不可读。具体代码及我的注释如下
?void *taosInitScheduler(int queueSize, int numOfThreads, char *label) ??{?
?????pthread_attr_t???attr;?
???SSchedQueue * ?pSched (SSchedQueue *)??malloc??(??sizeof??(SSchedQueue));?
???
?????memset??(pSched, ??0??, ??sizeof??(SSchedQueue));?
???pSched- queueSize queueSize;?
???pSched- numOfThreads numOfThreads;?
?????strcpy??(pSched- label, label);?
???
?????if???(pthread_mutex_init( pSched- queueMutex, ??NULL??) ??0??) {?
?????pError(?? init %s:queueMutex failed, reason:%s ??, pSched- label, strerror(errno));?
???????goto???_error;?
???}?
??????//emptySem是队列的可写状态 初始化时其值为queueSize 即初始时队列可写 可接受消息长度为队列长度。?
?????if???(sem_init( pSched- emptySem, ??0??, (??unsigned?????int??)pSched- queueSize) ! ??0??) {?
?????pError(?? init %s:empty semaphore failed, reason:%s ??, pSched- label, strerror(errno));?
???????goto???_error;?
???}?
????//fullSem是队列的可读状态 初始化时其值为0 即初始时队列不可读?
?????if???(sem_init( pSched- fullSem, ??0??, ??0??) ! ??0??) {?
?????pError(?? init %s:full semaphore failed, reason:%s ??, pSched- label, strerror(errno));?
???????goto???_error;?
???}?
???
?????if???((pSched- ??queue??? (SSchedMsg *)??malloc??((??size_t??)pSched- queueSize * ??sizeof??(SSchedMsg))) ??NULL??) {?
?????pError(?? %s: no enough memory for queue, reason:%s ??, pSched- label, strerror(errno));?
???????goto???_error;?
???}?
???
?????memset??(pSched- ??queue??, ??0??, (??size_t??)pSched- queueSize * ??sizeof??(SSchedMsg));?
???pSched- fullSlot ??0??;??//实始化时队列为空 故队头和队尾的位置都是0?
???pSched- emptySlot ??0??;??//实始化时队列为空 故队头和队尾的位置都是0?
???
???pSched- qthread ??malloc??(??sizeof??(??pthread_t??) * (??size_t??)pSched- numOfThreads);?
???
???pthread_attr_init( attr);?
???pthread_attr_setdetachstate( attr, PTHREAD_CREATE_JOINABLE);?
???
?????for???(??int???i ??0??; i pSched- numOfThreads; i) {?
???????if???(pthread_create(pSched- qthread i, attr, taosProcessSchedQueue, (??void???*)pSched) ! ??0??) {?
???????pError(?? %s: failed to create rpc thread, reason:%s ??, pSched- label, strerror(errno));?
?????????goto???_error;?
?????}?
???}?
???
???pTrace(?? %s scheduler is initialized, numOfThreads:%d ??, pSched- label, pSched- numOfThreads);?
???
?????return???(??void???*)pSched;?
???
?_error:?
???taosCleanUpScheduler(pSched);?
?????return?????NULL??;?
?}?
?
再来看读消息的taosProcessSchedQueue函数这其实是消费者一方的实现 这个函数的主要逻辑是
1.使用无限循环 只要队列可读即sem_wait( pSched- fullSem)不再阻塞就继续向下处理
2.在操作msg前 加入互斥体防止msg被误用。
3.读操作完毕后修改fullSlot的值 注意这为避免fullSlot溢出 需要对于queueSize取余。同时退出互斥体。
4.对emptySem进行post操作 即把emptySem的值加1 如emptySem原值为5 读取一个消息后 emptySem的值为6 即可写状态 且能接受的消息数量为6
?具体代码及注释如下 ?
?void *taosProcessSchedQueue(void *param) ??{?
???SSchedMsg ???msg;?
???SSchedQueue *pSched (SSchedQueue *)param;?
????//注意这里是个无限循环 只要队列可读即sem_wait( pSched- fullSem)不再阻塞就继续处理?
?????while???(??1??) {?
???????if???(sem_wait( pSched- fullSem) ! ??0??) {?
???????pError(?? wait %s fullSem failed, errno:%d, reason:%s ??, pSched- label, errno, strerror(errno));?
?????????if???(errno EINTR) {?
???????????/* sem_wait is interrupted by interrupt, ignore and continue */?
???????????continue??;?
???????}?
?????}?
????????//加入互斥体防止msg被误用。?
???????if???(pthread_mutex_lock( pSched- queueMutex) ! ??0??)?
???????pError(?? lock %s queueMutex failed, reason:%s ??, pSched- label, strerror(errno));?
???
?????msg pSched- ??queue??[pSched- fullSlot];?
???????memset??(pSched- ??queue??? pSched- fullSlot, ??0??, ??sizeof??(SSchedMsg));?
???????//读取完毕修改fullSlot的值 注意这为避免fullSlot溢出 需要对于queueSize取余。?
?????pSched- fullSlot (pSched- fullSlot ??1??) % pSched- queueSize;?
????????//读取完毕修改退出互斥体?
???????if???(pthread_mutex_unlock( pSched- queueMutex) ! ??0??)?
???????pError(?? unlock %s queueMutex failed, reason:%s\n ??, pSched- label, strerror(errno));?
????????//读取完毕对emptySem进行post操作 即把emptySem的值加1 如emptySem原值为5 读取一个消息后 emptySem的值为6 即可写状态 且能接受的消息数量为6?
???????if???(sem_post( pSched- emptySem) ! ??0??)?
???????pError(?? post %s emptySem failed, reason:%s\n ??, pSched- label, strerror(errno));?
???
???????if???(msg.fp)?
???????(*(msg.fp))( msg);?
???????else?????if???(msg.tfp)?
???????(*(msg.tfp))(msg.ahandle, msg.thandle);?
???}?
?}?
?
?最后写消息的taosScheduleTask函数也就是生产的实现 其基本逻辑是
1.写队列前先对emptySem进行减1操作 如emptySem原值为1 那么减1后为0 也就是队列已满 必须在读取消息后 即emptySem进行post操作后 队列才能进行可写状态。
?2.加入互斥体防止msg被误操作 写入完成后退出互斥体
3.写队列完成后对fullSem进行加1操作 如fullSem原值为0 那么加1后为1 也就是队列可读 咱们上面介绍的读取taosProcessSchedQueue中sem_wait( pSched- fullSem)不再阻塞就继续向下。
?int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) ??{?
???SSchedQueue *pSched (SSchedQueue *)qhandle;?
?????if???(pSched ??NULL??) {?
?????pError(?? sched is not ready, msg:%p is dropped ??, pMsg);?
???????return?????0??;?
???}?
?????//在写队列前先对emptySem进行减1操作 如emptySem原值为1 那么减1后为0 也就是队列已满 必须在读取消息后 即emptySem进行post操作后 队列才能进行可写状态。?
?????if???(sem_wait( pSched- emptySem) ! ??0??) pError(?? wait %s emptySem failed, reason:%s ??, pSched- label, strerror(errno));??//加入互斥体防止msg被误操作?
?????if???(pthread_mutex_lock( pSched- queueMutex) ! ??0??)?
?????pError(?? lock %s queueMutex failed, reason:%s ??, pSched- label, strerror(errno));?
???
???pSched- ??queue??[pSched- emptySlot] *pMsg;?
???pSched- emptySlot (pSched- emptySlot ??1??) % pSched- queueSize;?
???
?????if???(pthread_mutex_unlock( pSched- queueMutex) ! ??0??)?
?????pError(?? unlock %s queueMutex failed, reason:%s ??, pSched- label, strerror(errno));?
?????//在写队列前先对fullSem进行加1操作 如fullSem原值为0 那么加1后为1 也就是队列可读 咱们上面介绍的读取函数可以进行处理。?
?????if???(sem_post( pSched- fullSem) ! ??0??) pError(?? post %s fullSem failed, reason:%s ??, pSched- label, strerror(errno));?
???
?????return?????0??;?
?}?
?
Java的高并发实现从并发模型来看 Go和Rust都有channel这个概念 也都是通过Channel来实现线 协 程间的同步 由于channel带有读写状态且保证数据顺序 而且channel的封装程度和效率明显可以做的更高 因此Go和Rust官方都会建议使用channel 通信 来共享内存 而不是使用共享内存来通信。
为了让帮助大家找到区别 我们先以Java为例来 看一下没有channel的高级语言Java 生产者消费者该如何实现 代码及注释如下
?public?????class Storage ??{?
???
???????// 仓库最大存储量?
???????private?????final?????int???MAX_SIZE ??10??;?
???????// 仓库存储的载体?
???????private???LinkedList Object list ??new???LinkedList Object ?
???????// 锁?
???????private?????final???Lock lock ??new???ReentrantLock();?
???????// 仓库满的信号量?
???????private?????final???Condition full lock.newCondition();?
???????// 仓库空的信号量?
???????private?????final???Condition empty lock.newCondition();?
???
???????public void produce()?
???????{?
???????????// 获得锁?
?????????lock.lock();?
???????????while???(list.size() ??1??? MAX_SIZE) {?
?????????????System.out.println(?? 【生产者 ??? Thread.currentThread().getName()?
? ???????????? ?? 】仓库已满 ??);?
???????????????try???{?
?????????????????full.await();?
?????????????} ??catch???(InterruptedException e) {?
?????????????????e.printStackTrace();?
?????????????}?
?????????}?
?????????list.add(??new???Object());?
?????????System.out.println(?? 【生产者 ??? Thread.currentThread().getName() ?
? ?? 】生产一个产品 现库存 ??? list.size());?
???
?????????empty.signalAll();?
?????????lock.unlock();?
?????}?
???
???????public void consume()?
???????{?
???????????// 获得锁?
?????????lock.lock();?
???????????while???(list.size() ??0??) {?
?????????????System.out.println(?? 【消费者 ??? Thread.currentThread().getName()?
? ???????????? ?? 】仓库为空 ??);?
???????????????try???{?
?????????????????empty.await();?
?????????????} ??catch???(InterruptedException e) {?
?????????????????e.printStackTrace();?
?????????????}?
?????????}?
?????????list.remove();?
?????????System.out.println(?? 【消费者 ??? Thread.currentThread().getName()?
? ???????? ?? 】消费一个产品 现库存 ??? list.size());?
???
?????????full.signalAll();?
?????????lock.unlock();?
?????}?
?}?
???
?
在Java、C#这种面向对象 但是没有channel语言中 生产者、消费者模式至少要借助一个lock和两个信号量共同完成。其中锁的作用是保证同是时间 仓库中只有一个用户进行数据的修改 而还需要表示仓库满的信号量 一旦达到仓库满的情况则将此信号量置为阻塞状态 从而阻止其它生产者再向仓库运商品了 反之仓库空的信号量也是一样 一旦仓库空了 也要阻其它消费者再前来消费了。
Go的高并发实现我们刚刚也介绍过了Go语言中官方推荐使用channel来实现协程间通信 所以不需要再添加lock和信号量就能实现模式了 以下代码中我们通过子goroutine完成了生产者的功能 在在另一个子goroutine中实现了消费者的功能 注意要阻塞主goroutine以确保子goroutine能够执行 从而轻而易举的就这完成了生产者消费者模式。下面我们就通过具体实践中来看一下生产者消费者模型的实现。
?package???main?
?import???(?
? ?? fmt ?
? ?? time ?
?)?
?func Product(ch chan - int)???{ ??//生产者?
? ??for???i : ??0??; i ??3??; i {?
? fmt.Println(?? Product ?produceed ??, i)?
? ch - i ??//由于channel是goroutine安全的,所以此处没有必要必须加锁或者加lock操作.?
? }?
?}??func Consumer(ch -chan int)???{?
? ??for???i : ??0??; i ??3??; i {?
? j : -ch ??//由于channel是goroutine安全的,所以此处没有必要必须加锁或者加lock操作.?
? fmt.Println(?? Consmuer consumed ??, j)?
? }?
?}??func main()???{?
? ch : ??make??(??chan?????int??)?
? ??go???Product(ch)??//注意生产者与消费者放在不同goroutine中?
? ??go???Consumer(ch)??//注意生产者与消费者放在不同goroutine中?
? time.Sleep(time.Second * ??1??)??//防止主goroutine退出?
? ??/*运行结果并不确定 可能为?
? Product ?produceed 0?
? Product ?produceed 1?
? Consmuer consumed ?0?
? Consmuer consumed ?1?
? Product ?produceed 2?
? Consmuer consumed ?2?
? */?
???
?}?
???
可以看到和Java比起来使用GO来实现并发式的生产者消费者模式的确是更为清爽了。
Rust的高并发实现不得不说Rust的难度实在太高了 虽然笔者之前在汇编、C、Java等方面的经验可以帮助我快速掌握Go语言。但是假期看了两天Rust真想大呼告辞 这尼玛也太劝退了。在Rust官方提供的功能中 其实并不包括多生产者、多消费者的channel std:sync空间下只有一个多生产者单消费者 mpsc)的channel。其样例实现如下
?use std::sync::mpsc;?
?use std::thread;?
?use std::time::Duration;?
???
?fn main() {?
?????let (tx, rx) mpsc::channel();?
?????let tx1 mpsc::Sender::clone( tx);?
?????let tx2 mpsc::Sender::clone( tx);?
???
?????thread::spawn(move || {?
?????????let vals vec![?
?????????????String::from( 1 ),?
?????????????String::from( 3 ),?
?????????????String::from( 5 ),?
?????????????String::from( 7 ),?
?????????];?
???
?????????for val in vals {?
?????????????tx1.send(val).unwrap();?
?????????????thread::sleep(Duration::from_secs(1));?
?????????}?
?????});?
???
?????thread::spawn(move || {?
?????????let vals vec![?
?????????????String::from( 11 ),?
?????????????String::from( 13 ),?
?????????????String::from( 15 ),?
?????????????String::from( 17 ),?
?????????];?
???
?????????for val in vals {?
?????????????tx.send(val).unwrap();?
?????????????thread::sleep(Duration::from_secs(1));?
?????????}?
?????});?
???
?????thread::spawn(move || {?
?????????let vals vec![?
?????????????String::from( 21 ),?
?????????????String::from( 23 ),?
?????????????String::from( 25 ),?
?????????????String::from( 27 ),?
?????????];?
???
?????????for val in vals {?
?????????????tx2.send(val).unwrap();?
?????????????thread::sleep(Duration::from_secs(1));?
?????????}?
?????});?
???
?????for rec in rx {?
?????????println!( Got: {} , rec);?
?????}?
?}?
?
可以看到在Rust下实现生产者消费者是不难的 但是生产者可以clone多个 不过消费者却只能有一个 究其原因是因为Rust下没有GC也就是垃圾回收功能 而想保证安全Rust就必须要对于变更使用权限进行严格管理。在Rust下使用move关键字进行变更的所有权转移 但是按照Rust对于变更生产周期的管理规定 线程间权限转移的所有权接收者在同一时间只能有一个 这也是Rust官方只提供MPSC的原因
?use std::thread;?
???
?fn main() {?
?????let s hello ?
?????
?????let handle thread::spawn(move || {?
?????????println!( {} , s);?
?????});?
???
?????handle.join().unwrap();?
?}?
???
当然Rust下有一个API比较贴心就是join 他可以所有子线程都执行结束再退出主线程 这比Go中要手工阻塞还是要有一定的提高。而如果你想用多生产者、多消费者的功能 就要入手crossbeam模块了 这个模块掌握起来难度也真的不低。
总结通过上面的比较我们可以用一张表格来说明几种主流语言的情况对比
语言
安全性
运行速度
进程启动速度
学习难度
C
低
极快
极快
困难
Java
高
一般
一般
一般
Go
高
较快
较快
一般
Rust
高
极快(基本比肩C
极快(基本比肩C
极困难
可以看到Rust以其高安全性、基本比肩C的运行及启动速度必将在Serverless的时代独占鳌头 Go基本也能紧随其后 而C语言程序中难以避免的野指针 Java相对较低的运行及启动速度 可能都不太适用于函数式运算的场景 Java在企业级开发的时代打败各种C#之类的对手 但是在云时代好像还真没有之前统治力那么强了 真可谓是打败你的往往不是你的对手 而是其它空间的降维打击。
?
背景 我们知道 如果在Kubernetes中支持GPU设备调度 需要做如下的工作 节点上安装...
本文转载自公众号读芯术(ID:AI_Discovery)。 这一刻你正在应对什么挑战?这位前...
TIOBE 公布了 2021 年 3 月的编程语言排行榜。 本月 TIOBE 指数没有什么有趣的变...
前言 统计科学家使用交互式的统计工具(比如R)来回答数据中的问题,获得全景的认...
本文转载自微信公众号「bugstack虫洞栈」,作者小傅哥 。转载本文请联系bugstack...
在Python开发过程中,我们难免会遇到多重条件判断的情况的情况,此时除了用很多...
想了解更多内容,请访问: 51CTO和华为官方战略合作共建的鸿蒙技术社区 https://...
溢价 域名 的续费价格如何?通常来说,因为溢价域名的价值高于普通域名,所以溢...
基本介绍 给定 n 个权值作为 n 个叶子节点,构造一颗二叉树,若该树的带权路径长...
近几年,互联网行业蓬勃发展,在互联网浪潮的冲击下,互联网创业已成为一种比较...