
Optimizing the ClickHouse ConnectionPool

Original
Author: jasong
Last modified: 2021-10-28 19:27:36
Included in the column: ClickHouse

1 Why a connection pool

  • Standard definition

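A connection pool is a cache of database connections, maintained so that connections can be reused when future requests to the database are made. Connection pools improve the performance of executing commands on a database. Opening and maintaining a separate database connection for every user is expensive and wastes resources, especially for requests made to dynamic, database-driven web applications. With pooling, a connection is placed in the pool after it is created and is used again, so a new connection does not have to be established. If all connections are in use, a new connection is created and added to the pool. Pooling also reduces the time a user has to wait for a connection to the database to be established.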
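To make the reuse idea concrete, here is a minimal single-threaded sketch. `FakeConnection`, `TinyPool` and `connectionsOpenedFor` are hypothetical names invented for illustration; unlike a real pool, this toy has no size limit, no locking and no health checks.

```cpp
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for an expensive database connection; counts how many were opened.
struct FakeConnection {
    static inline int opened = 0;
    FakeConnection() { ++opened; }  // imagine a TCP handshake plus authentication here
};

// Minimal single-threaded pool: released connections are kept and handed out again.
class TinyPool {
public:
    std::unique_ptr<FakeConnection> acquire() {
        if (idle_.empty())
            return std::make_unique<FakeConnection>();  // nothing idle: open a new one
        auto conn = std::move(idle_.back());            // otherwise reuse an idle one
        idle_.pop_back();
        return conn;
    }
    void release(std::unique_ptr<FakeConnection> conn) { idle_.push_back(std::move(conn)); }

private:
    std::vector<std::unique_ptr<FakeConnection>> idle_;
};

// Serves `requests` sequential requests and returns how many connections were opened.
int connectionsOpenedFor(int requests, bool use_pool) {
    FakeConnection::opened = 0;
    TinyPool pool;
    for (int i = 0; i < requests; ++i) {
        if (use_pool) {
            auto conn = pool.acquire();
            pool.release(std::move(conn));
        } else {
            FakeConnection conn;  // connect per request, then drop the connection
        }
    }
    return FakeConnection::opened;
}
```

With the pool, 100 sequential requests open a single connection; without it, every request opens its own.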

Drawbacks of ClickHouse's native ConnectionPool

  1. ClickHouse's own Connection implementation is rigid: its ConnectionPool only works with the ClickHouse TCP Connection.

  class ConnectionPool : public IConnectionPool, private PoolBase<Connection>
  {
  public:
      using Entry = IConnectionPool::Entry;
      using Base = PoolBase<Connection>;
      ConnectionPool(unsigned max_connections_,
              const String & host_,
              UInt16 port_,
              const String & default_database_,
              const String & user_,
              const String & password_,
              const String & cluster_,
              const String & cluster_secret_,
              const String & client_name_,
              Protocol::Compression compression_,
              Protocol::Secure secure_,
              Int64 priority_ = 1)
         : Base(max_connections_,
          &Poco::Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
          host(host_),
          port(port_),
          default_database(default_database_),
          user(user_),
          password(password_),
          cluster(cluster_),
          cluster_secret(cluster_secret_),
          client_name(client_name_),
          compression(compression_),
          secure(secure_),
          priority(priority_)
      {
      }

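  2. The PoolBase constructor has to be re-implemented by an inheriting class (ConnectionPool), and allocObject() is pure virtual, so every new object type needs its own subclass.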
    PoolBase(unsigned max_items_, Poco::Logger * log_)
       : max_items(max_items_), log(log_)
    {
        items.reserve(max_items);
    }
    /** Creates a new object to put into the pool. */
    virtual ObjectPtr allocObject() = 0;
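The second drawback can be seen in a stripped-down imitation of the PoolBase pattern. This is a sketch, not ClickHouse's actual code: `PoolBaseSketch`, `TcpConn` and `TcpPool` are hypothetical, and only the parts relevant here (a bounded item list and a pure virtual `allocObject()`) are kept.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Simplified imitation of the PoolBase idea: the base owns storage,
// the subclass decides how to build each object.
template <typename Object>
class PoolBaseSketch {
public:
    using ObjectPtr = std::shared_ptr<Object>;
    explicit PoolBaseSketch(unsigned max_items) : max_items_(max_items) { items_.reserve(max_items_); }
    virtual ~PoolBaseSketch() = default;

    // Adds one object if capacity remains; returns nullptr when the pool is full.
    ObjectPtr grow() {
        if (items_.size() >= max_items_) return nullptr;
        items_.push_back(allocObject());
        return items_.back();
    }
    std::size_t size() const { return items_.size(); }

protected:
    virtual ObjectPtr allocObject() = 0;  // every concrete pool must implement this

private:
    unsigned max_items_;
    std::vector<ObjectPtr> items_;
};

// Hypothetical connection type (the role ClickHouse's Connection plays).
struct TcpConn { std::string host; int port; };

// The boilerplate: a subclass that repeats the connection parameters
// and overrides allocObject() just to construct one object type.
class TcpPool : public PoolBaseSketch<TcpConn> {
public:
    TcpPool(unsigned max_items, std::string host, int port)
        : PoolBaseSketch<TcpConn>(max_items), host_(std::move(host)), port_(port) {}

protected:
    ObjectPtr allocObject() override { return std::make_shared<TcpConn>(TcpConn{host_, port_}); }

private:
    std::string host_;
    int port_;
};
```

A class template that constructs `TObject` directly from node information avoids writing such a subclass for every object type.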

  • Goal
    • Implement the pool as a class template for a lighter-weight implementation

2 Preparation

Basic C++ concepts
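std::mutex: mutual-exclusion lock
std::unique_lock: movable lock owner (required by std::condition_variable)
std::lock_guard: lock held for the enclosing scope
std::condition_variable: lets a thread wait until it is notified or a timeout expires
std::shared_ptr: pointer with a reference count (use_count)
std::vector: container holding the pooled objects
class: regular classes
template class: class templates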
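A minimal sketch of how these primitives fit together (the function `waitForSignal` is hypothetical): `std::lock_guard` simply holds a mutex for a scope, while `std::condition_variable::wait_for` needs a `std::unique_lock`, because it unlocks the mutex while waiting and relocks it before returning.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// A producer thread sets a flag and notifies; the consumer waits with a timeout.
// Returns true if the signal arrived before `timeout` expired.
bool waitForSignal(std::chrono::milliseconds timeout) {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> guard(m);  // held until the end of this scope
            ready = true;
        }
        cv.notify_one();  // wake the waiting consumer
    });

    std::unique_lock<std::mutex> lock(m);  // wait_for() can unlock and relock it
    bool signalled = cv.wait_for(lock, timeout, [&] { return ready; });
    lock.unlock();
    producer.join();
    return signalled;
}
```

The predicate overload of `wait_for` covers both spurious wakeups and a `notify_one()` that fires before the consumer starts waiting.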
Network limit parameters
connection_timeout
send_timeout
receive_timeout
tcp_keep_alive_timeout
http_keep_alive_timeout 
secure_connection_timeout
handshake_timeout 
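As a sketch of how these limits might be grouped, the struct below keeps them as `std::chrono` durations. The struct name, field types and default values are assumptions for illustration, not ClickHouse's ConnectionTimeouts. Send and receive timeouts eventually have to be applied to a socket, which on POSIX systems takes a `struct timeval` (for example via `setsockopt` with `SO_SNDTIMEO` / `SO_RCVTIMEO`); the helper shows that conversion.

```cpp
#include <chrono>
#include <sys/time.h>

// Hypothetical grouping of the network limits listed above (defaults are illustrative).
struct NetworkTimeouts {
    std::chrono::milliseconds connection_timeout{10'000};
    std::chrono::milliseconds send_timeout{300'000};
    std::chrono::milliseconds receive_timeout{300'000};
    std::chrono::milliseconds tcp_keep_alive_timeout{0};
    std::chrono::milliseconds http_keep_alive_timeout{10'000};
    std::chrono::milliseconds secure_connection_timeout{10'000};
    std::chrono::milliseconds handshake_timeout{10'000};
};

// Splits a duration into whole seconds plus remaining microseconds,
// the layout POSIX socket timeout options expect.
timeval toTimeval(std::chrono::milliseconds timeout) {
    auto secs = std::chrono::duration_cast<std::chrono::seconds>(timeout);
    auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(timeout - secs);
    timeval tv{};
    tv.tv_sec = static_cast<time_t>(secs.count());
    tv.tv_usec = static_cast<suseconds_t>(usecs.count());
    return tv;
}
```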
Packet parameters
secure: secure mode (HTTP vs. HTTPS)
compression: whether transferred data is compressed
INodeInfo: node information
ip: the node's IP address
role: the node's role
IClusterInfo: cluster information
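http/tcp port: port used to access the cluster
user: user name
password: password
std::vector<NodeInfo>: the cluster's nodes
xxx: other fields
network limit parameters
network transport parameters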
Connection
IClusterInfo
socket/client: wrapper around the client for the connection method provided by the server

ConnectionPool

3 Classes

3.1 Cluster information
  • 1 NodeInfo
struct NodeInfo
{
    explicit NodeInfo(std::string host_, std::string role_ = "follower") : host(host_), role(role_) { }
    std::string host;
    std::string role;
};
using NodeInfoPtr = std::shared_ptr<NodeInfo>;
using NodeInfoPtrs = std::vector<NodeInfoPtr>;
  • 2 ClusterInfo
struct ClusterInfo
{
    enum class Compression
    {
        Disable = 0,
        Enable = 1,
    };

    enum class Secure
    {
        Disable = 0,
        Enable = 1,
    };

    explicit ClusterInfo(const Poco::Util::AbstractConfiguration & config, std::string config_name);
    // TODO: in-memory constructor, ClusterInfo(xxx);
    NodeInfoPtrs node_info_ptrs;
    std::int32_t port;
    std::string user = "root";
    std::string password;
    Secure security = Secure::Disable;
    Compression compression = Compression::Enable;
    ConnectionTimeouts timeouts;
};
using ClusterInfoPtr = std::shared_ptr<ClusterInfo>;

// For reference: ClickHouse's ConnectionParameters constructor, which reads similar settings from a config.
ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfiguration & config)
{
    bool is_secure = config.getBool("secure", false);
    security = is_secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;

    host = config.getString("host", "localhost");
    port = config.getInt(
        "port", config.getInt(is_secure ? "tcp_port_secure" : "tcp_port", is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));

    default_database = config.getString("database", "");

    /// changed the default value to "default" to fix the issue when the user in the prompt is blank
    user = config.getString("user", "default");

    bool password_prompt = false;
    if (config.getBool("ask-password", false))
    {
        if (config.has("password"))
            throw Exception("Specified both --password and --ask-password. Remove one of them", ErrorCodes::BAD_ARGUMENTS);
        password_prompt = true;
    }
    else
    {
        password = config.getString("password", "");
        /// if the value of --password is omitted, the password will be set implicitly to "\n"
        if (password == "\n")
            password_prompt = true;
    }
    if (password_prompt)
    {
#if !defined(ARCADIA_BUILD)
        std::string prompt{"Password for user (" + user + "): "};
        char buf[1000] = {};
        if (auto * result = readpassphrase(prompt.c_str(), buf, sizeof(buf), 0))
            password = result;
#endif
    }

    /// By default compression is disabled if address looks like localhost.
    compression = config.getBool("compression", !isLocalAddress(DNSResolver::instance().resolveHost(host)))
        ? Protocol::Compression::Enable : Protocol::Compression::Disable;

    timeouts = ConnectionTimeouts(
        Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
        Poco::Timespan(config.getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0),
        Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
        Poco::Timespan(config.getInt("tcp_keep_alive_timeout", 0), 0));
}
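The ClusterInfo constructor takes a config and a config name, but its body is not shown. The sketch below is one hypothetical way to fill a simplified ClusterInfo using the same defaults as the code above: `secure` switches the default port between 9440 and 9000 (ClickHouse's default secure and plain native TCP ports), and `user` defaults to "root" as in ClusterInfo. The key names (`hosts`, `secure`, `port`, `user`, `password`, `compression`), the comma-separated host list and the `std::map` standing in for `Poco::Util::AbstractConfiguration` are all assumptions.

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

// Simplified stand-ins for the article's types.
struct NodeInfo { std::string host; std::string role = "follower"; };
using NodeInfoPtr = std::shared_ptr<NodeInfo>;

struct ClusterInfoSketch {
    std::vector<NodeInfoPtr> node_info_ptrs;
    std::int32_t port = 9000;
    std::string user = "root";
    std::string password;
    bool secure = false;
    bool compression = true;
};

// Hypothetical loader: a flat key/value map replaces the Poco configuration.
ClusterInfoSketch loadClusterInfo(const std::map<std::string, std::string> & cfg) {
    auto get = [&](const std::string & key, const std::string & def) {
        auto it = cfg.find(key);
        return it == cfg.end() ? def : it->second;
    };
    ClusterInfoSketch info;
    info.secure = get("secure", "false") == "true";
    // As in ConnectionParameters: the default port depends on secure mode.
    info.port = std::stoi(get("port", info.secure ? "9440" : "9000"));
    info.user = get("user", info.user);
    info.password = get("password", "");
    info.compression = get("compression", "true") == "true";
    std::stringstream hosts(get("hosts", "localhost"));
    for (std::string host; std::getline(hosts, host, ',');)
        if (!host.empty())
            info.node_info_ptrs.push_back(std::make_shared<NodeInfo>(NodeInfo{host}));
    return info;
}
```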
  • 3 ConnectionCache (TODO: data caching)
class ConnectionCache
{
    // TODO
};
3.2 Connection
class IConnection
{
public:
    IConnection() = default;

    virtual ~IConnection() = default;

    // Connection lifecycle
    virtual void connect() = 0;
    virtual void close() = 0;

    // Liveness check
    virtual void heartbeat() = 0;

    // Cluster topology
    virtual bool isLeader() = 0;
    virtual void getLeader() = 0;
    virtual void getCluster() = 0;

    // Data transfer
    virtual void send() = 0;
    virtual void receive() = 0;

    // Whether the connection is currently in use
    virtual bool inUse() = 0;
};
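An implementation of this interface can be exercised without a server. The sketch below uses a trimmed copy of IConnection (only connect/close/isLeader/send/receive, with `isLeader()` returning `bool` and a string payload for send/receive, both assumptions) and a hypothetical `EchoConnection` that queues every message it is sent, tagged with its host.

```cpp
#include <deque>
#include <string>
#include <utility>

// Trimmed copy of the IConnection interface; parameter types are assumptions.
class IConnection {
public:
    virtual ~IConnection() = default;
    virtual void connect() = 0;
    virtual void close() = 0;
    virtual bool isLeader() = 0;
    virtual void send(const std::string & payload) = 0;
    virtual std::string receive() = 0;
};

// Hypothetical in-memory connection, handy for testing a pool without a real server.
class EchoConnection : public IConnection {
public:
    explicit EchoConnection(std::string host, bool leader = false)
        : host_(std::move(host)), leader_(leader) {}

    void connect() override { connected_ = true; }
    void close() override { connected_ = false; inbox_.clear(); }
    bool isLeader() override { return leader_; }

    // Messages sent while disconnected are dropped; otherwise they are echoed back.
    void send(const std::string & payload) override {
        if (connected_) inbox_.push_back(host_ + ":" + payload);
    }
    std::string receive() override {
        if (inbox_.empty()) return {};
        std::string front = inbox_.front();
        inbox_.pop_front();
        return front;
    }
    bool connected() const { return connected_; }

private:
    std::string host_;
    bool leader_;
    bool connected_ = false;
    std::deque<std::string> inbox_;
};
```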
xxxx
3.3 ClickHouse ConnectionPool
  • Optimized version
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// ClusterInfoPtr and NodeInfoPtr are defined in section 3.1.
template<typename TObject>
class ConnectionPool {
public:
    using ObjectPtr = std::shared_ptr<TObject>;
    using ObjectPtrs = std::vector<ObjectPtr>;

    ObjectPtr get() {
        std::unique_lock<std::mutex> lock(object_mutex);

        // shared_ptr replaces a hand-written wrapper: use_count() tells whether an object
        // is in use. The loop variable is a copy, so an idle object has exactly two owners:
        // the object_ptrs vector and this copy.
        while (true) {
            for (auto object_ptr : object_ptrs)
                if (object_ptr.use_count() == 2)
                    return object_ptr;

            auto object_index = object_ptrs.size();
            if (object_index < max_pool_size) {
                allocObject(object_index);
                continue;  // the new object is idle, so the next scan returns it
            }

            // Nothing calls notify_*() when a caller drops its shared_ptr,
            // so this effectively re-checks every 10 ms.
            condition_variable.wait_for(lock, std::chrono::milliseconds(10));
        }
    }

    ConnectionPool(ClusterInfoPtr cluster_info_ptr_, size_t min_pool_size_, size_t max_pool_size_)
            : cluster_info_ptr(std::move(cluster_info_ptr_)), min_pool_size(min_pool_size_),
              max_pool_size(max_pool_size_) {
        reserve();
    }

    // Pre-create min_pool_size objects.
    void reserve() {
        std::lock_guard<std::mutex> lock(object_mutex);
        for (size_t object_index = object_ptrs.size(); object_index < min_pool_size; object_index++)
            allocObject(object_index);
    }

    // Create one object bound to a cluster node, assigned round-robin by index.
    // The caller must already hold object_mutex.
    void allocObject(size_t object_index) {
        const auto & nodes = cluster_info_ptr->node_info_ptrs;
        object_ptrs.emplace_back(std::make_shared<TObject>(nodes[object_index % nodes.size()]));
    }

    // TODO: refresh the cluster information (update ClusterInfo)

    ~ConnectionPool() = default;

private:
    ClusterInfoPtr cluster_info_ptr;
    ObjectPtrs object_ptrs;
    size_t min_pool_size;
    size_t max_pool_size;
    std::mutex object_mutex;
    std::condition_variable condition_variable;
};
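The `use_count() == 2` test in `get()` only works because the loop variable is a copy. This small sketch (the `UseCounts` struct and `inspectUseCounts` function are hypothetical) shows the counts the loop observes in each situation.

```cpp
#include <memory>
#include <vector>

struct UseCounts {
    long idle_seen_by_copy;  // what `for (auto p : v)` sees for an idle object
    long idle_seen_by_ref;   // what `for (auto & p : v)` would see instead
    long borrowed_by_copy;   // what the copy loop sees while a caller holds the object
};

UseCounts inspectUseCounts() {
    std::vector<std::shared_ptr<int>> pool{std::make_shared<int>(0)};
    UseCounts c{};
    for (auto p : pool) c.idle_seen_by_copy = p.use_count();   // vector + loop copy
    for (auto & p : pool) c.idle_seen_by_ref = p.use_count();  // vector only
    std::shared_ptr<int> borrowed = pool[0];                     // a caller holds it
    for (auto p : pool) c.borrowed_by_copy = p.use_count();    // vector + caller + loop copy
    return c;
}
```

Two consequences follow. Changing the loop to `auto &` would make idle objects report 1 and silently break the check, and a caller that keeps extra copies keeps the object marked busy until every copy is gone. cppreference also describes `use_count()` as approximate in multithreaded code; the pattern relies on new copies being made only under `object_mutex`, so concurrent releases can only lower the count. An explicit in-use flag is the more conventional design.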

3.4 ClickHouse ConnectionPool (original)

How ClickHouse manages pooled objects

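  1. An Entry is bound to the Object and is what calling code gets back.
  2. All work on the Object goes through the Entry.
  3. Callers must hold on to the returned Entry while they use the object.
  4. shared_ptr handles the final cleanup: when the last copy of an Entry is destroyed, the object is released back to the pool.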
    /** What is given to the user. */
    class Entry
    {
    public:
        friend class PoolBase<Object>;

        Entry() = default; /// For deferred initialization.

        /** The `Entry` object protects the resource from being used by another thread.
          * The following methods are forbidden for `rvalue`, so you can not write a similar to
          *
          * auto q = pool.get()->query("SELECT .."); // Oops, after this line Entry was destroyed
          * q.execute (); // Someone else can use this Connection
          */
        Object * operator->() && = delete;
        const Object * operator->() const && = delete;
        Object & operator*() && = delete;
        const Object & operator*() const && = delete;

        Object * operator->() & { return &*data->data.object; }
        const Object * operator->() const & { return &*data->data.object; }
        Object & operator*() & { return *data->data.object; }
        const Object & operator*() const & { return *data->data.object; }

        bool isNull() const { return data == nullptr; }

        PoolBase * getPool() const
        {
            if (!data)
                throw DB::Exception("Attempt to get pool from uninitialized entry", DB::ErrorCodes::LOGICAL_ERROR);
            return &data->data.pool;
        }

    private:
        std::shared_ptr<PoolEntryHelper> data;

        explicit Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) { }
    };

    virtual ~PoolBase() = default;

    /** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */
    Entry get(Poco::Timespan::TimeDiff timeout)
    {
        std::unique_lock lock(mutex);

        while (true)
        {
            for (auto & item : items)
                if (!item->in_use)
                    return Entry(*item);

            if (items.size() < max_items)
            {
                ObjectPtr object = allocObject();
                items.emplace_back(std::make_shared<PooledObject>(object, *this));
                return Entry(*items.back());
            }

            LOG_INFO(log, "No free connections in pool. Waiting.");

            if (timeout < 0)
                available.wait(lock);
            else
                available.wait_for(lock, std::chrono::microseconds(timeout));
        }
    }
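The Entry approach can be combined with blocking instead of polling. The following is a simplified sketch, not ClickHouse's code; `EntryPool`, `Session` and `noDoubleBorrow` are hypothetical names. The pool keeps an explicit `in_use` flag, the Entry destructor clears it and calls `notify_one()`, and, as in ClickHouse's Entry, `operator->` is deleted for rvalues so calling a member through a temporary `pool.get()` does not compile. Unlike ClickHouse's Entry (shared through a `shared_ptr`), this Entry is not copyable, and returning it by value relies on C++17 guaranteed copy elision.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

template <typename Object>
class EntryPool {
    struct Item { Object object; bool in_use = false; };

public:
    class Entry {
    public:
        Entry(EntryPool & pool, Item & item) : pool_(&pool), item_(&item) {}
        Entry(const Entry &) = delete;  // exactly one owner per borrow
        Entry & operator=(const Entry &) = delete;
        ~Entry() { pool_->release(*item_); }

        Object * operator->() & { return &item_->object; }
        Object * operator->() && = delete;  // no member access on a temporary Entry

    private:
        EntryPool * pool_;
        Item * item_;
    };

    explicit EntryPool(std::size_t max_items) : max_items_(max_items) {}

    Entry get() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (true) {
            for (auto & item : items_)
                if (!item->in_use) {
                    item->in_use = true;
                    return Entry(*this, *item);
                }
            if (items_.size() < max_items_) {
                items_.push_back(std::make_unique<Item>());
                items_.back()->in_use = true;
                return Entry(*this, *items_.back());
            }
            available_.wait(lock);  // woken by release(), no polling
        }
    }

private:
    void release(Item & item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            item.in_use = false;
        }
        available_.notify_one();
    }

    std::size_t max_items_;
    std::vector<std::unique_ptr<Item>> items_;
    std::mutex mutex_;
    std::condition_variable available_;
};

struct Session { std::atomic<int> holders{0}; int uses = 0; };

// Several threads borrow from a pool of `max_items` sessions; returns true
// if no session was ever held by two threads at the same time.
bool noDoubleBorrow(int threads, int iterations, std::size_t max_items) {
    EntryPool<Session> pool(max_items);
    std::atomic<bool> clash{false};
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t)
        workers.emplace_back([&] {
            for (int i = 0; i < iterations; ++i) {
                auto entry = pool.get();
                if (entry->holders.fetch_add(1) != 0) clash = true;
                ++entry->uses;
                entry->holders.fetch_sub(1);
            }
        });
    for (auto & w : workers) w.join();
    return !clash;
}
```

Because `release()` notifies the condition variable, `get()` can use a plain `wait()` instead of a 10 ms `wait_for` polling loop.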

4 How to use it

Example: Redis

1 Create a RedisConnection
class RedisConnection : public IConnection
{
public:
    // Connect to a single node, or pick a node from the cluster information
    explicit RedisConnection(NodeInfoPtr nodeInfoPtr);
    explicit RedisConnection(ClusterInfoPtr clusterInfoPtr);
    ~RedisConnection() override;

    // redis client object
    xxxxx
};
2 Implement the client on top of the ConnectionPool template
class RedisClient : public ConnectionPool<RedisConnection>
{
public:
    // cluster_info_ptr is a private member of the base class, so it is stored by the base constructor.
    RedisClient(ClusterInfoPtr clusterInfoPtr_, size_t min_pool_size_, size_t max_pool_size_)
        : ConnectionPool<RedisConnection>(std::move(clusterInfoPtr_), min_pool_size_, max_pool_size_)
    {
    }

    ~RedisClient() = default;
};
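To check the inheritance pattern end to end without a Redis server, the sketch below replaces RedisConnection with a hypothetical `FakeRedisConnection` that only reports which node it is bound to. It carries a condensed copy of the template pool (same `use_count()` rule and round-robin node assignment; `min_pool_size` and `reserve()` omitted) so it compiles on its own. `makeCluster`, `FakeRedisClient` and `ping()` are illustrative names, not a Redis client API.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

struct NodeInfo { std::string host; };
using NodeInfoPtr = std::shared_ptr<NodeInfo>;
struct ClusterInfo { std::vector<NodeInfoPtr> node_info_ptrs; };
using ClusterInfoPtr = std::shared_ptr<ClusterInfo>;

ClusterInfoPtr makeCluster(const std::vector<std::string> & hosts) {
    auto cluster = std::make_shared<ClusterInfo>();
    for (const auto & h : hosts)
        cluster->node_info_ptrs.push_back(std::make_shared<NodeInfo>(NodeInfo{h}));
    return cluster;
}

// Condensed template pool: idle means use_count() == 2, nodes assigned round-robin.
template <typename TObject>
class ConnectionPool {
public:
    using ObjectPtr = std::shared_ptr<TObject>;
    ConnectionPool(ClusterInfoPtr cluster_info_ptr, std::size_t max_pool_size)
        : cluster_info_ptr_(std::move(cluster_info_ptr)), max_pool_size_(max_pool_size) {}

    ObjectPtr get() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (true) {
            for (auto object_ptr : objects_)
                if (object_ptr.use_count() == 2) return object_ptr;
            if (objects_.size() < max_pool_size_) {
                const auto & nodes = cluster_info_ptr_->node_info_ptrs;
                objects_.push_back(std::make_shared<TObject>(nodes[objects_.size() % nodes.size()]));
                return objects_.back();
            }
            cv_.wait_for(lock, std::chrono::milliseconds(10));
        }
    }

private:
    ClusterInfoPtr cluster_info_ptr_;
    std::size_t max_pool_size_;
    std::vector<ObjectPtr> objects_;
    std::mutex mutex_;
    std::condition_variable cv_;
};

// Hypothetical connection: no real Redis I/O, it only remembers its node.
class FakeRedisConnection {
public:
    explicit FakeRedisConnection(NodeInfoPtr node) : node_(std::move(node)) {}
    std::string ping() const { return node_->host + ": PONG"; }

private:
    NodeInfoPtr node_;
};

// The client inherits get() and wraps each call in borrow, use, release.
class FakeRedisClient : public ConnectionPool<FakeRedisConnection> {
public:
    FakeRedisClient(ClusterInfoPtr cluster_info_ptr, std::size_t max_pool_size)
        : ConnectionPool<FakeRedisConnection>(std::move(cluster_info_ptr), max_pool_size) {}

    // The temporary shared_ptr from get() is released at the end of the statement.
    std::string ping() { return get()->ping(); }
};
```

Sequential calls reuse the first connection; a second connection, bound to the second node, is created only while the first one is still held.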

Compared with ClickHouse's ConnectionPool, this implementation is much more lightweight.

The demo will be published on my personal GitHub later.

Sample output from the demo:
people1 method5
xxxxxxx  1
people1 method1
xxxxxxx  2
people1 method2
xxxxxxx  4
people1 method4
xxxxxxx  3
people1 method3
xxxxxxx  1
people1 method1
xxxxxxx  5
people1 method5
xxxxxxx  4
people1 method4
xxxxxxx  5
people1 method5
xxxxxxx  1
people1 method1
xxxxxxx  2
people1 method2
xxxxxxx  3
people1 method3
xxxxxxx  1
people1 method1
xxxxxxx  5
people1 method5
xxxxxxx  3
people1 method3
xxxxxxx  4
people1 method4
xxxxxxx  4
people1 method4
xxxxxxx  3
people1 method3
xxxxxxx  1
people1 method1
xxxxxxx  2
people1 method2
xxxxxxx  5
people1 method5
xxxxxxx  4
people1 method4

Thanks for reading!

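Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization. Reproduction without permission is prohibited.

In case of infringement, contact cloudcommunity@tencent.com for removal.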
