MCPcopy Index your code
hub / github.com/gocelery/gocelery

github.com/gocelery/gocelery @v1.0

repository ↗ · DeepWiki ↗ · release v1.0 ↗ · Ask this repo → · + Follow
113 symbols 344 edges 18 files 79 documented · 70% updated 2y agov1.0 · 2019-03-01★ 2,48347 open issues
README

gocelery

Go Client/Server for Celery Distributed Task Queue

Build Status Coverage Status Go Report Card GoDoc License motivation FOSSA Status

Why?

Having being involved in a number of projects migrating server from python to go, I have realized Go can help improve performance of existing python web applications. Celery distributed tasks are used heavily in many python web applications and this library allows you to implement celery workers in Go as well as being able to submit celery tasks in Go.

You can also use this library as pure go distributed task queue.

Go Celery Worker in Action

demo

Supported Brokers/Backends

Now supporting both Redis and AMQP!!

  • Redis (broker/backend)
  • AMQP (broker/backend) - does not allow concurrent use of channels

Celery Configuration

Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.

CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,

Celery Worker Example

Run Celery Worker implemented in Go

// example/worker/main.go

// Celery Task
func add(a int, b int) int {
    return a + b
}

func main() {
    // create broker and backend
    celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379")
    celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379")

    // use AMQP instead
    // celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
    // celeryBackend := gocelery.NewAMQPCeleryBackend("amqp://")

    // Configure with 2 celery workers
    celeryClient, _ := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 2)

    // worker.add name reflects "add" task method found in "worker.py"
    celeryClient.Register("worker.add", add)

    // Start Worker - blocking method
    go celeryClient.StartWorker()

    // Wait 30 seconds and stop all workers
    time.Sleep(30 * time.Second)
    celeryClient.StopWorker()
}
go run example/worker/main.go

You can use custom struct instead to hold shared structures.


type MyStruct struct {
    MyInt int
}

func (so *MyStruct) add(a int, b int) int {
    return a + b + so.MyInt
}

// code omitted ...

ms := &MyStruct{10}
celeryClient.Register("worker.add", ms.add)

// code omitted ...

Submit Task from Python Client

# example/test.py

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    # submit celery task to be executed in Go workers
    ar = add.apply_async((5456, 2878), serializer='json')
    print(ar.get())
python example/test.py

Celery Client Example

Run Celery Worker implemented in Python

# example/worker.py

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y
cd example
celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

Submit Task from Go Client

func main() {
    // create broker and backend
    celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379")
    celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379")

    // use AMQP instead
    // celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
    // celeryBackend := gocelery.NewAMQPCeleryBackend("amqp://")

    // create client
    celeryClient, _ := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 0)

    // send task
    asyncResult, err := celeryClient.Delay("worker.add", 3, 5)
    if err != nil {
        panic(err)
    }

    // check if result is ready
    isReady, _ := asyncResult.Ready()
    fmt.Printf("ready status %v\n", isReady)

    // get result with 5s timeout
    res, err = asyncResult.Get(5 * time.Second)
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(res)
    }
}
go run example/client/main.go

Sample Celery Task Message

{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}

Contributing

You are more than welcome to make any contributions. Please create Pull Request for any changes.

LICENSE

The gocelery is offered under MIT license.

FOSSA Status

Extension points exported contracts — how you extend this code

CeleryBroker (Interface)
CeleryBroker is interface for celery broker database [2 implementers]
gocelery.go
CeleryBackend (Interface)
CeleryBackend is interface for celery backend database [2 implementers]
gocelery.go
CeleryTask (Interface)
CeleryTask is an interface that represents actual task Passing CeleryTask interface instead of function pointer avoids r [2 …
gocelery.go

Core symbols most depended-on inside this repo

Get
called by 16
gocelery.go
generateUUID
called by 15
uuid.go
NewRedisCeleryBroker
called by 5
redis_broker.go
GetTaskMessage
called by 5
gocelery.go
NewRedisCeleryBackend
called by 5
redis_backend.go
releaseCeleryMessage
called by 4
message.go
getReflectionResultMessage
called by 4
message.go
releaseResultMessage
called by 4
message.go

Shape

Function 51
Method 43
Struct 16
Interface 3

Languages

Go98%
Python2%

Modules by API surface

gocelery.go21 symbols
message.go18 symbols
amqp_broker.go13 symbols
gocelery_test.go10 symbols
worker.go9 symbols
worker_test.go7 symbols
redis_broker.go6 symbols
amqp_backend.go6 symbols
example/worker/main.go5 symbols
broker_test.go5 symbols
redis_backend.go4 symbols
backend_test.go4 symbols

For agents

$ claude mcp add gocelery \
  -- python -m otcore.mcp_server <graph>

⬇ download graph artifact