Task registers a standard (non-durable) task with the workflow
(opts create.WorkflowTask[I, O], fn func(ctx worker.HatchetContext, input I) (interface{}, error))
| 217 | |
| 218 | // Task registers a standard (non-durable) task with the workflow |
| 219 | func (w *workflowDeclarationImpl[I, O]) Task(opts create.WorkflowTask[I, O], fn func(ctx worker.HatchetContext, input I) (interface{}, error)) *task.TaskDeclaration[I] { |
| 220 | name := opts.Name |
| 221 | |
| 222 | // Use reflection to validate the function type |
| 223 | fnType := reflect.TypeOf(fn) |
| 224 | if fnType.Kind() != reflect.Func || |
| 225 | fnType.NumIn() != 2 || |
| 226 | fnType.NumOut() != 2 || |
| 227 | !fnType.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { |
| 228 | panic("Invalid function type for task " + name + ": must be func(I, worker.HatchetContext) (*T, error)") |
| 229 | } |
| 230 | |
| 231 | // Create a setter function that can set this specific output type to the corresponding field in O |
| 232 | w.outputSetters[name] = func(result *O, output interface{}) { |
| 233 | resultValue := reflect.ValueOf(result).Elem() |
| 234 | field := resultValue.FieldByName(name) |
| 235 | |
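| 236 | // Also scan every field for a matching JSON tag or case-insensitive name; note this runs even when the exact-name lookup above succeeded, and the first match here replaces it |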
| 237 | resultType := resultValue.Type() |
| 238 | for i := 0; i < resultType.NumField(); i++ { |
| 239 | fieldType := resultType.Field(i) |
| 240 | jsonTag := fieldType.Tag.Get("json") |
| 241 | // Extract the name part from the json tag (before any comma) |
| 242 | if commaIdx := strings.Index(jsonTag, ","); commaIdx > 0 { |
| 243 | jsonTag = jsonTag[:commaIdx] |
| 244 | } |
| 245 | if jsonTag == name || strings.EqualFold(fieldType.Name, name) { |
| 246 | field = resultValue.Field(i) |
| 247 | break |
| 248 | } |
| 249 | } |
| 250 | |
| 251 | if field.IsValid() && field.CanSet() { |
| 252 | outputValue := reflect.ValueOf(output).Elem() |
| 253 | field.Set(outputValue) |
| 254 | } |
| 255 | } |
| 256 | |
| 257 | // Create a generic task function that wraps the specific one |
| 258 | genericFn := func(ctx worker.HatchetContext, input I) (*any, error) { |
| 259 | // Use reflection to call the specific function |
| 260 | fnValue := reflect.ValueOf(fn) |
| 261 | inputs := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(input)} |
| 262 | results := fnValue.Call(inputs) |
| 263 | |
| 264 | // Handle errors |
| 265 | if !results[1].IsNil() { |
| 266 | return nil, results[1].Interface().(error) |
| 267 | } |
| 268 | |
| 269 | // Return the output as any |
| 270 | output := results[0].Interface() |
| 271 | return &output, nil |
| 272 | } |
| 273 | |
| 274 | // Initialize pointers only for non-zero values |
| 275 | var retryBackoffFactor *float32 |
| 276 | var retryMaxBackoffSeconds *int32 |