Documentation
¶
Index ¶
- func Contains(src []string, target string) bool
- type CONTROL_CODE
- type ConfigManager
- type DagConfig
- type DagGraph
- type DagGraphNode
- type DagNodeHandler
- type HandlerManager
- type Job
- func (this *Job) ParseJobInputs(input map[string]interface{}) error
- func (this *Job) ParseJobOutputs() (map[string]interface{}, error)
- func (this *Job) Run(parentCtx context.Context, input map[string]interface{}) (output map[string]interface{}, err error)
- func (this *Job) Schedule(parentCtx context.Context) error
- type JobContext
- type NodeConfig
- type Task
- func (this *Task) ParseTaskInputs(context *JobContext) (map[string]interface{}, error)
- func (this *Task) ParseTaskOutputs(output map[string]interface{}, context *JobContext) error
- func (this *Task) RealRun(parentCtx context.Context, jobContext *JobContext)
- func (this *Task) Run(parentCtx context.Context, jobContext *JobContext, callback func())
- func (this *Task) WaitPre()
- type YamlConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CONTROL_CODE ¶
// CONTROL_CODE is an integer code with three declared values (see the const
// block below).
// NOTE(review): the name suggests it steers DAG execution after a task runs;
// where it is produced and consumed is not visible here — confirm.
type CONTROL_CODE int

// Declared CONTROL_CODE values, numbered from 0 via iota.
// NOTE(review): the per-value meanings below are inferred from the identifier
// names only — verify against the scheduler before relying on them.
const (
	// EXECUTION_CONTINUE has value 0; presumably "keep executing normally".
	EXECUTION_CONTINUE CONTROL_CODE = iota
	// EXECUTION_SKIP_FOLLOWER_TASK has value 1; presumably "skip downstream tasks".
	EXECUTION_SKIP_FOLLOWER_TASK
	// EXECUTION_STOP_JOB has value 2; presumably "stop the whole job".
	EXECUTION_STOP_JOB
)
type ConfigManager ¶
Configuration manager.
type DagConfig ¶
// DagConfig describes one DAG as decoded from YAML; each field is bound to the
// key named in its `yaml` struct tag.
type DagConfig struct {
	// Name is read from the `name` key.
	Name string `yaml:"name"`
	// Input is a list of strings read from the `input` key.
	// NOTE(review): presumably the names of the job-level input variables — confirm.
	Input []string `yaml:"input"`
	// Output is a list of strings read from the `output` key.
	// NOTE(review): presumably the names of the job-level output variables — confirm.
	Output []string `yaml:"output"`
	// Nodes maps a string key to the configuration of one node, read from the
	// `nodes` key. NOTE(review): the key is presumably the node name — verify.
	Nodes map[string]*NodeConfig `yaml:"nodes"`
}
func ParseConfig ¶
type DagGraphNode ¶
type DagNodeHandler ¶
// DagNodeHandler is the interface implemented by the processing logic attached
// to a DAG node.
type DagNodeHandler interface {
	// Init receives a context and a parameter map and returns an error on failure.
	Init(parentCtx context.Context, params map[string]interface{}) error // initialization method
	// Process receives a context and an input map and returns an output map
	// or an error.
	Process(parentCtx context.Context, input map[string]interface{}) (map[string]interface{}, error)
}
DAG node handler.
type HandlerManager ¶
// HandlerManager produces DagNodeHandler values by name.
type HandlerManager interface {
	// CreateHandler returns the DagNodeHandler for handlerName, or an error.
	// NOTE(review): whether a fresh handler is created on every call or an
	// existing one is reused is up to the implementation — not visible here.
	CreateHandler(handlerName string) (DagNodeHandler, error)
}
DAG node manager.
type Job ¶
A computation job.
func (*Job) ParseJobInputs ¶
func (*Job) ParseJobOutputs ¶
func (*Job) Run ¶
func (this *Job) Run(parentCtx context.Context, input map[string]interface{}) (output map[string]interface{}, err error)
Run executes the DAG job.
Each DAG task node has its own WaitGroup, which is used to wait on the execution status of its predecessor nodes.
|task| ----------> |task| ----> |task| ----> |task|
       \                           /
        ----> |task| ----> |task|
       /
|task| ----------> |task| ----> |task| ----> |task|
type JobContext ¶
// JobContext bundles the DAG configuration, a concurrent value store, and the
// handler registry for a job.
type JobContext struct {
	// Embedded read/write mutex; its Lock/RLock methods are promoted onto
	// JobContext. NOTE(review): which fields it guards is not visible here.
	sync.RWMutex
	// Config is the DAG configuration this context was built for.
	Config *DagConfig
	// GlobalVals is a concurrent map. NOTE(review): presumably the named
	// values accessed through GetVals/UpdateVals — confirm.
	GlobalVals *sync.Map
	// HandlerRegistry is the HandlerManager held by this context.
	HandlerRegistry HandlerManager
}
func NewJobContext ¶
func NewJobContext(config *DagConfig, registry HandlerManager) *JobContext
func (*JobContext) GetVals ¶
func (this *JobContext) GetVals(paramNames []string) (map[string]interface{}, error)
func (*JobContext) UpdateVals ¶
func (this *JobContext) UpdateVals(params map[string]interface{})
type NodeConfig ¶
type Task ¶
func (*Task) ParseTaskInputs ¶
func (this *Task) ParseTaskInputs(context *JobContext) (map[string]interface{}, error)
func (*Task) ParseTaskOutputs ¶
func (this *Task) ParseTaskOutputs(output map[string]interface{}, context *JobContext) error
func (*Task) RealRun ¶
func (this *Task) RealRun(parentCtx context.Context, jobContext *JobContext)
Executes the task.
type YamlConfig ¶
// YamlConfig is a wrapper struct whose single field is bound to the
// `dag_config` YAML key.
type YamlConfig struct {
	// DagConfig is read from the `dag_config` key.
	DagConfig *DagConfig `yaml:"dag_config"`
}
Click to show internal directories.
Click to hide internal directories.