您好,登錄后才能下訂單哦!
這篇文章主要介紹“machinery中TaskProcessor的用法”,在日常操作中,相信很多人在machinery中TaskProcessor的用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”machinery中TaskProcessor的用法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
本文主要研究一下machinery的TaskProcessor
// TaskProcessor - can process a delivered task // This will probably always be a worker instance type TaskProcessor interface { Process(signature *tasks.Signature) error CustomQueue() string PreConsumeHandler() bool }
TaskProcessor接口定義了Process、CustomQueue、PreConsumeHandler方法
// Worker represents a single worker process type Worker struct { server *Server ConsumerTag string Concurrency int Queue string errorHandler func(err error) preTaskHandler func(*tasks.Signature) postTaskHandler func(*tasks.Signature) preConsumeHandler func(*Worker) bool } // CustomQueue returns Custom Queue of the running worker process func (worker *Worker) CustomQueue() string { return worker.Queue } // Process handles received tasks and triggers success/error callbacks func (worker *Worker) Process(signature *tasks.Signature) error { // If the task is not registered with this worker, do not continue // but only return nil as we do not want to restart the worker process if !worker.server.IsTaskRegistered(signature.Name) { return nil } taskFunc, err := worker.server.GetRegisteredTask(signature.Name) if err != nil { return nil } // Update task state to RECEIVED if err = worker.server.GetBackend().SetStateReceived(signature); err != nil { return fmt.Errorf("Set state to 'received' for task %s returned error: %s", signature.UUID, err) } // Prepare task for processing task, err := tasks.NewWithSignature(taskFunc, signature) // if this failed, it means the task is malformed, probably has invalid // signature, go directly to task failed without checking whether to retry if err != nil { worker.taskFailed(signature, err) return err } // try to extract trace span from headers and add it to the function context // so it can be used inside the function if it has context.Context as the first // argument. Start a new span if it isn't found. taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name) tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature) task.Context = opentracing.ContextWithSpan(task.Context, taskSpan) // Update task state to STARTED if err = worker.server.GetBackend().SetStateStarted(signature); err != nil { return fmt.Errorf("Set state to 'started' for task %s returned error: %s", signature.UUID, err) } //Run handler before the task is called if worker.preTaskHandler != nil { worker.preTaskHandler(signature) } //Defer run handler for the end of the task if worker.postTaskHandler != nil { defer worker.postTaskHandler(signature) } // Call the task results, err := task.Call() if err != nil { // If a tasks.ErrRetryTaskLater was returned from the task, // retry the task after specified duration retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater) if ok { return worker.retryTaskIn(signature, retriableErr.RetryIn()) } // Otherwise, execute default retry logic based on signature.RetryCount // and signature.RetryTimeout values if signature.RetryCount > 0 { return worker.taskRetry(signature) } return worker.taskFailed(signature, err) } return worker.taskSucceeded(signature, results) } //SetPreConsumeHandler sets a custom handler for the end of a job func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) { worker.preConsumeHandler = handler }
Worker實現了TaskProcessor接口,其Process方法先通過worker.server.GetRegisteredTask獲取taskFunc,然后通過signature更新state為RECEIVED,之后設置為STARTED,之后執行task.Call(),最后根據結果更新task為failed或者success
machinery的TaskProcessor接口定義了Process、CustomQueue、PreConsumeHandler方法。Worker實現了TaskProcessor接口,其Process方法先通過worker.server.GetRegisteredTask獲取taskFunc,然后通過signature更新state為RECEIVED,之后設置為STARTED,之后執行task.Call(),最后根據結果更新task為failed或者success。
到此,關于“machinery中TaskProcessor的用法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。