
A Look at machinery's TaskProcessor

code4it
Last modified 2021-04-06 11:24:27
Collected in the column: 码匠的流水账

This article takes a look at the TaskProcessor in machinery.

TaskProcessor

// TaskProcessor - can process a delivered task
// This will probably always be a worker instance
type TaskProcessor interface {
    Process(signature *tasks.Signature) error
    CustomQueue() string
    PreConsumeHandler() bool
}

The TaskProcessor interface defines three methods: Process, CustomQueue, and PreConsumeHandler.

Worker

// Worker represents a single worker process
type Worker struct {
    server            *Server
    ConsumerTag       string
    Concurrency       int
    Queue             string
    errorHandler      func(err error)
    preTaskHandler    func(*tasks.Signature)
    postTaskHandler   func(*tasks.Signature)
    preConsumeHandler func(*Worker) bool
}

// CustomQueue returns Custom Queue of the running worker process
func (worker *Worker) CustomQueue() string {
    return worker.Queue
}

// Process handles received tasks and triggers success/error callbacks
func (worker *Worker) Process(signature *tasks.Signature) error {
    // If the task is not registered with this worker, do not continue
    // but only return nil as we do not want to restart the worker process
    if !worker.server.IsTaskRegistered(signature.Name) {
        return nil
    }

    taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
    if err != nil {
        return nil
    }

    // Update task state to RECEIVED
    if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
        return fmt.Errorf("Set state to 'received' for task %s returned error: %s", signature.UUID, err)
    }

    // Prepare task for processing
    task, err := tasks.NewWithSignature(taskFunc, signature)
    // if this failed, it means the task is malformed, probably has invalid
    // signature, go directly to task failed without checking whether to retry
    if err != nil {
        worker.taskFailed(signature, err)
        return err
    }

    // try to extract trace span from headers and add it to the function context
    // so it can be used inside the function if it has context.Context as the first
    // argument. Start a new span if it isn't found.
    taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
    tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
    task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)

    // Update task state to STARTED
    if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
        return fmt.Errorf("Set state to 'started' for task %s returned error: %s", signature.UUID, err)
    }

    //Run handler before the task is called
    if worker.preTaskHandler != nil {
        worker.preTaskHandler(signature)
    }

    //Defer run handler for the end of the task
    if worker.postTaskHandler != nil {
        defer worker.postTaskHandler(signature)
    }

    // Call the task
    results, err := task.Call()
    if err != nil {
        // If a tasks.ErrRetryTaskLater was returned from the task,
        // retry the task after specified duration
        retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
        if ok {
            return worker.retryTaskIn(signature, retriableErr.RetryIn())
        }

        // Otherwise, execute default retry logic based on signature.RetryCount
        // and signature.RetryTimeout values
        if signature.RetryCount > 0 {
            return worker.taskRetry(signature)
        }

        return worker.taskFailed(signature, err)
    }

    return worker.taskSucceeded(signature, results)
}

//SetPreConsumeHandler sets a custom handler for the end of a job
func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) {
    worker.preConsumeHandler = handler
}

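Worker implements the TaskProcessor interface. Its Process method first checks that the task is registered and fetches taskFunc via worker.server.GetRegisteredTask; if either step fails it returns nil, so that the worker process is not restarted. It then sets the task state to RECEIVED through the backend, builds the task with tasks.NewWithSignature (calling taskFailed right away if the signature is malformed), attaches a tracing span, and sets the state to STARTED. Next it runs preTaskHandler and defers postTaskHandler, if they are set, and calls task.Call(). On error it retries via retryTaskIn when the error is a tasks.ErrRetryTaskLater, or via taskRetry when signature.RetryCount > 0, and otherwise calls taskFailed; on success it calls taskSucceeded.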

Summary

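machinery's TaskProcessor interface defines the Process, CustomQueue, and PreConsumeHandler methods, and Worker implements it. Worker's Process method looks up taskFunc via worker.server.GetRegisteredTask, moves the task state to RECEIVED and then STARTED, executes task.Call(), and finally retries the task or marks it as failed or succeeded depending on the result.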

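Original-content statement: this article is published by the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.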
