前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >milvus upsert流程源码分析

milvus upsert流程源码分析

原创
作者头像
melodyshu
发布2024-02-27 17:15:24
1200
发布2024-02-27 17:15:24
举报
文章被收录于专栏:milvus数据库milvus数据库

milvus版本:v2.3.2

整体架构:

Upsert 的数据流向:

1.客户端sdk发出Upsert API请求。

代码语言:python
复制
import numpy as np
from pymilvus import (
    connections,
    Collection,
)

num_entities, dim = 4, 3

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

hello_milvus = Collection("hello_milvus")

print("Start upsert entities")
rng = np.random.default_rng(seed=19530)
entities = [
    [0,1,2,4000],
    [10,11,12,4000],
    rng.random((num_entities, dim)),
]
hello_milvus.upsert(entities)

2.服务端接受API请求,将request封装为upsertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

代码语言:go
复制
// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
	......
    // request封装为upsertTask
	it := &upsertTask{
		baseMsg: msgstream.BaseMsg{
			HashValues: request.HashKeys,
		},
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		req:       request,
		result: &milvuspb.MutationResult{
			Status: merr.Success(),
			IDs: &schemapb.IDs{
				IdField: nil,
			},
		},

		idAllocator:   node.rowIDAllocator,
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
	}

	......
    // 将task压入dmQueue队列
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		......
	}

	......
    // 等待任务执行完
	if err := it.WaitToFinish(); err != nil {
		......
	}

	......
}

3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_upsert.go

代码语言:go
复制
func (it *upsertTask) Execute(ctx context.Context) (err error) {
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")
	defer sp.End()
	log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))

	tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))
	// 拿到stream,类型为msgstream.mqMsgStream
    stream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)
	if err != nil {
		return err
	}
    // 创建msgPack
	msgPack := &msgstream.MsgPack{
		BeginTs: it.BeginTs(),
		EndTs:   it.EndTs(),
	}
    // 添加insertMsgPack
	err = it.insertExecute(ctx, msgPack)
	if err != nil {
		log.Warn("Fail to insertExecute", zap.Error(err))
		return err
	}
    // 添加deleteMsgPack
	err = it.deleteExecute(ctx, msgPack)
	if err != nil {
		log.Warn("Fail to deleteExecute", zap.Error(err))
		return err
	}

	tr.RecordSpan()
    // 发送数据至mq
	err = stream.Produce(msgPack)
	if err != nil {
		it.result.Status = merr.Status(err)
		return err
	}
	sendMsgDur := tr.RecordSpan()
	metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))
	totalDur := tr.ElapseSpan()
	log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),
		zap.Duration("total duration", totalDur))
	return nil
}

msgPack变量:

msgPack包含了insertRequest和deleteRequest。

insertRequest包含了客户端的upsert数据,以及还会有rowid,用来唯一标识一列数据。

deleteRequest包含主键值。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.客户端sdk发出Upsert API请求。
  • 2.服务端接受API请求,将request封装为upsertTask,并压入dmQueue队列。
  • 3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。
相关产品与服务
向量数据库
腾讯云向量数据库(Tencent Cloud VectorDB)是一款全托管的自研企业级分布式数据库服务,专用于存储、检索、分析多维向量数据。该数据库支持多种索引类型和相似度计算方法,单索引支持千亿级向量规模,可支持百万级 QPS 及毫秒级查询延迟。腾讯云向量数据库不仅能为大模型提供外部知识库,提高大模型回答的准确性,还可广泛应用于推荐系统、自然语言处理等 AI 领域。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com