Distributed Python with Celery

Celery environment setup

Install and configure Redis

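sudo apt-get install redis
# copy the default config so the original stays untouched
sudo cp /etc/redis/redis.conf /etc/redis/redis_all.conf

# in /etc/redis/redis_all.conf, change the following lines
#   bind 127.0.0.1 ::1
#   protected-mode yes
# into (this allows connections from other machines)
#   # bind 127.0.0.1 ::1
#   protected-mode no

# start redis with the custom config
sudo redis-server /etc/redis/redis_all.conf &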
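
Before starting any workers, it can help to confirm that the Redis server is reachable from another machine. The following is a minimal sketch using only the standard library. The function name `redis_ping` is hypothetical, and it assumes Redis listens on the default port 6379 without a password.

```python
import socket


def redis_ping(host, port=6379, timeout=2.0):
    """Return True if the server at host:port answers PING with PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            # Redis accepts plain-text "inline" commands terminated by CRLF.
            conn.sendall(b"PING\r\n")
            reply = conn.recv(64)
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False
    # A reachable, unprotected server replies "+PONG"; with protected mode
    # still on, a remote client receives an error reply instead.
    return reply.startswith(b"+PONG")
```

Running `redis_ping('x.x.x.x')` from a worker machine (with the real IP filled in) should return True once the custom config is in effect.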

Install Python modules

pip install redis
pip install celery

Simple Celery example

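# tasks.py
from time import sleep

from celery import Celery

# Redis URL used as both broker and result backend;
# replace x.x.x.x with the IP of the Redis server
REDIS_URL = 'redis://x.x.x.x:6379/0'

app = Celery('tasks', backend=REDIS_URL, broker=REDIS_URL)

@app.task
def async_job(x, y):
    # simulate a short piece of work, then return the sum
    sleep(0.1)
    return x + y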
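
Celery expects the broker and backend as `redis://` URLs rather than bare IP addresses. As a rough sketch, a small helper can assemble such a URL. The name `redis_url` is hypothetical, 192.0.2.10 is only a placeholder address, and the defaults assume the standard Redis port 6379 and database 0.

```python
from urllib.parse import quote


def redis_url(host, port=6379, db=0, password=None):
    """Build a redis:// URL usable as a Celery broker or result backend."""
    # Percent-encode the password so characters such as '@' or '/' are safe.
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"


print(redis_url("192.0.2.10"))
# redis://192.0.2.10:6379/0
print(redis_url("192.0.2.10", db=1, password="s3cr@t"))
# redis://:s3cr%40t@192.0.2.10:6379/1
```

Using separate database numbers for the broker and the backend is optional; the example in this post uses the same URL for both.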

Install celery and redis (the Python modules) and place tasks.py on each worker machine, then start a worker on each one.

celery -A tasks worker -l INFO

Stop the worker on each machine with Ctrl + C.

Start a worker on each machine in the background.

celery multi start w1 -A tasks -l INFO

Restart the background worker on each machine.

celery multi restart w1 -A tasks -l INFO

Stop the background worker on each machine.

celery multi stop w1 -A tasks -l INFO

On the client machine, using IPython as an example:

from tasks import async_job
# time how long it takes to queue 1000 tasks
%timeit -n 1 -r 1 [async_job.delay(i, i) for i in range(1000)]

You will find the tasks running on every host.
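
Note that calling `.delay()` only queues a task, so timing the list comprehension measures how long it takes to send the messages, not how long the workers take to finish. A minimal sketch of timing both phases follows. The helper name `run_batch` is hypothetical, and it assumes the task was created with a result backend so that `.get()` can fetch return values.

```python
import time


def run_batch(task, n=1000, timeout=30):
    """Queue n calls of task(i, i), wait for all results, and time both phases."""
    start = time.perf_counter()
    # .delay() sends a message to the broker and returns an AsyncResult.
    pending = [task.delay(i, i) for i in range(n)]
    queued = time.perf_counter()
    # .get() blocks until a worker has stored the result in the backend.
    values = [result.get(timeout=timeout) for result in pending]
    done = time.perf_counter()
    return values, queued - start, done - start
```

Passing the task imported from `tasks` returns the list of sums together with the time spent queueing and the total time until the last result arrived.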

Celery manual routing

Assign a different queue to each group of tasks

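import re

from kombu import Exchange, Queue

# `app` is the Celery instance from tasks.py
# queues the workers consume from
app.conf.task_queues = (
    Queue('feed_tasks',    routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
                           routing_key='image.compress'),
)

# route tasks to queues by task name; the ordered list of
# (pattern, route) pairs must itself be an item of task_routes,
# hence the ([...],) wrapper
app.conf.task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)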
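
To get a feel for how the `task_routes` patterns above pick a queue, the sketch below tries each pattern in order against a task name. It only approximates Celery's matching with the standard library's `fnmatch` and `re` modules and is not Celery's own code. The function `pick_queue` and the sample task names are hypothetical, and the fallback assumes Celery's default queue name, `celery`.

```python
import re
from fnmatch import fnmatchcase

# Same (pattern, options) pairs as in task_routes, in the same order.
ROUTES = [
    ("feed.tasks.*", {"queue": "feeds"}),
    ("web.tasks.*", {"queue": "web"}),
    (re.compile(r"(video|image)\.tasks\..*"), {"queue": "media"}),
]


def pick_queue(task_name, routes=ROUTES, default="celery"):
    """Return the queue of the first route whose pattern matches task_name."""
    for pattern, options in routes:
        if isinstance(pattern, re.Pattern):
            # Compiled regexes are matched from the start of the name.
            matched = pattern.match(task_name) is not None
        else:
            # Plain strings are treated as glob patterns.
            matched = fnmatchcase(task_name, pattern)
        if matched:
            return options["queue"]
    return default


print(pick_queue("feed.tasks.import_feed"))  # feeds
print(pick_queue("image.tasks.resize"))      # media
print(pick_queue("tasks.async_job"))         # celery
```

A worker only receives these tasks if it consumes from the matching queue, for example when started with `-Q feeds`.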

Note: Celery 4.0 introduced new lowercase setting names and a new settings organization. For example, CELERY_QUEUES is the old-style name, which is replaced by task_queues.
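
As an illustration of the renaming, the sketch below maps a handful of old-style names to their lowercase replacements. The table covers only a few settings, and the helper `modernize` is hypothetical rather than part of Celery.

```python
# A few old (pre-4.0) setting names and their lowercase replacements.
RENAMED = {
    "BROKER_URL": "broker_url",
    "CELERY_RESULT_BACKEND": "result_backend",
    "CELERY_QUEUES": "task_queues",
    "CELERY_ROUTES": "task_routes",
    "CELERY_DEFAULT_QUEUE": "task_default_queue",
}


def modernize(old_config):
    """Return a copy of old_config with known old-style keys renamed."""
    return {RENAMED.get(key, key): value for key, value in old_config.items()}


print(modernize({"CELERY_QUEUES": (), "BROKER_URL": "redis://x.x.x.x:6379/0"}))
# {'task_queues': (), 'broker_url': 'redis://x.x.x.x:6379/0'}
```

Keys that are not in the table are passed through unchanged, so the helper is safe to run on a config that already uses the new names.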
