前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >九哥聊Kestrel网络编程第一章:开发一个Redis服务器

九哥聊Kestrel网络编程第一章:开发一个Redis服务器

作者头像
InCerry
发布2023-03-08 16:05:05
5630
发布2023-03-08 16:05:05
举报
文章被收录于专栏:InCerryInCerry

推荐序

之前在.NET 性能优化群内交流时,我们发现很多朋友对于高性能网络框架有需求,需要创建自己的消息服务器、游戏服务器或者物联网网关。但是大多数小伙伴只知道 DotNetty,虽然 DotNetty 是一个非常优秀的网络框架,广泛应用于各种网络服务器中,不过因为各种原因它已经不再有新的特性支持和更新,很多小伙伴都在寻找替代品。

这一切都不用担心,在.NET Core 以后的时代,我们有了更快、更强、更好的 Kestrel 网络框架,正如其名,Kestrel 中文翻译为红隼(hóng sǔn)封面就是红隼的样子,是一种飞行速度极快的猛禽。Kestrel 是 ASPNET Core 成为.NET 平台性能最强 Web 服务框架的原因之一,但是很多人还觉得 Kestrel 只是用于 ASPNET Core 的网络框架,但是其实它是一个高性能的通用网络框架。

我和拥有多个.NET 千星开源项目作者九哥[1]一拍即合,为了让更多的人了解 Kestrel,计划写一系列的文章来介绍它;本文是九哥发布的第一篇,通过 Kestrel 实现一个 Redis 的伪服务器,带大家了解 Kestrel 除了承载 HTTP 协议,还有其它更多的可能性,DotNetty 能做到的,Kestrel 同样也可以。

由于公众号排版问题,建议在PC上浏览

1 文章目的

本文讲解基于 kestrel 开发实现了部分 redis 命令的 redis 伪服务器的过程,让读者了解 kestrel 网络编程的完整步骤,其中 redis 通讯协议需要读者自行查阅,文章里不做具体解析。

2 开发顺序

  1. 创建 Kestrel 的 Redis 协议处理者
  2. 配置监听的 EndPoint 并使用 Redis 处理者
  3. 设计交互上下文 RedisContext
  4. 设计 Redis 命令处理者
  5. 设计 Redis 中间件
  6. 编排 Redis 中间件构建应用

3. 创建 Redis 协议处理者

在 Kestrel 中,末级的中间件是一个没有 next 的特殊中间件,基表现出来就是一个 ConnectionHandler 的行为。我们开发 redis 应用只需要继承 ConnectionHandler 这个抽象类来,当 kestrel 接收到新的连接时将连接交给我们来处理,我们处理完成之后,不再有下一个处理者来处理这个连接了。

代码语言:javascript
复制
/// <summary>
/// 表示Redis连接处理者
/// </summary>
sealed class RedisConnectionHandler : ConnectionHandler
{
    /// <summary>
    /// 处理Redis连接
    /// </summary>
    /// <param name="context">redis连接上下文</param>
    /// <returns></returns>
    public async override Task OnConnectedAsync(ConnectionContext context)
    {
        // 开始处理这个redis连接
        ...
        // 直到redis连接断开后结束
    }
}

4. 配置监听的 EndPoint

4.1 json 配置文件

我们在配置文件里指定监听本机的 5007 端口来做服务器,当然你可以指定本机具体的某个 IP 或任意 IP。

代码语言:javascript
复制
{
  "Kestrel": {
    "Endpoints": {
      "Redis": { // redis协议服务器,只监听loopback的IP
        "Url": "http://localhost:5007"
      }
    }
  }
}
代码语言:javascript
复制
{
  "Kestrel": {
    "Endpoints": {
      "Redis": { // redis协议服务器,监听所有IP
        "Url": "http://*:5007"
      }
    }
  }
}
4.2 在代码中配置 Redis 处理者

为 Redis 这个节点关联上RedisConnectionHandler,当 redis 客户端连接到 5007 这个端口之后,OnConnectedAsync()方法就得到触发且收到连接上下文对象。

代码语言:javascript
复制
builder.WebHost.ConfigureKestrel((context, kestrel) =>
{
    var section = context.Configuration.GetSection("Kestrel");
    kestrel.Configure(section).Endpoint("Redis", endpoint =>
    {
        endpoint.ListenOptions.UseConnectionHandler<RedisConnectionHandler>();
    });
});

5 设计 RedisContext

在 asp.netcore 里,我们知道应用层每次 http 请求都创建一个 HttpContext 对象,里面就塞着各种与本次请求有关的对象。对于 Redis 的请求,我们也可以这么抄袭 asp.netcore 来设计 Redis。

5.1 RedisContext

Redis 请求上下文,包含 Client、Request、Response 和 Features 对象,我们要知道是收到了哪个 Redis 客户端的什么请求,从而请求命令处理者可以向它响应对应的内容。

代码语言:javascript
复制
/// <summary>
/// 表示redis上下文
/// </summary>
sealed class RedisContext : ApplicationContext
{
    /// <summary>
    /// 获取redis客户端
    /// </summary>
    public RedisClient Client { get; }

    /// <summary>
    /// 获取redis请求
    /// </summary>
    public RedisRequest Reqeust { get; }

    /// <summary>
    /// 获取redis响应
    /// </summary>
    public RedisResponse Response { get; }

    /// <summary>
    /// redis上下文
    /// </summary>
    /// <param name="client"></param>
    /// <param name="request"></param>
    /// <param name="response"></param>
    /// <param name="features"></param>
    public RedisContext(RedisClient client, RedisRequest request, RedisResponse response, IFeatureCollection features)
        : base(features)
    {
        this.Client = client;
        this.Reqeust = request;
        this.Response = response;
    }

    public override string ToString()
    {
        return $"{this.Client} {this.Reqeust}";
    }
}
5.2 ApplicationContext

这是抽象的应用层上下文,它强调 Features,做为多个中间件之间的沟通渠道。

代码语言:javascript
复制
/// <summary>
/// 表示应用程序请求上下文
/// </summary>
public abstract class ApplicationContext
{
    /// <summary>
    /// 获取特征集合
    /// </summary>
    public IFeatureCollection Features { get; }

    /// <summary>
    /// 应用程序请求上下文
    /// </summary>
    /// <param name="features"></param>
    public ApplicationContext(IFeatureCollection features)
    {
        this.Features = new FeatureCollection(features);
    }
}
5.3 RedisRequest

一个 redis 请求包含请求的命令和 0 到多个参数值。

代码语言:javascript
复制
/// <summary>
/// 表示Redis请求
/// </summary>
sealed class RedisRequest
{
    private readonly List<RedisValue> values = new();

    /// <summary>
    /// 获取命令名称
    /// </summary>
    public RedisCmd Cmd { get; private set; }

    /// <summary>
    /// 获取参数数量
    /// </summary>
    public int ArgumentCount => this.values.Count - 1;

    /// <summary>
    /// 获取参数
    /// </summary>
    /// <param name="index"></param>
    /// <returns></returns>
    public RedisValue Argument(int index)
    {
        return this.values[index + 1];
    }
}

RedisRequest 的解析:

代码语言:javascript
复制
/// <summary>
/// 从内存中解析
/// </summary>
/// <param name="memory"></param>
/// <param name="request"></param>
/// <exception cref="RedisProtocolException"></exception>
/// <returns></returns>
private static bool TryParse(ReadOnlyMemory<byte> memory, [MaybeNullWhen(false)] out RedisRequest request)
{
    request = default;
    if (memory.IsEmpty == true)
    {
        return false;
    }

    var span = memory.Span;
    if (span[0] != '*')
    {
        throw new RedisProtocolException();
    }

    if (span.Length < 4)
    {
        return false;
    }

    var lineLength = span.IndexOf((byte)'\n') + 1;
    if (lineLength < 4)
    {
        throw new RedisProtocolException();
    }

    var lineCountSpan = span.Slice(1, lineLength - 3);
    var lineCountString = Encoding.ASCII.GetString(lineCountSpan);
    if (int.TryParse(lineCountString, out var lineCount) == false || lineCount < 0)
    {
        throw new RedisProtocolException();
    }

    request = new RedisRequest();
    span = span.Slice(lineLength);
    for (var i = 0; i < lineCount; i++)
    {
        if (span[0] != '$')
        {
            throw new RedisProtocolException();
        }

        lineLength = span.IndexOf((byte)'\n') + 1;
        if (lineLength < 4)
        {
            throw new RedisProtocolException();
        }

        var lineContentLengthSpan = span.Slice(1, lineLength - 3);
        var lineContentLengthString = Encoding.ASCII.GetString(lineContentLengthSpan);
        if (int.TryParse(lineContentLengthString, out var lineContentLength) == false)
        {
            throw new RedisProtocolException();
        }

        span = span.Slice(lineLength);
        if (span.Length < lineContentLength + 2)
        {
            return false;
        }

        var lineContentBytes = span.Slice(0, lineContentLength).ToArray();
        var value = new RedisValue(lineContentBytes);
        request.values.Add(value);

        span = span.Slice(lineContentLength + 2);
    }

    request.Size = memory.Span.Length - span.Length;
    Enum.TryParse<RedisCmd>(request.values[0].ToString(), ignoreCase: true, out var name);
    request.Cmd = name;

    return true;
}
5.4 RedisResponse
代码语言:javascript
复制
/// <summary>
/// 表示redis回复
/// </summary>
sealed class RedisResponse
{
    private readonly PipeWriter writer;

    public RedisResponse(PipeWriter writer)
    {
        this.writer = writer;
    }

    /// <summary>
    /// 写入\r\n
    /// </summary>
    /// <returns></returns>
    public RedisResponse WriteLine()
    {
        this.writer.WriteCRLF();
        return this;
    }

    public RedisResponse Write(char value)
    {
        this.writer.Write((byte)value);
        return this;
    }

    public RedisResponse Write(ReadOnlySpan<char> value)
    {
        this.writer.Write(value, Encoding.UTF8);
        return this;
    }

    public RedisResponse Write(ReadOnlyMemory<byte> value)
    {
        this.writer.Write(value.Span);
        return this;
    }


    public ValueTask<FlushResult> FlushAsync()
    {
        return this.writer.FlushAsync();
    }

    public ValueTask<FlushResult> WriteAsync(ResponseContent content)
    {
        return this.writer.WriteAsync(content.ToMemory());
    }
}

5.5 RedisClient

Redis 是有状态的长连接协议,所以在服务端,我把连接接收到的连接包装为 RedisClient 的概念,方便我们业务理解。对于连接级生命周期的对象属性,我们都应该放到 RedisClient 上,比如是否已认证授权等。

代码语言:javascript
复制
/// <summary>
/// 表示Redis客户端
/// </summary>
sealed class RedisClient
{
    private readonly ConnectionContext context;

    /// <summary>
    /// 获取或设置是否已授权
    /// </summary>
    public bool? IsAuthed { get; set; }

    /// <summary>
    /// 获取远程终结点
    /// </summary>
    public EndPoint? RemoteEndPoint => context.RemoteEndPoint;

    /// <summary>
    /// Redis客户端
    /// </summary>
    /// <param name="context"></param>
    public RedisClient(ConnectionContext context)
    {
        this.context = context;
    }

    /// <summary>
    /// 关闭连接
    /// </summary>
    public void Close()
    {
        this.context.Abort();
    }

    /// <summary>
    /// 转换为字符串
    /// </summary>
    /// <returns></returns>
    public override string? ToString()
    {
        return this.RemoteEndPoint?.ToString();
    }
}

6. 设计 Redis 命令处理者

redis 命令非常多,我们希望有一一对应的 cmdHandler 来对应处理,来各尽其责。所以我们要设计 cmdHandler 的接口,然后每个命令增加一个实现类型,最后使用一个中间件来聚合这些 cmdHandler。

6.1 IRedisCmdHanler 接口
代码语言:javascript
复制
/// <summary>
/// 定义redis请求处理者
/// </summary>
interface IRedisCmdHanler
{
    /// <summary>
    /// 获取能处理的请求命令
    /// </summary>
    RedisCmd Cmd { get; }

    /// <summary>
    /// 处理请求
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    ValueTask HandleAsync(RedisContext context);
}
6.2 IRedisCmdHanler 实现

由于实现类型特别多,这里只举个例子

代码语言:javascript
复制
/// <summary>
/// Ping处理者
/// </summary>
sealed class PingHandler : IRedisCmdHanler
{
    public RedisCmd Cmd => RedisCmd.Ping;

    /// <summary>
    /// 处理请求
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    public async ValueTask HandleAsync(RedisContext context)
    {
        await context.Response.WriteAsync(ResponseContent.Pong);
    }
}

7.设计 Redis 中间件

对于 Redis 服务器应用而言,我们处理一个请求需要经过多个大的步骤:

  1. 如果服务器要求 Auth 的话,验证连接是否已 Auth
  2. 如果 Auth 验证通过之后,则查找与请求对应的 IRedisCmdHanler 来处理请求
  3. 如果没有 IRedisCmdHanler 来处理,则告诉客户端命令不支持。
7.1 中间件接口
代码语言:javascript
复制
/// <summary>
/// redis中间件
/// </summary>
interface IRedisMiddleware : IApplicationMiddleware<RedisContext>
{
}
代码语言:javascript
复制
/// <summary>
/// 应用程序中间件的接口
/// </summary>
/// <typeparam name="TContext"></typeparam>
public interface IApplicationMiddleware<TContext>
{
    /// <summary>
    /// 执行中间件
    /// </summary>
    /// <param name="next">下一个中间件</param>
    /// <param name="context">上下文</param>
    /// <returns></returns>
    Task InvokeAsync(ApplicationDelegate<TContext> next, TContext context);
}
7.2 命令处理者中间件

这里只拿重要的命令处理者中间件来做代码说明,其它中间件也是一样处理方式。

代码语言:javascript
复制
/// <summary>
/// 命令处理中间件
/// </summary>
sealed class CmdMiddleware : IRedisMiddleware
{
    private readonly Dictionary<RedisCmd, IRedisCmdHanler> cmdHandlers;

    public CmdMiddleware(IEnumerable<IRedisCmdHanler> cmdHanlers)
    {
        this.cmdHandlers = cmdHanlers.ToDictionary(item => item.Cmd, item => item);
    }

    public async Task InvokeAsync(ApplicationDelegate<RedisContext> next, RedisContext context)
    {
        if (this.cmdHandlers.TryGetValue(context.Reqeust.Cmd, out var hanler))
        {
            // 这里是本中间件要干的活
            await hanler.HandleAsync(context);
        }
        else
        {
            // 本中间件干不了,留给下一个中间件来干
            await next(context);
        }
    }
}

8 编排 Redis 中间件

回到 RedisConnectionHandler,我们需要实现它,实现逻辑是编排 Redis 中间件并创建可以处理应用请求的委托application,再将收到的 redis 请求创建 RedisContext 对象的实例,最后使用application来执行 RedisContext 实例即可。

8.1 构建 application 委托
代码语言:javascript
复制
sealed class RedisConnectionHandler : ConnectionHandler
{
    private readonly ILogger<RedisConnectionHandler> logger;
    private readonly ApplicationDelegate<RedisContext> application;

    /// <summary>
    /// Redis连接处理者
    /// </summary>
    /// <param name="appServices"></param>
    /// <param name="logger"></param>
    public RedisConnectionHandler(
        IServiceProvider appServices,
        ILogger<RedisConnectionHandler> logger)
    {
        this.logger = logger;
        this.application = new ApplicationBuilder<RedisContext>(appServices)
            .Use<AuthMiddleware>()
            .Use<CmdMiddleware>()
            .Use<FallbackMiddlware>()
            .Build();
    }
}
8.2 使用 application 委托处理请求
代码语言:javascript
复制
sealed class RedisConnectionHandler : ConnectionHandler
{
    /// <summary>
    /// 处理Redis连接
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    public async override Task OnConnectedAsync(ConnectionContext context)
    {
        try
        {
            await this.HandleRequestsAsync(context);
        }
        catch (Exception ex)
        {
            this.logger.LogDebug(ex.Message);
        }
        finally
        {
            await context.DisposeAsync();
        }
    }

    /// <summary>
    /// 处理redis请求
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    private async Task HandleRequestsAsync(ConnectionContext context)
    {
        var input = context.Transport.Input;
        var client = new RedisClient(context);
        var response = new RedisResponse(context.Transport.Output);

        while (context.ConnectionClosed.IsCancellationRequested == false)
        {
            var result = await input.ReadAsync();
            if (result.IsCanceled)
            {
                break;
            }

            var requests = RedisRequest.Parse(result.Buffer, out var consumed);
            if (requests.Count > 0)
            {
                foreach (var request in requests)
                {
                    var redisContext = new RedisContext(client, request, response, context.Features);
                    await this.application.Invoke(redisContext);
                }
                input.AdvanceTo(consumed);
            }
            else
            {
                input.AdvanceTo(result.Buffer.Start, result.Buffer.End);
            }

            if (result.IsCompleted)
            {
                break;
            }
        }
    }
}

9 文章总结

在还没有进入阅读本文章之前,您可能会觉得我会大量讲解 Socket 知识内容,例如Socket BindSocket AcceptSocket SendSocket Receive等。但实际上没完全没有任何涉及,因为终结点的监听、连接的接收、缓冲区的处理、数据接收与发送等这些基础而复杂的网络底层 kestrel 已经帮我处理好,我们关注是我们的应用协议层的解析、还有应用本身功能的开发两个本质问题。

您可能发也现了,本文章的 RedisRequest 解析,也没有多少行代码!反而文章中都是抽象的中间件、处理者、上下文等概念。实际上这不但不会带来项目复杂度,反而让项目更好的解耦,比如要增加一个新的指令的支持,只需要增加一个 xxxRedisCmdHanler 的文件,其它地方都不用任何修改。

本文章是KestrelApp 项目[2]里面的一个 demo 的讲解,希望对您有用。

参考资料

[1]

九哥: https://www.cnblogs.com/kewei/

[2]

KestrelApp项目: https://github.com/xljiulang/KestrelApp

本文参与?腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-12-10,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 InCerry 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 推荐序
  • 1 文章目的
  • 2 开发顺序
  • 3. 创建 Redis 协议处理者
  • 4. 配置监听的 EndPoint
    • 4.1 json 配置文件
      • 4.2 在代码中配置 Redis 处理者
      • 5 设计 RedisContext
        • 5.1 RedisContext
          • 5.2 ApplicationContext
            • 5.3 RedisRequest
              • 5.4 RedisResponse
              • 5.5 RedisClient
              • 6. 设计 Redis 命令处理者
                • 6.1 IRedisCmdHanler 接口
                  • 6.2 IRedisCmdHanler 实现
                  • 7.设计 Redis 中间件
                    • 7.1 中间件接口
                      • 7.2 命令处理者中间件
                      • 8 编排 Redis 中间件
                        • 8.1 构建 application 委托
                          • 8.2 使用 application 委托处理请求
                          • 9 文章总结
                          • 参考资料
                          相关产品与服务
                          云数据库 Redis
                          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
                          http://www.vxiaotou.com