前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rust之tower如何构建请求中间件

Rust之tower如何构建请求中间件

作者头像
newbmiao
发布2024-02-26 09:46:49
2820
发布2024-02-26 09:46:49
举报
文章被收录于专栏:学点Rust学点Rust

提到Rust请求中间件, 就不能不提tower

tower是一个请求协议无关的的中间件定义类库,主要定义了ServiceLayer两个trait来帮助实现可重用的请求处理中间件。

今天拿聊聊它如何巧妙构建起中间件。

初始请求

假设我们有一个请求handler, 用hyper官方的hello world例子代码如下:

代码语言:javascript
复制
use http_body_util::Full;
use hyper::{
    body::{Bytes, Incoming},
    server::conn::http1,
    Request, Response,
};
use hyper_util::rt::TokioIo;
use std::{convert::Infallible, net::SocketAddr};
use tokio::net::TcpListener;

async fn handler(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let listener = TcpListener::bind(addr).await?;
    loop {
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);
        tokio::spawn(async move {
            // 请求在这里转成了Service
            let svc = hyper::service::service_fn(handler);
            if let Err(err) = http1::Builder::new().serve_connection(io, svc).await {
                eprintln!("server error: {}", err);
            }
        });
    }
}

这里service_fnhandler转成了Service,也就是server启动时要的是一个实现了Service trait的请求处理函数,这是后边构建中间件的基础。

注意,在 hyper 发布 v1 之后,这里的Service准确说不是tower的Service trait,但理念是一样,我们后边在讲他们接口的不同

这时如果想在处理上边加上LoggerTimeout两个流程来分别记录请求日志和超时约束, 能很灵活的按如下方式组织

tower-service

代码语言:javascript
复制
let svc = hyper::service::service_fn(handler);
// 增加两个layer中间件
let svc = ServiceBuilder::new()
    .layer_fn(Logger::new)
    .layer_fn(|s| Timeout::new(s, std::time::Duration::from_secs(5)))
    .service(svc);
// 先忽略下边为了接口转换,后边在展开这里
// let svc = TowerToHyperService::new(svc);

Service trait

这样处理,就像是一个service链,一个service处理完,再调用下一个service

所以tower定义了如下Service trait:

代码语言:javascript
复制
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;
    // 请求是否可以处理
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    // 处理逻辑
    fn call(&mut self, req: Request) -> Self::Future;
}

这里poll_ready是决定是否可以执行请求处理call前的判断。

call拿到请求,返回一个异步处理的结果,这样当请求执行耗时时不阻塞其他请求的处理。

说个题外话,你可能会好奇为什么这里要返回一个Future而不是用async

这是因为之前Rust不支持trait中定义异步函数。不过Rust 1.75开始支持了,如果后边换成下边的实现就不奇怪了

代码语言:javascript
复制
trait Service<Request> {
    type Response;
    type Error;
    async fn call(&mut self, req: Request) -> Result<Self::Response, Self::Error>;
}

实现 middleware

Logger middleware实现来看如何构建起service

注释及代码如下:

代码语言:javascript
复制
use tower::Service;

#[derive(Debug, Clone)]
pub struct Logger<S> {
    inner: S,
}
impl<S> Logger<S> {
    pub fn new(inner: S) -> Self {
        Logger { inner }
    }
}
type Req = hyper::Request<Incoming>;
impl<S> Service<Req> for Logger<S>
where
    S: Service<Req> + Clone,
{
    // Logger拿到的也是一个Service,返回类型也没有变化,直接指定即可
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        // 直接可以处理,无需额外满足条件
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Req) -> Self::Future {
        println!("processing request: {} {}", req.method(), req.uri().path());
        // 处理完调用下一个Service
        self.inner.call(req)
    }
}

这样添加Logger可以这么添加

代码语言:javascript
复制
let svc = Logger::new(svc);

再加个Timeout

代码语言:javascript
复制
let svc = Timeout::new(svc, std::time::Duration::from_secs(5));

不够优雅,要是链式操作就好了,这时就到Layer trait显身手了

Layer trait

代码语言:javascript
复制
pub trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

只要实现以上Layer trait

代码语言:javascript
复制
impl<S> Layer<S> for Logger<S> {
    type Service = Logger<S>;
    fn layer(&self, inner: S) -> Self::Service {
        Logger { inner }
    }
}

然后就能用ServiceBuilder构建service

代码语言:javascript
复制
tower::ServiceBuilder::new()
    .layer(LoggerLayer)
    .layer_fn(|s| Timeout::new(s, std::time::Duration::from_secs(5)))
    .service(svc);

当然layer_fn也可以直接将函数转为实现Layer trait

最重要的是顺序是按调用顺序。

hyper Service trait

hyper之前依赖了tower Service,但 v1 稳定版发布前替换成了自己的Service

一方面是tower还没有稳定版本

另一方面为了简化请求处理:

  • 移除了poll_ready
  • call也不再需要&mut self,即不再考虑通过其修改请求,如果需要的话可以加Arc<Mutex<_>>state
代码语言:javascript
复制
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn call(&self, req: Request) -> Self::Future;
}

这也就是为什么如果你想直接复用tower Serivice(如Timeout等)需要TowerToHyperService转一下:

代码语言:javascript
复制
let svc = TowerToHyperService::new(svc);

另外hyper作为比较底层的请求库,很多web框架(Axum, Actix web等)都依赖他。也就也支持了tower, 使得tower实现的中间件就更容易复用了。

总的来说,tower能用Service trait构建一个请求中间件的规范,确实很神奇。从目前实现反推似乎很简单,但其实设计过程中还是有很多考虑的。推荐看看官方的这篇inventing-the-service-trait[1]

想了解中间件实现过程的话也推荐看看 David Pedersen 的Rust live codingTower deep dive[2] (看不了的同学可以 B 站找找...)

参考资料

[1]

inventing-the-service-trait: https://tokio.rs/blog/2021-05-14-inventing-the-service-trait

[2]

Tower deep dive: https://www.youtube.com/watch?v=16sU1q8OeeI&t=4227s

本文参与?腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-02-06,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 菜鸟Miao 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 初始请求
  • Service trait
  • 实现 middleware
  • Layer trait
  • hyper Service trait
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com