workflow

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 25, 2025 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WorkflowTaskNodeStatusStatusUnCreated WorkflowInstanceStatus = "uncreated"  // 数据库中不存在这个状态,用于标识工作流任务实例任务未创建
	WorkflowTaskNodeStatusInit            WorkflowTaskNodeStatus = "init"       // 初始化状态,数据库里面没有这种状态
	WorkflowTaskNodeStatusRestarting      WorkflowTaskNodeStatus = "restarting" // 重新启动状态,状态上和init是是一样的,但是初始化工作已经完成了
	WorkflowTaskNodeStatusRunning         WorkflowTaskNodeStatus = "running"
	WorkflowTaskNodeStatusPending         WorkflowTaskNodeStatus = "pending"
	WorkflowTaskNodeStatusFinishing       WorkflowTaskNodeStatus = "finishing"
	// 完成, 工作流终止状态, 不再重试 普遍含义: 任务执行成功, 注意针对ErrorWorkflowTaskFailedWithContinue这个错误,任务节点是completed而不是failed
	WorkflowTaskNodeStatusCompleted WorkflowTaskNodeStatus = "completed"
	// 失败, 工作流终止状态, 不再重试 普遍含义: 任务执行失败, 工作流终止,某个节点原因导致工作流终止
	WorkflowTaskNodeStatusFailed WorkflowTaskNodeStatus = "failed"
	// 取消, 工作流终止状态, 不再重试 普遍含义: 任务执行取消, 手动取消的,和人工手动操作有关系,目前没有使用到
	WorkflowTaskNodeStatusCancelled WorkflowTaskNodeStatus = "canceled"
)

Variables

View Source
var (
	LockFailedError        = errors.New("lock failed")
	LockFailedTimeOutError = errors.New("wait time out")
)
View Source
var (
	// 参数错误,调用地方使用
	ErrWorkflowParamInvalid = errors.New("invalid param")

	ErrWorkflowConfigNotFound              = errors.New("workflow config not found")
	ErrWorkflowDefinitionNotFound          = errors.New("workflow definition not found")
	ErrWorkflowTaskWorkerNotFound          = errors.New("workflow task worker not found")
	ErrWorkflowTaskWorkerAlreadyRegistered = errors.New("workflow task worker already registered")
	ErrWorkflowInstanceNotFound            = errors.New("workflow instance not found")
	ErrWorkflowTaskInstanceNotFound        = errors.New("workflow task instance not found")
	// 特殊的error 会影响流程的error
	//ErrorWorkflowTaskInstanceNotReady: 当前阶段还没有准备好,需要过一会儿来重试
	// 场景&应用: 审核中,每次都是审核中
	ErrorWorkflowTaskInstanceNotReady = errors.New("workflow task instance not ready") // 任务实例没有准备好,是正常的错误,常见的
	// ErrorWorkflowTaskFailedWithContinue: 任务实例失败,但是可以继续执行,可能当成另外一种完成
	// 场景&应用: 一些异步任务,对这个任务结果不关系,成功或者失败都行
	ErrorWorkflowTaskFailedWithContinue = errors.New("workflow task failed with continue") // 任务实例失败,但是可以继续执行
	// ErrWorkflowTaskFailedWithFailed: 任务实例失败,这个任务失败,整个工作流状态变成failed
	// 场景&应用: 一些关键参数丢失,整个工作流需要终止,无论重试多少次都不会成功
	ErrWorkflowTaskFailedWithFailed = errors.New("workflow task failed with termination") // 任务实例失败,但是终止整个工作流,工作流状态变成failed

	// 下面这个两个错误信息给业务上面使用,目前用于报警定义
	// 如果你希望这种错误在定时脚本打印error 使用errors.Wrapf(ErrWorkBussinessCriticalError, "err message: %s", err)
	// 如果你希望这种错误在定时脚本打印warn 使用errors.Wrapf(ErrWorkBussinessWarningError, "err message: %s", err)
	ErrWorkBussinessCriticalError = errors.New("work bussiness critical error") // 业务严重错误,需要人工介入处理
	ErrWorkBussinessWarningError  = errors.New("work bussiness warning error")  // 业务警告错误,答应warn级别日志
)

Functions

func Bool

func Bool(b bool) *bool

func BuildAddWorkfNode

func BuildAddWorkfNode(endNode *WorkflowTaskNodeDefinition, addNode *WorkflowTaskNodeDefinition, nextNodes []*WorkflowTaskNodeDefinition) error

func GetWorkflowInstanceStatusText

func GetWorkflowInstanceStatusText(status WorkflowInstanceStatus) string

func GetWorkflowTaskNodeStatusText

func GetWorkflowTaskNodeStatusText(status WorkflowTaskNodeStatus) string

func IsOverWorkflowInstanceStatus

func IsOverWorkflowInstanceStatus(status WorkflowInstanceStatus) bool

func IsOverWorkflowTaskNodeStatus

func IsOverWorkflowTaskNodeStatus(status WorkflowTaskNodeStatus) bool

func IsSeriousError

func IsSeriousError(err error) bool

IsSeriousError 目前只用在workflow_trigger_cron.go中, 用于判断是否是严重错误,如果是严重错误,则打error级别日志, 否则打warn级别日志 严重错误定义:需要人工介入处理处理, 1. 当前工作流实例不会重试,异常结束,如果一些绑定异常的数据 2. 或者当前工作流实例没有办法正常运行,需要人工介入处理,如配置不正确

func LoadWorkflowConfig

func LoadWorkflowConfig(config *WorkflowConfig) error

*

  • @description: 加载工作流配置 只做存储使用,config转化会在CreateWorkflow中完成,延迟加载,主要是解决RegisterWorkflowTask的依赖
  • @param config *WorkflowConfig
  • @return error

func PreloadingWorkflowDefinition

func PreloadingWorkflowDefinition() error

func RegisterWorkflowTask

func RegisterWorkflowTask(workflowType string, taskKey string, taskWorker WorkflowTaskNodeWorker) error

*

  • @description: 注册工作流任务节点
  • @param ctx context.Context
  • @param workflowType string
  • @param taskKey string
  • @param taskWorker WorkflowTaskWorker
  • @return error *

func String

func String(s string) *string

辅助函数:替代 String 和 Bool

func StructUnmarshal

func StructUnmarshal(ctx *JSONContext, v any) error

func UniqueStr

func UniqueStr(arr []string) []string

Types

type AddNodeExternalEventParams

type AddNodeExternalEventParams struct {
	WorkflowInstanceID int64  `json:"workflow_instance_id" validate:"gt=0"`
	TaskType           string `json:"task_type" validate:"required"`
	// TaskInstanceID     int64              `json:"task_instance_id" validate:"required"`
	NodeEvent *NodeExternalEvent `json:"node_event" validate:"required"`
}

type AsynchronousWaitCheckFunc

type AsynchronousWaitCheckFunc func(ctx context.Context, nodeContext *JSONContext) error

type BaseTaskWorker

type BaseTaskWorker struct {
	EmptyTaskWorker
}

func (BaseTaskWorker) AsynchronousWaitCheck

func (w BaseTaskWorker) AsynchronousWaitCheck(ctx context.Context, nodeContext *JSONContext) error

*

  • @description: 异步等待检查, base中不需要实现,需要实现,自己执行处理

type CreateWorkflowReq

type CreateWorkflowReq struct {
	WorkflowType string         // 工作流类型
	BusinessID   string         // 业务ID
	Context      map[string]any // 上下文,可以为空
	IsRun        bool           // 是否立即执行,如果为true,则立即执行
	TaskId       int64          // 任务id
}

type EmptyTaskWorker

type EmptyTaskWorker struct {
}

func (EmptyTaskWorker) AsynchronousWaitCheck

func (w EmptyTaskWorker) AsynchronousWaitCheck(ctx context.Context, nodeContext *JSONContext) error

func (EmptyTaskWorker) Run

func (w EmptyTaskWorker) Run(ctx context.Context, nodeContext *JSONContext) error

type EndNodeWorker

type EndNodeWorker struct {
	BaseTaskWorker
}

*

  • @description: 结束节点工作器,工作流结束节点,特殊节点

func (EndNodeWorker) Run

func (w EndNodeWorker) Run(ctx context.Context, nodeContext *JSONContext) error

type JSONContext

type JSONContext struct {
	// contains filtered or unexported fields
}

JSONContext 封装 JSON 上下文,提供便捷的读写方法

func MergeJSONContexts

func MergeJSONContexts(contexts ...*JSONContext) *JSONContext

MergeJSONContexts 合并多个上下文(后面的会覆盖前面的)

func NewByte2StrctPbValue

func NewByte2StrctPbValue(b []byte) *JSONContext

func NewJSONContext

func NewJSONContext(b []byte) *JSONContext

NewJSONContext 从字节创建 JSON 上下文

func NewJSONContextFromMap

func NewJSONContextFromMap(m map[string]any) *JSONContext

NewJSONContextFromMap 从 map 创建上下文

func (*JSONContext) Clone

func (c *JSONContext) Clone() *JSONContext

Clone 深拷贝上下文

func (*JSONContext) Delete

func (c *JSONContext) Delete(keys ...string)

Delete 删除指定路径的值

func (*JSONContext) Get

func (c *JSONContext) Get(keys ...string) (any, bool)

Get 获取值,支持嵌套路径 例如: Get("user", "name") 获取 user.name

func (*JSONContext) GetBool

func (c *JSONContext) GetBool(keys ...string) (bool, bool)

GetBool 获取布尔值

func (*JSONContext) GetFloat64

func (c *JSONContext) GetFloat64(keys ...string) (float64, bool)

GetFloat64 获取 float64 值

func (*JSONContext) GetInt64

func (c *JSONContext) GetInt64(keys ...string) (int64, bool)

GetInt64 获取 int64 值

func (*JSONContext) GetString

func (c *JSONContext) GetString(keys ...string) (string, bool)

GetString 获取字符串值

func (*JSONContext) Set

func (c *JSONContext) Set(keys []string, value any) error

Set 设置值,支持嵌套路径 例如: Set([]string{"user", "name"}, "张三") 设置 user.name = "张三"

func (*JSONContext) ToBytes

func (c *JSONContext) ToBytes() ([]byte, error)

ToBytes 转换为 JSON 字节

func (*JSONContext) ToBytesWithoutError

func (c *JSONContext) ToBytesWithoutError() []byte

func (*JSONContext) ToMap

func (c *JSONContext) ToMap() map[string]any

ToMap 返回底层 map(注意:返回的是引用)

func (*JSONContext) ToRawMessage

func (c *JSONContext) ToRawMessage() (json.RawMessage, error)

ToRawMessage 转换为 json.RawMessage

func (*JSONContext) Unmarshal

func (c *JSONContext) Unmarshal(v any) error

Unmarshal 将上下文反序列化到指定结构体

type NodeContextKey

type NodeContextKey = string

NodeContextKey 节点上下文key,用于获取节点上下文中的值

const (
	NodeContextKeyNodeEvent       NodeContextKey = "node_event"
	NodeContextKeySystem          NodeContextKey = "system"
	NodeContextKeyPreNodeContext  NodeContextKey = "pre_node_context"
	NodeContextKeyWorkflowContext NodeContextKey = "workflow_context"
	// 备注原因,一般和节点失败相关,表明为什么失败
	NodeContextKeyReason NodeContextKey = "reason"
)

type NodeDefinitionConfig

type NodeDefinitionConfig struct {
	ID            string   `json:"id"`               // 节点ID, 唯一标识, 用于标识节点
	Name          string   `json:"name"`             // 节点名称
	NextNodes     []string `json:"next_nodes"`       // 后置节点ID列表
	FailMaxCount  *int64   `json:"fail_max_count"`   // 失败次数达到 fail_max_count次后,<0的忽略
	MaxWaitTimeTs *int64   `json:"max_wait_time_ts"` // 最大等待时间,单位秒,<=0 忽略
}

NodeDefinitionConfig 节点定义配置

type NodeExternalEvent

type NodeExternalEvent struct {
	// 事件时间,单位秒,会用来做版本控制,最新的版本会覆盖旧的版本
	EventTs      int64  `json:"event_ts"`
	EventContent string `json:"event_content"`
}

type NormalTaskWorker

type NormalTaskWorker struct {
	BaseTaskWorker
	// contains filtered or unexported fields
}

func NewNormalTaskWorker

func NewNormalTaskWorker(
	funcRun RunFunc,
	funcAsynchronousWaitCheck AsynchronousWaitCheckFunc,
) *NormalTaskWorker

func (NormalTaskWorker) AsynchronousWaitCheck

func (w NormalTaskWorker) AsynchronousWaitCheck(ctx context.Context, nodeContext *JSONContext) error

func (NormalTaskWorker) Run

func (w NormalTaskWorker) Run(ctx context.Context, nodeContext *JSONContext) error

type Pager

type Pager struct {
	IsNoLimit *bool `json:"is_no_limit"`
	Page      int64 `json:"page"`
	Size      int64 `json:"size"`
}

type QueryWorkflowInstanceParams

type QueryWorkflowInstanceParams struct {
	WorkflowInstanceID *int64   `json:"workflow_instance_id"`
	WorkflowTypeIn     []string `json:"workflow_type_in"`
	BusinessID         *string  `json:"business_id"`
	StatusIn           []string `json:"status_in"`
	IDGreaterThan      *int64   `json:"id_greater_than"`
	TaskID             *int64   `json:"task_id"`
	OrderbyIDAsc       *bool    `json:"orderby_id_asc"`
	Page               *Pager   `json:"page"`
}

type QueryWorkflowTaskInstanceParams

type QueryWorkflowTaskInstanceParams struct {
	WorkflowTaskInstanceID *int64   `json:"workflow_task_instance_id"`
	WorkflowInstanceID     *int64   `json:"workflow_instance_id"`
	TaskType               *string  `json:"task_type"`
	StatusIn               []string `json:"status_in"`
	IDGreaterThan          *int64   `json:"id_greater_than"`
	OrderbyIDAsc           *bool    `json:"orderby_id_asc"`
	Page                   *Pager   `json:"page"`
}

type RegisterWorkflowTaskParams

type RegisterWorkflowTaskParams struct {
	WorkflowType string
	TaskKey      string
	TaskWorker   WorkflowTaskNodeWorker
	IsPublic     bool // 是否公开,如果为true,则可以被其他工作流使用
}

RegisterWorkflowTaskParams 注册工作流任务节点参数

type RestartWorkflowNodeParams

type RestartWorkflowNodeParams struct {
	WorkflowInstanceID      int64  `json:"workflow_instance_id" validate:"gt=0"`
	TaskType                string `json:"task_type" validate:"required"`
	IsForcedRestartWorkflow bool   `json:"is_forced_restart_workflow"` // 是否强制重启工作流,如果是true即使工作流实例已经结束,也会重启工作流

}

type RestartWorkflowParams

type RestartWorkflowParams struct {
	WorkflowInstanceID int64 `json:"workflow_instance_id" validate:"gt=0"`
	// Context            map[string]any // 上下文,如果有值,则覆盖掉原来的上下文
	IsRun bool // 是否立即执行,如果为true,则立即执行
}

type RootNodeWorker

type RootNodeWorker struct {
	BaseTaskWorker
}

*

  • @description: 根节点工作器,工作流开始节点,特殊节点

func (RootNodeWorker) Run

func (w RootNodeWorker) Run(ctx context.Context, nodeContext *JSONContext) error

type RunFunc

type RunFunc func(ctx context.Context, nodeContext *JSONContext) error

type TaskInstanceEntity

type TaskInstanceEntity struct {
	ID                 int64 //ID 可能为0,因为还没有创建
	WorkflowInstanceID int64
	TaskType           string
	TaskName           string
	Status             string
	NodeContext        *JSONContext
	CreatedAt          int64
	UpdatedAt          int64
	PreNodesKeys       []string
	NextNodesKeys      []string
}

type UpdateWorkflowInstanceField

type UpdateWorkflowInstanceField struct {
	Status          *string      `json:"status"`
	WorkflowContext *JSONContext `json:"workflow_context"`
}

type UpdateWorkflowInstanceParams

type UpdateWorkflowInstanceParams struct {
	Where    *UpdateWorkflowInstanceWhere `json:"where" validate:"required"`
	Fields   *UpdateWorkflowInstanceField `json:"field" validate:"required"`
	LimitMax int                          `json:"limit_max" validate:"required"`
}

type UpdateWorkflowInstanceWhere

type UpdateWorkflowInstanceWhere struct {
	IDIn     []int64  `json:"id_in"`
	StatusIn []string `json:"status_in"`
}

type UpdateWorkflowTaskInstanceField

type UpdateWorkflowTaskInstanceField struct {
	Status      *string      `json:"status"`
	NodeContext *JSONContext `json:"node_context"`
	FailCount   *int64       `json:"fail_count"`
}

type UpdateWorkflowTaskInstanceParams

type UpdateWorkflowTaskInstanceParams struct {
	Where    *UpdateWorkflowTaskInstanceWhere `json:"where" validate:"required"`
	Fields   *UpdateWorkflowTaskInstanceField `json:"field" validate:"required"`
	LimitMax int                              `json:"limit_max" validate:"required"`
}

type UpdateWorkflowTaskInstanceWhere

type UpdateWorkflowTaskInstanceWhere struct {
	IDIn []int64 `json:"id_in"`
}

type WorkflowConfig

type WorkflowConfig struct {
	ID    string                  `json:"id"`    // 工作流类型ID, 唯一标识, 用于标识工作流类型
	Name  string                  `json:"name"`  // 工作流类型名称
	Nodes []*NodeDefinitionConfig `json:"nodes"` // 构建工作流任务
}

WorkflowConfig 工作流配置,流程配置

type WorkflowDefinition

type WorkflowDefinition struct {
	ID         string
	Name       string
	NodesCount int64
	RootNode   *WorkflowTaskNodeDefinition
	Nodes      []*WorkflowTaskNodeDefinition // 节点列表,冗余字段,方便构建节点详情
}

WorkflowDefinition 工作流定义entity

func GetAndLoadWorkflowDefinition

func GetAndLoadWorkflowDefinition(workflowType string) (*WorkflowDefinition, error)

type WorkflowInstance

type WorkflowInstance struct {
	ID              int64
	WorkflowType    string
	BusinessID      string
	Status          string
	WorkflowContext *JSONContext
	TaskId          int64
	CreatedAt       int64
	UpdatedAt       int64
	Definitions     *WorkflowDefinition
}

type WorkflowInstanceDetailEntity

type WorkflowInstanceDetailEntity struct {
	ID              int64
	WorkflowType    string
	BusinessID      string
	Status          WorkflowInstanceStatus
	WorkflowContext *JSONContext
	CreatedAt       int64
	UpdatedAt       int64
	TaskInstances   []*TaskInstanceEntity
}

type WorkflowInstancePo

type WorkflowInstancePo struct {
	ID              int64                  `gorm:"column:id;primaryKey;autoIncrement" json:"id"	`
	WorkflowType    string                 `gorm:"column:workflow_type" json:"workflow_type"`
	BusinessID      string                 `gorm:"column:business_id" json:"business_id"`
	Status          WorkflowInstanceStatus `gorm:"column:status" json:"status"`
	WorkflowContext []byte                 `gorm:"column:workflow_context" json:"workflow_context"` // 工作流上下文
	TaskId          int64                  `gorm:"column:task_id" json:"task_id"`
	CreatedAt       int64                  `gorm:"column:created_at" json:"created_at"`
	UpdatedAt       int64                  `gorm:"column:updated_at" json:"updated_at"`
}

func (WorkflowInstancePo) TableName

func (WorkflowInstancePo) TableName() string

type WorkflowInstanceStatus

type WorkflowInstanceStatus = string
const (
	WorkflowInstanceStatusInit    WorkflowInstanceStatus = "init"
	WorkflowInstanceStatusRunning WorkflowInstanceStatus = "running"
	// 完成, 工作流终止状态, 不再重试 普遍含义: 任务执行成功
	WorkflowInstanceStatusCompleted WorkflowInstanceStatus = "completed"
	// 失败, 工作流终止状态, 不再重试 普遍含义: 任务执行失败, 工作流终止,某个节点原因导致工作流终止
	WorkflowInstanceStatusFailed WorkflowInstanceStatus = "failed"
	// 取消, 工作流终止状态, 不再重试 普遍含义: 任务执行取消, 手动取消的,和人工手动操作有关系,目前没有使用到
	WorkflowInstanceStatusCancelled WorkflowInstanceStatus = "canceled"
)

type WorkflowLock

type WorkflowLock interface {
	// NonBlockingSynchronized
	//  @Description:  1.非阻塞同步块,如果没有拿到锁,立刻返回错误
	//                 2.可以重入锁
	//  @param ctx 原来的ctx
	//  @param key 分布式锁的的key
	//  @param maxLockTimeDuration 锁最大的时间
	//  @param f 具体执行函数的闭包
	//  @return error
	NonBlockingSynchronized(ctx context.Context, key string, maxLockTimeDuration time.Duration, f func(context.Context) error) error
}

func NewLocalWorkflowLock

func NewLocalWorkflowLock() WorkflowLock

func NewRedisWorkflowLock

func NewRedisWorkflowLock(redisClient redis.Cmdable) WorkflowLock

type WorkflowRepo

type WorkflowRepo interface {
	CreateWorkflowInstance(ctx context.Context, workflowInstance *WorkflowInstancePo) (*WorkflowInstancePo, error)
	CreateWorkflowTaskInstance(ctx context.Context, workflowTaskInstance *WorkflowTaskInstancePo) (*WorkflowTaskInstancePo, error)
	QueryWorkflowInstance(ctx context.Context, param *QueryWorkflowInstanceParams) ([]*WorkflowInstancePo, error)
	CountWorkflowInstance(ctx context.Context, param *QueryWorkflowInstanceParams) (int64, error)
	QueryWorkflowTaskInstance(ctx context.Context, param *QueryWorkflowTaskInstanceParams) ([]*WorkflowTaskInstancePo, error)
	UpdateWorkflowInstance(ctx context.Context, param *UpdateWorkflowInstanceParams) error
	UpdateWorkflowTaskInstance(ctx context.Context, param *UpdateWorkflowTaskInstanceParams) error
	Transaction(ctx context.Context, fn func(ctx context.Context) error) error
}

func NewWorkflowRepo

func NewWorkflowRepo(db *gorm.DB) WorkflowRepo

type WorkflowService

type WorkflowService interface {
	/**
	 * @description: 创建工作流
	 * @param ctx context.Context
	 * @param req *CreateWorkflowReq
	 * @return *WorkflowInstance, error
	 */
	CreateWorkflow(ctx context.Context, req *CreateWorkflowReq) (*WorkflowInstance, error)
	/**
	 * @description: 查询工作流实例数量
	 * @param ctx context.Context
	 * @param params *QueryWorkflowInstanceParams
	 * @return int64, error
	 */
	CountWorkflowInstance(ctx context.Context, params *QueryWorkflowInstanceParams) (int64, error)
	/**
	 * @description: 查询工作流实例详情
	 * @param ctx context.Context
	 * @param params *QueryWorkflowInstanceParams
	 * @return []*WorkflowInstanceDetailEntity, error
	 */
	QueryWorkflowInstanceDetail(ctx context.Context, params *QueryWorkflowInstanceParams) ([]*WorkflowInstanceDetailEntity, error)
	/**
	 * @description: 查询工作流实例Po
	 * @param ctx context.Context
	 * @param params *QueryWorkflowInstanceParams
	 * @return []*WorkflowInstancePo, error
	 */
	QueryWorkflowInstancePo(ctx context.Context, params *QueryWorkflowInstanceParams) ([]*WorkflowInstancePo, error)
	/**
	 * @description: 运行工作流
	 *				 workflowID 为工作流实例ID,一个工作流实例只会被一个goroutine运行
	 *				 如果有其他goroutine正在运行该工作流实例,则返回错误
	 * @param ctx context.Context
	 * @param workflowID int64
	 * @return error
	 */
	RunWorkflow(ctx context.Context, workflowID int64) error
	/**
	 * @description: 取消工作流,手动取消工作流,目前没有使用,将来使用扩展
	 *				 workflowInstanceID 为工作流实例ID,一个工作流实例只会被一个goroutine运行
	 *				 如果有其他goroutine正在运行该工作流实例,则返回错误
	 * @param ctx context.Context
	 * @param workflowInstanceID int64
	 * @return error
	 */
	CancelWorkflowInstance(ctx context.Context, workflowInstanceID int64) error

	/**
	 * @description: 添加节点外部事件, 给外部输入使用,会写入节点上下文
	 * 				  一个工作流实例只会被一个goroutine运行,如果有其他goroutine正在运行该工作流实例,则返回错误
	 *                会将事件addParams.NodeEvent写入额外节点上下文的 NodeContextKeyNodeEvent(node_event)
	 * @param ctx context.Context
	 * @param addParams *AddNodeExternalEventParams
	 *				  addParams.WorkflowInstanceID 为工作流实例ID
	 *				  addParams.TaskType 为任务类型
	 *				  addParams.NodeEvent 为节点事件
	 *				  addParams.NodeEvent.EventTs 为事件时间, 会用来做版本控制,最新的版本会覆盖旧的版本
	 *				  addParams.NodeEvent.EventContent 为事件内容
	 * @return error
	 */
	AddNodeExternalEvent(ctx context.Context, addParams *AddNodeExternalEventParams) error

	/**
	 * @description: 重新开始工作流任务
	 *                如果isAsynchronous为true,一次性只能有一个进程操作工作流,可能会出现当前工作流正在被其他进程操作的情况
	 *                所以失败的情况可能会出现的比较频繁
	 * @param ctx context.Context
	 * @param restartParams *RestartWorkflowTaskParams
	 *				  restartWorkflowNodeParams.WorkflowInstanceID 为工作流实例ID
	 *				  restartWorkflowNodeParams.TaskType 为任务类型
	 *				  restartWorkflowNodeParams.IsAsynchronous 为是否异步重启,如果为true,则不等待任务执行完成,直接返回
	 * @return error
	 */
	RestartWorkflowNode(ctx context.Context, restartWorkflowNodeParams *RestartWorkflowNodeParams) error

	/**
	 * @description: 重启工作流实例, 只有失败和取消状态可以重启,正常完成的不能重启
	 * @param ctx context.Context
	 * @param restartWorkflowParams *RestartWorkflowParams 重启工作流参数
	 *				  restartWorkflowParams.WorkflowInstanceID 为工作流实例ID
	 *				  restartWorkflowParams.Context 为上下文,如果有值,则覆盖掉原来的上下文
	 *				  restartWorkflowParams.IsRun 为是否立即执行,如果为true,则立即执行
	 * @return error 重启工作流实例
	 */
	RestartWorkflowInstance(ctx context.Context, restartWorkflowParams *RestartWorkflowParams) error
}

func NewWorkflowService

func NewWorkflowService(repo WorkflowRepo, executeLock WorkflowLock) WorkflowService

type WorkflowServiceImpl

type WorkflowServiceImpl struct {
	// contains filtered or unexported fields
}

WorkflowServiceImpl 工作流服务

func (*WorkflowServiceImpl) AddNodeExternalEvent

func (s *WorkflowServiceImpl) AddNodeExternalEvent(ctx context.Context, addParams *AddNodeExternalEventParams) error

添加节点外部事件,会覆盖掉旧的事件

func (*WorkflowServiceImpl) CancelWorkflowInstance

func (s *WorkflowServiceImpl) CancelWorkflowInstance(ctx context.Context, workflowInstanceID int64) error

func (*WorkflowServiceImpl) CountWorkflowInstance

func (s *WorkflowServiceImpl) CountWorkflowInstance(ctx context.Context, params *QueryWorkflowInstanceParams) (int64, error)

func (*WorkflowServiceImpl) CreateWorkflow

func (*WorkflowServiceImpl) QueryWorkflowInstanceDetail

func (s *WorkflowServiceImpl) QueryWorkflowInstanceDetail(ctx context.Context, params *QueryWorkflowInstanceParams) ([]*WorkflowInstanceDetailEntity, error)

func (*WorkflowServiceImpl) QueryWorkflowInstancePo

func (s *WorkflowServiceImpl) QueryWorkflowInstancePo(ctx context.Context, params *QueryWorkflowInstanceParams) ([]*WorkflowInstancePo, error)

func (*WorkflowServiceImpl) RestartWorkflowInstance

func (s *WorkflowServiceImpl) RestartWorkflowInstance(ctx context.Context, restartParams *RestartWorkflowParams) error

func (*WorkflowServiceImpl) RestartWorkflowNode

func (s *WorkflowServiceImpl) RestartWorkflowNode(ctx context.Context, restartParams *RestartWorkflowNodeParams) error

func (*WorkflowServiceImpl) RunWorkflow

func (s *WorkflowServiceImpl) RunWorkflow(ctx context.Context, workflowID int64) error

type WorkflowTaskInstancePo

type WorkflowTaskInstancePo struct {
	ID                 int64                  `gorm:"column:id;primaryKey;autoIncrement"`
	WorkflowInstanceID int64                  `gorm:"column:workflow_instance_id"`
	TaskType           string                 `gorm:"column:task_type"`
	Status             WorkflowTaskNodeStatus `gorm:"column:status"`
	FailCount          int64                  `gorm:"column:fail_count"`
	NodeContext        []byte                 `gorm:"column:node_context"` // 节点上下文, input,output结合在一起
	CreatedAt          int64                  `gorm:"column:created_at"`
	UpdatedAt          int64                  `gorm:"column:updated_at"`
}

func (WorkflowTaskInstancePo) TableName

func (WorkflowTaskInstancePo) TableName() string

type WorkflowTaskNode

type WorkflowTaskNode struct {
	ID                 int64
	WorkflowInstanceID int64
	TaskType           string
	Status             string
	NodeContext        *JSONContext
	CreatedAt          int64
	UpdatedAt          int64
	FailCount          int64
}

WorkflowTaskNode 工作流任务节点entity

type WorkflowTaskNodeDefinition

type WorkflowTaskNodeDefinition struct {
	TaskType      string
	TaskName      string
	FailMaxCount  int64 // 失败次数达到 fail_max_count次后,<0的忽略
	MaxWaitTimeTs int64 // 最大等待时间,单位秒,<=0 忽略
	PreNodes      []*WorkflowTaskNodeDefinition
	NextNodes     []*WorkflowTaskNodeDefinition
	TaskWorker    WorkflowTaskNodeWorker // 工作流任务节点工作器,需要外部实现
}

WorkflowTaskNodeDefinition 工作流任务节点定义entity

func NewEndTaskNodeDefinition

func NewEndTaskNodeDefinition() *WorkflowTaskNodeDefinition

func NewRootTaskNodeDefinition

func NewRootTaskNodeDefinition() *WorkflowTaskNodeDefinition

type WorkflowTaskNodeStatus

type WorkflowTaskNodeStatus = string

type WorkflowTaskNodeWorker

type WorkflowTaskNodeWorker interface {
	/**
	 * @description:  任务执行
	 * @return error nil表示执行成功了,不需要处理
	 * @param ctx context.Context 上下文
	 * @param nodeContext *JSONContext 节点上下文, run函数自己维护,run中更改了nodeContext,那么就会同步更改到数据库里面
	 */
	Run(ctx context.Context, nodeContext *JSONContext) error
	/**
	 * @description:  异步等待检查
	 * @return error nil表示检查成功了,不需要处理
	 * @param ctx context.Context 上下文
	 * @param nodeContext *JSONContext 节点上下文, run函数自己维护,run中更改了nodeContext,那么就会同步更改到数据库里面
	 */
	AsynchronousWaitCheck(ctx context.Context, nodeContext *JSONContext) error
}

WorkflowTaskNodeWorker 工作流任务节点工作器,需要外部实现

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL