Documentation
¶
Index ¶
- Constants
- Variables
- func Bool(b bool) *bool
- func BuildAddWorkfNode(endNode *WorkflowTaskNodeDefinition, addNode *WorkflowTaskNodeDefinition, ...) error
- func GetWorkflowInstanceStatusText(status WorkflowInstanceStatus) string
- func GetWorkflowTaskNodeStatusText(status WorkflowTaskNodeStatus) string
- func IsOverWorkflowInstanceStatus(status WorkflowInstanceStatus) bool
- func IsOverWorkflowTaskNodeStatus(status WorkflowTaskNodeStatus) bool
- func IsSeriousError(err error) bool
- func LoadWorkflowConfig(config *WorkflowConfig) error
- func PreloadingWorkflowDefinition() error
- func RegisterWorkflowTask(workflowType string, taskKey string, taskWorker WorkflowTaskNodeWorker) error
- func String(s string) *string
- func StructUnmarshal(ctx *JSONContext, v any) error
- func UniqueStr(arr []string) []string
- type AddNodeExternalEventParams
- type AsynchronousWaitCheckFunc
- type BaseTaskWorker
- type CreateWorkflowReq
- type EmptyTaskWorker
- type EndNodeWorker
- type JSONContext
- func (c *JSONContext) Clone() *JSONContext
- func (c *JSONContext) Delete(keys ...string)
- func (c *JSONContext) Get(keys ...string) (any, bool)
- func (c *JSONContext) GetBool(keys ...string) (bool, bool)
- func (c *JSONContext) GetFloat64(keys ...string) (float64, bool)
- func (c *JSONContext) GetInt64(keys ...string) (int64, bool)
- func (c *JSONContext) GetString(keys ...string) (string, bool)
- func (c *JSONContext) Set(keys []string, value any) error
- func (c *JSONContext) ToBytes() ([]byte, error)
- func (c *JSONContext) ToBytesWithoutError() []byte
- func (c *JSONContext) ToMap() map[string]any
- func (c *JSONContext) ToRawMessage() (json.RawMessage, error)
- func (c *JSONContext) Unmarshal(v any) error
- type NodeContextKey
- type NodeDefinitionConfig
- type NodeExternalEvent
- type NormalTaskWorker
- type Pager
- type QueryWorkflowInstanceParams
- type QueryWorkflowTaskInstanceParams
- type RegisterWorkflowTaskParams
- type RestartWorkflowNodeParams
- type RestartWorkflowParams
- type RootNodeWorker
- type RunFunc
- type TaskInstanceEntity
- type UpdateWorkflowInstanceField
- type UpdateWorkflowInstanceParams
- type UpdateWorkflowInstanceWhere
- type UpdateWorkflowTaskInstanceField
- type UpdateWorkflowTaskInstanceParams
- type UpdateWorkflowTaskInstanceWhere
- type WorkflowConfig
- type WorkflowDefinition
- type WorkflowInstance
- type WorkflowInstanceDetailEntity
- type WorkflowInstancePo
- type WorkflowInstanceStatus
- type WorkflowLock
- type WorkflowRepo
- type WorkflowService
- type WorkflowServiceImpl
- func (s *WorkflowServiceImpl) AddNodeExternalEvent(ctx context.Context, addParams *AddNodeExternalEventParams) error
- func (s *WorkflowServiceImpl) CancelWorkflowInstance(ctx context.Context, workflowInstanceID int64) error
- func (s *WorkflowServiceImpl) CountWorkflowInstance(ctx context.Context, params *QueryWorkflowInstanceParams) (int64, error)
- func (s *WorkflowServiceImpl) CreateWorkflow(ctx context.Context, req *CreateWorkflowReq) (*WorkflowInstance, error)
- func (s *WorkflowServiceImpl) QueryWorkflowInstanceDetail(ctx context.Context, params *QueryWorkflowInstanceParams) ([]*WorkflowInstanceDetailEntity, error)
- func (s *WorkflowServiceImpl) QueryWorkflowInstancePo(ctx context.Context, params *QueryWorkflowInstanceParams) ([]*WorkflowInstancePo, error)
- func (s *WorkflowServiceImpl) RestartWorkflowInstance(ctx context.Context, restartParams *RestartWorkflowParams) error
- func (s *WorkflowServiceImpl) RestartWorkflowNode(ctx context.Context, restartParams *RestartWorkflowNodeParams) error
- func (s *WorkflowServiceImpl) RunWorkflow(ctx context.Context, workflowID int64) error
- type WorkflowTaskInstancePo
- type WorkflowTaskNode
- type WorkflowTaskNodeDefinition
- type WorkflowTaskNodeStatus
- type WorkflowTaskNodeWorker
Constants ¶
const ( WorkflowTaskNodeStatusStatusUnCreated WorkflowInstanceStatus = "uncreated" // 数据库中不存在这个状态,用于标识工作流任务实例任务未创建 WorkflowTaskNodeStatusInit WorkflowTaskNodeStatus = "init" // 初始化状态,数据库里面没有这种状态 WorkflowTaskNodeStatusRestarting WorkflowTaskNodeStatus = "restarting" // 重新启动状态,状态上和init是是一样的,但是初始化工作已经完成了 WorkflowTaskNodeStatusRunning WorkflowTaskNodeStatus = "running" WorkflowTaskNodeStatusPending WorkflowTaskNodeStatus = "pending" WorkflowTaskNodeStatusFinishing WorkflowTaskNodeStatus = "finishing" // 完成, 工作流终止状态, 不再重试 普遍含义: 任务执行成功, 注意针对ErrorWorkflowTaskFailedWithContinue这个错误,任务节点是completed而不是failed WorkflowTaskNodeStatusCompleted WorkflowTaskNodeStatus = "completed" // 失败, 工作流终止状态, 不再重试 普遍含义: 任务执行失败, 工作流终止,某个节点原因导致工作流终止 WorkflowTaskNodeStatusFailed WorkflowTaskNodeStatus = "failed" // 取消, 工作流终止状态, 不再重试 普遍含义: 任务执行取消, 手动取消的,和人工手动操作有关系,目前没有使用到 WorkflowTaskNodeStatusCancelled WorkflowTaskNodeStatus = "canceled" )
Variables ¶
// Sentinel errors related to locking.
// LockFailedError carries the message "lock failed"; LockFailedTimeOutError carries "wait time out".
// NOTE(review): presumably returned by the WorkflowLock implementations when a lock
// cannot be acquired or waiting for it times out — confirm against the lock code.
var ( LockFailedError = errors.New("lock failed") LockFailedTimeOutError = errors.New("wait time out") )
// Sentinel errors of the workflow package.
//
// The group is written one declaration per line: when it is collapsed onto a
// single line, the first `//` comment swallows every declaration that follows
// it. Identifiers and error messages are unchanged (including the existing
// "Bussiness"/"bussiness" spelling, which callers may already depend on).
var (
	// ErrWorkflowParamInvalid reports invalid parameters; intended for use at call sites.
	ErrWorkflowParamInvalid                = errors.New("invalid param")
	ErrWorkflowConfigNotFound              = errors.New("workflow config not found")
	ErrWorkflowDefinitionNotFound          = errors.New("workflow definition not found")
	ErrWorkflowTaskWorkerNotFound          = errors.New("workflow task worker not found")
	ErrWorkflowTaskWorkerAlreadyRegistered = errors.New("workflow task worker already registered")
	ErrWorkflowInstanceNotFound            = errors.New("workflow instance not found")
	ErrWorkflowTaskInstanceNotFound        = errors.New("workflow task instance not found")

	// Special errors that influence the flow of a workflow.

	// ErrorWorkflowTaskInstanceNotReady: the current stage is not ready yet and
	// should be retried after a while. It is a normal, common error.
	// Scenario: something is under review and every check still says "under review".
	ErrorWorkflowTaskInstanceNotReady = errors.New("workflow task instance not ready")

	// ErrorWorkflowTaskFailedWithContinue: the task instance failed but the
	// workflow may continue; it can be treated as another kind of completion.
	// Scenario: asynchronous tasks whose result does not matter (success or failure are both fine).
	ErrorWorkflowTaskFailedWithContinue = errors.New("workflow task failed with continue")

	// ErrWorkflowTaskFailedWithFailed: the task instance failed and the whole
	// workflow is terminated; the workflow status becomes failed.
	// Scenario: key parameters are missing, so no number of retries can succeed.
	ErrWorkflowTaskFailedWithFailed = errors.New("workflow task failed with termination")

	// The two errors below are meant for business code and are currently used to define alerting.
	// To have the scheduled script log at error level, use
	//   errors.Wrapf(ErrWorkBussinessCriticalError, "err message: %s", err)
	// To have the scheduled script log at warn level, use
	//   errors.Wrapf(ErrWorkBussinessWarningError, "err message: %s", err)

	// ErrWorkBussinessCriticalError is a critical business error that needs manual intervention.
	ErrWorkBussinessCriticalError = errors.New("work bussiness critical error")
	// ErrWorkBussinessWarningError is a business warning; it is logged at warn level.
	ErrWorkBussinessWarningError = errors.New("work bussiness warning error")
)
Functions ¶
func BuildAddWorkfNode ¶
func BuildAddWorkfNode(endNode *WorkflowTaskNodeDefinition, addNode *WorkflowTaskNodeDefinition, nextNodes []*WorkflowTaskNodeDefinition) error
func GetWorkflowInstanceStatusText ¶
func GetWorkflowInstanceStatusText(status WorkflowInstanceStatus) string
func GetWorkflowTaskNodeStatusText ¶
func GetWorkflowTaskNodeStatusText(status WorkflowTaskNodeStatus) string
func IsOverWorkflowInstanceStatus ¶
func IsOverWorkflowInstanceStatus(status WorkflowInstanceStatus) bool
func IsOverWorkflowTaskNodeStatus ¶
func IsOverWorkflowTaskNodeStatus(status WorkflowTaskNodeStatus) bool
func IsSeriousError ¶
IsSeriousError 目前只用在 workflow_trigger_cron.go 中,用于判断错误是否为严重错误:如果是严重错误,则打 error 级别日志,否则打 warn 级别日志。严重错误定义(需要人工介入处理):1. 当前工作流实例不会重试、异常结束,如一些绑定异常的数据;2. 或者当前工作流实例没有办法正常运行,需要人工介入处理,如配置不正确。
func LoadWorkflowConfig ¶
func LoadWorkflowConfig(config *WorkflowConfig) error
*
- @description: 加载工作流配置 只做存储使用,config转化会在CreateWorkflow中完成,延迟加载,主要是解决RegisterWorkflowTask的依赖
- @param config *WorkflowConfig
- @return error
func PreloadingWorkflowDefinition ¶
func PreloadingWorkflowDefinition() error
func RegisterWorkflowTask ¶
func RegisterWorkflowTask(workflowType string, taskKey string, taskWorker WorkflowTaskNodeWorker) error
*
- @description: 注册工作流任务节点
- @param workflowType string
- @param taskKey string
- @param taskWorker WorkflowTaskNodeWorker
- @return error
func StructUnmarshal ¶
func StructUnmarshal(ctx *JSONContext, v any) error
Types ¶
type AddNodeExternalEventParams ¶
// AddNodeExternalEventParams is the argument of WorkflowService.AddNodeExternalEvent.
// It identifies the target by workflow instance ID and task type and carries the event to attach.
type AddNodeExternalEventParams struct {
WorkflowInstanceID int64 `json:"workflow_instance_id" validate:"gt=0"` // workflow instance ID; the validate tag requires > 0
TaskType string `json:"task_type" validate:"required"` // task type of the target node
// TaskInstanceID int64 `json:"task_instance_id" validate:"required"`
NodeEvent *NodeExternalEvent `json:"node_event" validate:"required"` // the external event
}
type AsynchronousWaitCheckFunc ¶
// AsynchronousWaitCheckFunc is the function type NewNormalTaskWorker accepts as its
// asynchronous-wait-check callback; it has the same signature as
// WorkflowTaskNodeWorker.AsynchronousWaitCheck.
type AsynchronousWaitCheckFunc func(ctx context.Context, nodeContext *JSONContext) error
type BaseTaskWorker ¶
// BaseTaskWorker embeds EmptyTaskWorker and declares its own AsynchronousWaitCheck method.
// NOTE(review): presumably meant to be embedded by concrete workers that only override Run — confirm.
type BaseTaskWorker struct {
EmptyTaskWorker
}
func (BaseTaskWorker) AsynchronousWaitCheck ¶
func (w BaseTaskWorker) AsynchronousWaitCheck(ctx context.Context, nodeContext *JSONContext) error
*
- @description: 异步等待检查。BaseTaskWorker 中不做具体实现;需要异步等待检查的工作器请自行实现并处理
type CreateWorkflowReq ¶
type EmptyTaskWorker ¶
// EmptyTaskWorker is a field-less worker type with Run and AsynchronousWaitCheck methods.
// NOTE(review): presumably both methods are no-ops so the type can serve as an embeddable default — confirm.
type EmptyTaskWorker struct {
}
func (EmptyTaskWorker) AsynchronousWaitCheck ¶
func (w EmptyTaskWorker) AsynchronousWaitCheck(ctx context.Context, nodeContext *JSONContext) error
func (EmptyTaskWorker) Run ¶
func (w EmptyTaskWorker) Run(ctx context.Context, nodeContext *JSONContext) error
type EndNodeWorker ¶
// EndNodeWorker is the worker of the end node, the special node that ends a workflow.
// It embeds BaseTaskWorker and declares its own Run method.
type EndNodeWorker struct {
BaseTaskWorker
}
*
- @description: 结束节点工作器,工作流结束节点,特殊节点
func (EndNodeWorker) Run ¶
func (w EndNodeWorker) Run(ctx context.Context, nodeContext *JSONContext) error
type JSONContext ¶
// JSONContext wraps a JSON context and offers convenient read/write helpers
// (Get/Set with nested key paths, typed getters, and conversion methods).
type JSONContext struct {
// contains filtered or unexported fields
}
JSONContext 封装 JSON 上下文,提供便捷的读写方法
func MergeJSONContexts ¶
func MergeJSONContexts(contexts ...*JSONContext) *JSONContext
MergeJSONContexts 合并多个上下文(后面的会覆盖前面的)
func NewByte2StrctPbValue ¶
func NewByte2StrctPbValue(b []byte) *JSONContext
func NewJSONContextFromMap ¶
func NewJSONContextFromMap(m map[string]any) *JSONContext
NewJSONContextFromMap 从 map 创建上下文
func (*JSONContext) Get ¶
func (c *JSONContext) Get(keys ...string) (any, bool)
Get 获取值,支持嵌套路径 例如: Get("user", "name") 获取 user.name
func (*JSONContext) GetBool ¶
func (c *JSONContext) GetBool(keys ...string) (bool, bool)
GetBool 获取布尔值
func (*JSONContext) GetFloat64 ¶
func (c *JSONContext) GetFloat64(keys ...string) (float64, bool)
GetFloat64 获取 float64 值
func (*JSONContext) GetInt64 ¶
func (c *JSONContext) GetInt64(keys ...string) (int64, bool)
GetInt64 获取 int64 值
func (*JSONContext) GetString ¶
func (c *JSONContext) GetString(keys ...string) (string, bool)
GetString 获取字符串值
func (*JSONContext) Set ¶
func (c *JSONContext) Set(keys []string, value any) error
Set 设置值,支持嵌套路径 例如: Set([]string{"user", "name"}, "张三") 设置 user.name = "张三"
func (*JSONContext) ToBytesWithoutError ¶
func (c *JSONContext) ToBytesWithoutError() []byte
func (*JSONContext) ToRawMessage ¶
func (c *JSONContext) ToRawMessage() (json.RawMessage, error)
ToRawMessage 转换为 json.RawMessage
func (*JSONContext) Unmarshal ¶
func (c *JSONContext) Unmarshal(v any) error
Unmarshal 将上下文反序列化到指定结构体
type NodeContextKey ¶
// NodeContextKey is a key used to look up values in a node context.
// It is an alias of string, not a distinct type.
type NodeContextKey = string
NodeContextKey 节点上下文key,用于获取节点上下文中的值
const ( NodeContextKeyNodeEvent NodeContextKey = "node_event" NodeContextKeySystem NodeContextKey = "system" NodeContextKeyPreNodeContext NodeContextKey = "pre_node_context" NodeContextKeyWorkflowContext NodeContextKey = "workflow_context" // 备注原因,一般和节点失败相关,表明为什么失败 NodeContextKeyReason NodeContextKey = "reason" )
type NodeDefinitionConfig ¶
// NodeDefinitionConfig is the configuration of one node inside a WorkflowConfig.
type NodeDefinitionConfig struct {
ID string `json:"id"` // node ID; unique identifier of the node
Name string `json:"name"` // node name
NextNodes []string `json:"next_nodes"` // IDs of the successor nodes
FailMaxCount *int64 `json:"fail_max_count"` // failure-count threshold; values < 0 are ignored (what happens once it is reached is not stated here)
MaxWaitTimeTs *int64 `json:"max_wait_time_ts"` // maximum wait time in seconds; values <= 0 are ignored
}
NodeDefinitionConfig 节点定义配置
type NodeExternalEvent ¶
type NormalTaskWorker ¶
// NormalTaskWorker embeds BaseTaskWorker and declares its own Run and AsynchronousWaitCheck methods.
// It is built by NewNormalTaskWorker from a RunFunc and an AsynchronousWaitCheckFunc.
// NOTE(review): presumably the two methods delegate to those stored functions — confirm.
type NormalTaskWorker struct {
BaseTaskWorker
// contains filtered or unexported fields
}
func NewNormalTaskWorker ¶
func NewNormalTaskWorker( funcRun RunFunc, funcAsynchronousWaitCheck AsynchronousWaitCheckFunc, ) *NormalTaskWorker
func (NormalTaskWorker) AsynchronousWaitCheck ¶
func (w NormalTaskWorker) AsynchronousWaitCheck(ctx context.Context, nodeContext *JSONContext) error
func (NormalTaskWorker) Run ¶
func (w NormalTaskWorker) Run(ctx context.Context, nodeContext *JSONContext) error
type QueryWorkflowInstanceParams ¶
// QueryWorkflowInstanceParams holds the filters, ordering and paging options used
// when querying or counting workflow instances.
// NOTE(review): pointer and slice fields are presumably optional (nil/empty = no filter) — confirm in the repo implementation.
type QueryWorkflowInstanceParams struct {
WorkflowInstanceID *int64 `json:"workflow_instance_id"`
WorkflowTypeIn []string `json:"workflow_type_in"`
BusinessID *string `json:"business_id"`
StatusIn []string `json:"status_in"`
IDGreaterThan *int64 `json:"id_greater_than"`
TaskID *int64 `json:"task_id"`
OrderbyIDAsc *bool `json:"orderby_id_asc"`
Page *Pager `json:"page"`
}
type QueryWorkflowTaskInstanceParams ¶
// QueryWorkflowTaskInstanceParams holds the filters, ordering and paging options used
// when querying workflow task instances.
// NOTE(review): pointer and slice fields are presumably optional (nil/empty = no filter) — confirm in the repo implementation.
type QueryWorkflowTaskInstanceParams struct {
WorkflowTaskInstanceID *int64 `json:"workflow_task_instance_id"`
WorkflowInstanceID *int64 `json:"workflow_instance_id"`
TaskType *string `json:"task_type"`
StatusIn []string `json:"status_in"`
IDGreaterThan *int64 `json:"id_greater_than"`
OrderbyIDAsc *bool `json:"orderby_id_asc"`
Page *Pager `json:"page"`
}
type RegisterWorkflowTaskParams ¶
// RegisterWorkflowTaskParams groups the parameters for registering a workflow task node.
type RegisterWorkflowTaskParams struct {
WorkflowType string
TaskKey string
TaskWorker WorkflowTaskNodeWorker
IsPublic bool // whether the task is public; if true it can be used by other workflows
}
RegisterWorkflowTaskParams 注册工作流任务节点参数
type RestartWorkflowParams ¶
type RootNodeWorker ¶
// RootNodeWorker is the worker of the root node, the special node that starts a workflow.
// It embeds BaseTaskWorker and declares its own Run method.
type RootNodeWorker struct {
BaseTaskWorker
}
*
- @description: 根节点工作器,工作流开始节点,特殊节点
func (RootNodeWorker) Run ¶
func (w RootNodeWorker) Run(ctx context.Context, nodeContext *JSONContext) error
type TaskInstanceEntity ¶
type UpdateWorkflowInstanceField ¶
// UpdateWorkflowInstanceField lists the workflow-instance columns an update may set.
// NOTE(review): nil fields are presumably left unchanged — confirm in the repo implementation.
type UpdateWorkflowInstanceField struct {
Status *string `json:"status"`
WorkflowContext *JSONContext `json:"workflow_context"`
}
type UpdateWorkflowInstanceParams ¶
// UpdateWorkflowInstanceParams is the argument of WorkflowRepo.UpdateWorkflowInstance:
// a where-clause, the fields to set, and a row limit. All three carry validate:"required".
type UpdateWorkflowInstanceParams struct {
Where *UpdateWorkflowInstanceWhere `json:"where" validate:"required"`
Fields *UpdateWorkflowInstanceField `json:"field" validate:"required"` // note: the JSON key is "field" (singular)
LimitMax int `json:"limit_max" validate:"required"` // NOTE(review): presumably the maximum number of rows to update — confirm
}
type UpdateWorkflowTaskInstanceField ¶
// UpdateWorkflowTaskInstanceField lists the task-instance columns an update may set.
// NOTE(review): nil fields are presumably left unchanged — confirm in the repo implementation.
type UpdateWorkflowTaskInstanceField struct {
Status *string `json:"status"`
NodeContext *JSONContext `json:"node_context"`
FailCount *int64 `json:"fail_count"`
}
type UpdateWorkflowTaskInstanceParams ¶
// UpdateWorkflowTaskInstanceParams is the argument of WorkflowRepo.UpdateWorkflowTaskInstance:
// a where-clause, the fields to set, and a row limit. All three carry validate:"required".
type UpdateWorkflowTaskInstanceParams struct {
Where *UpdateWorkflowTaskInstanceWhere `json:"where" validate:"required"`
Fields *UpdateWorkflowTaskInstanceField `json:"field" validate:"required"` // note: the JSON key is "field" (singular)
LimitMax int `json:"limit_max" validate:"required"` // NOTE(review): presumably the maximum number of rows to update — confirm
}
type UpdateWorkflowTaskInstanceWhere ¶
// UpdateWorkflowTaskInstanceWhere selects the task instances to update by ID.
type UpdateWorkflowTaskInstanceWhere struct {
IDIn []int64 `json:"id_in"` // task-instance IDs to match
}
type WorkflowConfig ¶
// WorkflowConfig is the configuration of one workflow type (the flow definition as configured).
type WorkflowConfig struct {
ID string `json:"id"` // workflow type ID; unique identifier of the workflow type
Name string `json:"name"` // workflow type name
Nodes []*NodeDefinitionConfig `json:"nodes"` // node configurations from which the workflow tasks are built
}
WorkflowConfig 工作流配置,流程配置
type WorkflowDefinition ¶
// WorkflowDefinition is the workflow definition entity.
type WorkflowDefinition struct {
ID string
Name string
NodesCount int64
RootNode *WorkflowTaskNodeDefinition
Nodes []*WorkflowTaskNodeDefinition // node list; redundant field kept to make building node details easier
}
WorkflowDefinition 工作流定义entity
func GetAndLoadWorkflowDefinition ¶
func GetAndLoadWorkflowDefinition(workflowType string) (*WorkflowDefinition, error)
type WorkflowInstance ¶
// WorkflowInstance is the workflow instance entity returned by CreateWorkflow.
// Unlike WorkflowInstancePo it holds the context as a *JSONContext and carries the
// workflow definition.
// NOTE(review): CreatedAt/UpdatedAt are int64 timestamps; the unit is not shown here — confirm.
type WorkflowInstance struct {
ID int64
WorkflowType string
BusinessID string
Status string
WorkflowContext *JSONContext
TaskId int64
CreatedAt int64
UpdatedAt int64
Definitions *WorkflowDefinition
}
type WorkflowInstanceDetailEntity ¶
// WorkflowInstanceDetailEntity is the result element of QueryWorkflowInstanceDetail:
// a workflow instance together with its task instances.
type WorkflowInstanceDetailEntity struct {
ID int64
WorkflowType string
BusinessID string
Status WorkflowInstanceStatus
WorkflowContext *JSONContext
CreatedAt int64
UpdatedAt int64
TaskInstances []*TaskInstanceEntity
}
type WorkflowInstancePo ¶
// WorkflowInstancePo is the persistence object of a workflow instance.
// The gorm tags give the column names; the table name comes from its TableName method.
type WorkflowInstancePo struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement" json:"id" `
WorkflowType string `gorm:"column:workflow_type" json:"workflow_type"`
BusinessID string `gorm:"column:business_id" json:"business_id"`
Status WorkflowInstanceStatus `gorm:"column:status" json:"status"`
WorkflowContext []byte `gorm:"column:workflow_context" json:"workflow_context"` // workflow context
TaskId int64 `gorm:"column:task_id" json:"task_id"`
CreatedAt int64 `gorm:"column:created_at" json:"created_at"`
UpdatedAt int64 `gorm:"column:updated_at" json:"updated_at"`
}
func (WorkflowInstancePo) TableName ¶
func (WorkflowInstancePo) TableName() string
type WorkflowInstanceStatus ¶
// WorkflowInstanceStatus is the status of a workflow instance.
// It is an alias of string, not a distinct type.
type WorkflowInstanceStatus = string

// Workflow-instance status values.
//
// The group is written one constant per line: when it is collapsed onto a
// single line, the first `//` comment swallows every constant that follows it.
const (
	WorkflowInstanceStatusInit    WorkflowInstanceStatus = "init"
	WorkflowInstanceStatusRunning WorkflowInstanceStatus = "running"
	// WorkflowInstanceStatusCompleted is a terminal status that is not retried.
	// General meaning: the task succeeded.
	WorkflowInstanceStatusCompleted WorkflowInstanceStatus = "completed"
	// WorkflowInstanceStatusFailed is a terminal status that is not retried.
	// General meaning: the task failed and the workflow is terminated because of a node.
	WorkflowInstanceStatusFailed WorkflowInstanceStatus = "failed"
	// WorkflowInstanceStatusCancelled is a terminal status that is not retried.
	// General meaning: the task was cancelled manually by an operator. Currently unused.
	WorkflowInstanceStatusCancelled WorkflowInstanceStatus = "canceled"
)
type WorkflowLock ¶
// WorkflowLock is the locking abstraction accepted by NewWorkflowService.
// NewLocalWorkflowLock and NewRedisWorkflowLock return implementations.
type WorkflowLock interface {
// NonBlockingSynchronized
// @Description: 1. Non-blocking synchronized section: if the lock cannot be acquired, an error is returned immediately.
// 2. The lock is reentrant.
// @param ctx the original ctx
// @param key key of the distributed lock
// @param maxLockTimeDuration maximum time the lock is held
// @param f closure containing the function to execute
// @return error
NonBlockingSynchronized(ctx context.Context, key string, maxLockTimeDuration time.Duration, f func(context.Context) error) error
}
func NewLocalWorkflowLock ¶
func NewLocalWorkflowLock() WorkflowLock
func NewRedisWorkflowLock ¶
func NewRedisWorkflowLock(redisClient redis.Cmdable) WorkflowLock
type WorkflowRepo ¶
// WorkflowRepo is the persistence interface consumed by NewWorkflowService.
// NewWorkflowRepo builds an implementation from a *gorm.DB.
type WorkflowRepo interface {
// CreateWorkflowInstance stores a workflow instance and returns the stored PO.
CreateWorkflowInstance(ctx context.Context, workflowInstance *WorkflowInstancePo) (*WorkflowInstancePo, error)
// CreateWorkflowTaskInstance stores a workflow task instance and returns the stored PO.
CreateWorkflowTaskInstance(ctx context.Context, workflowTaskInstance *WorkflowTaskInstancePo) (*WorkflowTaskInstancePo, error)
// QueryWorkflowInstance returns the workflow instances matching param.
QueryWorkflowInstance(ctx context.Context, param *QueryWorkflowInstanceParams) ([]*WorkflowInstancePo, error)
// CountWorkflowInstance returns the number of workflow instances matching param.
CountWorkflowInstance(ctx context.Context, param *QueryWorkflowInstanceParams) (int64, error)
// QueryWorkflowTaskInstance returns the task instances matching param.
QueryWorkflowTaskInstance(ctx context.Context, param *QueryWorkflowTaskInstanceParams) ([]*WorkflowTaskInstancePo, error)
// UpdateWorkflowInstance applies param.Fields to the instances selected by param.Where.
UpdateWorkflowInstance(ctx context.Context, param *UpdateWorkflowInstanceParams) error
// UpdateWorkflowTaskInstance applies param.Fields to the task instances selected by param.Where.
UpdateWorkflowTaskInstance(ctx context.Context, param *UpdateWorkflowTaskInstanceParams) error
// Transaction runs fn with a context.
// NOTE(review): presumably fn runs inside a database transaction bound to that context — confirm.
Transaction(ctx context.Context, fn func(ctx context.Context) error) error
}
func NewWorkflowRepo ¶
func NewWorkflowRepo(db *gorm.DB) WorkflowRepo
type WorkflowService ¶
// WorkflowService is the public API of the workflow engine: creating, querying,
// running, cancelling and restarting workflow instances, and feeding external
// events to nodes. NewWorkflowService returns an implementation.
type WorkflowService interface {
/**
* @description: Creates a workflow.
* @param ctx context.Context
* @param req *CreateWorkflowReq
* @return *WorkflowInstance, error
*/
CreateWorkflow(ctx context.Context, req *CreateWorkflowReq) (*WorkflowInstance, error)
/**
* @description: Counts workflow instances.
* @param ctx context.Context
* @param params *QueryWorkflowInstanceParams
* @return int64, error
*/
CountWorkflowInstance(ctx context.Context, params *QueryWorkflowInstanceParams) (int64, error)
/**
* @description: Queries workflow instance details.
* @param ctx context.Context
* @param params *QueryWorkflowInstanceParams
* @return []*WorkflowInstanceDetailEntity, error
*/
QueryWorkflowInstanceDetail(ctx context.Context, params *QueryWorkflowInstanceParams) ([]*WorkflowInstanceDetailEntity, error)
/**
* @description: Queries workflow instance POs.
* @param ctx context.Context
* @param params *QueryWorkflowInstanceParams
* @return []*WorkflowInstancePo, error
*/
QueryWorkflowInstancePo(ctx context.Context, params *QueryWorkflowInstanceParams) ([]*WorkflowInstancePo, error)
/**
* @description: Runs a workflow.
* workflowID is the workflow instance ID; a workflow instance is run by only one goroutine at a time.
* If another goroutine is already running this workflow instance, an error is returned.
* @param ctx context.Context
* @param workflowID int64
* @return error
*/
RunWorkflow(ctx context.Context, workflowID int64) error
/**
* @description: Cancels a workflow manually. Currently unused; reserved for future extension.
* workflowInstanceID is the workflow instance ID; a workflow instance is run by only one goroutine at a time.
* If another goroutine is already running this workflow instance, an error is returned.
* @param ctx context.Context
* @param workflowInstanceID int64
* @return error
*/
CancelWorkflowInstance(ctx context.Context, workflowInstanceID int64) error
/**
* @description: Adds an external event to a node. Intended for external input; the event is written into the node context.
* A workflow instance is run by only one goroutine at a time; if another goroutine is already running it, an error is returned.
* The event addParams.NodeEvent is written under NodeContextKeyNodeEvent (node_event) of the extra node context.
* @param ctx context.Context
* @param addParams *AddNodeExternalEventParams
* addParams.WorkflowInstanceID is the workflow instance ID
* addParams.TaskType is the task type
* addParams.NodeEvent is the node event
* addParams.NodeEvent.EventTs is the event time; it is used for versioning, and the newest version overwrites older ones
* addParams.NodeEvent.EventContent is the event content
* @return error
*/
AddNodeExternalEvent(ctx context.Context, addParams *AddNodeExternalEventParams) error
/**
* @description: Restarts a workflow task.
* If IsAsynchronous is true: only one process may operate on a workflow at a time, so the workflow
* may currently be operated on by another process, and failures may therefore occur fairly often.
* @param ctx context.Context
* @param restartWorkflowNodeParams *RestartWorkflowNodeParams
* restartWorkflowNodeParams.WorkflowInstanceID is the workflow instance ID
* restartWorkflowNodeParams.TaskType is the task type
* restartWorkflowNodeParams.IsAsynchronous selects an asynchronous restart; if true, it returns immediately without waiting for the task to finish
* @return error
*/
RestartWorkflowNode(ctx context.Context, restartWorkflowNodeParams *RestartWorkflowNodeParams) error
/**
* @description: Restarts a workflow instance. Only failed and canceled instances can be restarted; normally completed ones cannot.
* @param ctx context.Context
* @param restartWorkflowParams *RestartWorkflowParams restart parameters
* restartWorkflowParams.WorkflowInstanceID is the workflow instance ID
* restartWorkflowParams.Context is the context; if set, it overwrites the original context
* restartWorkflowParams.IsRun selects immediate execution; if true, the workflow is run right away
* @return error
*/
RestartWorkflowInstance(ctx context.Context, restartWorkflowParams *RestartWorkflowParams) error
}
func NewWorkflowService ¶
func NewWorkflowService(repo WorkflowRepo, executeLock WorkflowLock) WorkflowService
type WorkflowServiceImpl ¶
// WorkflowServiceImpl is the workflow service; its methods match the WorkflowService interface.
// NOTE(review): presumably it holds the WorkflowRepo and WorkflowLock passed to NewWorkflowService — its fields are not shown here.
type WorkflowServiceImpl struct {
// contains filtered or unexported fields
}
WorkflowServiceImpl 工作流服务
func (*WorkflowServiceImpl) AddNodeExternalEvent ¶
func (s *WorkflowServiceImpl) AddNodeExternalEvent(ctx context.Context, addParams *AddNodeExternalEventParams) error
添加节点外部事件,会覆盖掉旧的事件
func (*WorkflowServiceImpl) CancelWorkflowInstance ¶
func (s *WorkflowServiceImpl) CancelWorkflowInstance(ctx context.Context, workflowInstanceID int64) error
func (*WorkflowServiceImpl) CountWorkflowInstance ¶
func (s *WorkflowServiceImpl) CountWorkflowInstance(ctx context.Context, params *QueryWorkflowInstanceParams) (int64, error)
func (*WorkflowServiceImpl) CreateWorkflow ¶
func (s *WorkflowServiceImpl) CreateWorkflow(ctx context.Context, req *CreateWorkflowReq) (*WorkflowInstance, error)
func (*WorkflowServiceImpl) QueryWorkflowInstanceDetail ¶
func (s *WorkflowServiceImpl) QueryWorkflowInstanceDetail(ctx context.Context, params *QueryWorkflowInstanceParams) ([]*WorkflowInstanceDetailEntity, error)
func (*WorkflowServiceImpl) QueryWorkflowInstancePo ¶
func (s *WorkflowServiceImpl) QueryWorkflowInstancePo(ctx context.Context, params *QueryWorkflowInstanceParams) ([]*WorkflowInstancePo, error)
func (*WorkflowServiceImpl) RestartWorkflowInstance ¶
func (s *WorkflowServiceImpl) RestartWorkflowInstance(ctx context.Context, restartParams *RestartWorkflowParams) error
func (*WorkflowServiceImpl) RestartWorkflowNode ¶
func (s *WorkflowServiceImpl) RestartWorkflowNode(ctx context.Context, restartParams *RestartWorkflowNodeParams) error
func (*WorkflowServiceImpl) RunWorkflow ¶
func (s *WorkflowServiceImpl) RunWorkflow(ctx context.Context, workflowID int64) error
type WorkflowTaskInstancePo ¶
// WorkflowTaskInstancePo is the persistence object of a workflow task instance.
// The gorm tags give the column names; the table name comes from its TableName method.
type WorkflowTaskInstancePo struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
WorkflowInstanceID int64 `gorm:"column:workflow_instance_id"`
TaskType string `gorm:"column:task_type"`
Status WorkflowTaskNodeStatus `gorm:"column:status"`
FailCount int64 `gorm:"column:fail_count"`
NodeContext []byte `gorm:"column:node_context"` // node context; input and output are stored together
CreatedAt int64 `gorm:"column:created_at"`
UpdatedAt int64 `gorm:"column:updated_at"`
}
func (WorkflowTaskInstancePo) TableName ¶
func (WorkflowTaskInstancePo) TableName() string
type WorkflowTaskNode ¶
// WorkflowTaskNode is the workflow task node entity.
// It has the same fields as WorkflowTaskInstancePo, but holds the node context as a
// *JSONContext and the status as a plain string.
type WorkflowTaskNode struct {
ID int64
WorkflowInstanceID int64
TaskType string
Status string
NodeContext *JSONContext
CreatedAt int64
UpdatedAt int64
FailCount int64
}
WorkflowTaskNode 工作流任务节点entity
type WorkflowTaskNodeDefinition ¶
// WorkflowTaskNodeDefinition is the workflow task node definition entity.
// It links to predecessor and successor definitions through PreNodes and NextNodes.
type WorkflowTaskNodeDefinition struct {
TaskType string
TaskName string
FailMaxCount int64 // failure-count threshold; values < 0 are ignored (what happens once it is reached is not stated here)
MaxWaitTimeTs int64 // maximum wait time in seconds; values <= 0 are ignored
PreNodes []*WorkflowTaskNodeDefinition
NextNodes []*WorkflowTaskNodeDefinition
TaskWorker WorkflowTaskNodeWorker // worker of this task node; must be implemented externally
}
WorkflowTaskNodeDefinition 工作流任务节点定义entity
func NewEndTaskNodeDefinition ¶
func NewEndTaskNodeDefinition() *WorkflowTaskNodeDefinition
func NewRootTaskNodeDefinition ¶
func NewRootTaskNodeDefinition() *WorkflowTaskNodeDefinition
type WorkflowTaskNodeStatus ¶
// WorkflowTaskNodeStatus is the status of a workflow task node.
// It is an alias of string, not a distinct type.
type WorkflowTaskNodeStatus = string
type WorkflowTaskNodeWorker ¶
// WorkflowTaskNodeWorker is the worker of a workflow task node; it must be implemented externally.
type WorkflowTaskNodeWorker interface {
/**
* @description: Executes the task.
* @return error nil means the execution succeeded and nothing further needs handling
* @param ctx context.Context context
* @param nodeContext *JSONContext node context, maintained by Run itself; changes Run makes to nodeContext are synced to the database
*/
Run(ctx context.Context, nodeContext *JSONContext) error
/**
* @description: Asynchronous wait check.
* @return error nil means the check succeeded and nothing further needs handling
* @param ctx context.Context context
* @param nodeContext *JSONContext node context; the original text says it is maintained by Run and that changes made in Run are synced to the database
* NOTE(review): the nodeContext description looks copied from Run — confirm whether changes made here are persisted too.
*/
AsynchronousWaitCheck(ctx context.Context, nodeContext *JSONContext) error
}
WorkflowTaskNodeWorker 工作流任务节点工作器,需要外部实现