I have a few tasks like this:

    from time import sleep

    # `celery` is the Celery app instance, created elsewhere (not shown).

    @celery.task
    def generate():
        sleep(1.0)
        print("Generate done!")
        return 'result'

    @celery.task
    def lower(result):
        sleep(1.0)
        print("Lower done!")
        return result.lower()

    @celery.task
    def upper(result):
        sleep(1.0)
        print("Upper done!")
        return result.upper()

    @celery.task
    def upload(result):
        sleep(1.0)
        print("Upload done for: %s!" % (result,))
        return 'upload'

    @celery.task
    def callback(results):
        print("It's all done! %s" % (results,))

I'm creating a chord that looks like this:

    from celery import chain, chord, group

    chord(
        header=chain(
            generate.s(),
            group(
                chain(lower.s(), upload.s()),
                chain(upper.s(), upload.s())
            )
        ),
        body=callback.s()
    ).delay()

The problem that I'm experiencing is that my callback, which is supposed to fire after all tasks have been completed, seems to fire right after generate.
In case it's not clear, the workflow is like this:
- Generate a result, then pass it on to the members of a group to achieve parallelism:
    - Group one will take the result from `generate`, convert it to lowercase using `lower`, and then upload the result using `upload`.
    - Group two will take the result from `generate`, convert it to uppercase using `upper`, and then upload the result using `upload`.
- After all of this is done, the `callback` task should be called.
Expected
The callback task will be called at least 3 seconds after starting.
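
To make the expected timing concrete, here is a minimal sketch of the ordering I'm after, written without Celery. It uses threads in place of workers, so it only illustrates the semantics and says nothing about how Celery schedules things. `STEP` and `run_workflow` are hypothetical names introduced for this illustration, and `STEP` stands in for the 1.0 second sleep, shortened so the sketch runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor

STEP = 0.1  # stand-in for the 1.0 s sleep in each task (assumption, shortened)

def generate():
    time.sleep(STEP)
    return 'result'

def lower(result):
    time.sleep(STEP)
    return result.lower()

def upper(result):
    time.sleep(STEP)
    return result.upper()

def upload(result):
    time.sleep(STEP)
    return 'upload'

def run_workflow():
    """Run generate, then both branches in parallel, and only then finish."""
    start = time.monotonic()
    generated = generate()  # step 1: one STEP
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Steps 2 and 3: each branch is a two-task chain, two STEPs per branch.
        branches = [
            pool.submit(lambda: upload(lower(generated))),
            pool.submit(lambda: upload(upper(generated))),
        ]
        # The "callback" point: block until every branch has finished.
        results = [branch.result() for branch in branches]
    elapsed = time.monotonic() - start
    return results, elapsed

if __name__ == "__main__":
    results, elapsed = run_workflow()
    print("It's all done! %s after about %.1f steps" % (results, elapsed / STEP))
```

The callback point is reached only after one step for `generate` plus two steps for the slower branch, which is where the "at least 3 seconds" figure comes from.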
Actual
The callback task is called around 1 second after starting, and does not wait for members of the group to finish executing.
Here are the logs proving that it doesn't wait for groups:

    [2013-11-17 18:20:40,447: WARNING/PoolWorker-8] Generate done!
    [2013-11-17 18:20:41,493: WARNING/PoolWorker-6] Upper done!
    [2013-11-17 18:20:41,493: WARNING/PoolWorker-1] Lower done!
    [2013-11-17 18:20:41,535: WARNING/PoolWorker-6] It's all done! [('e0016a35-d538-4e96-ad86-6ddf91ef4a09', [('b1af78a9-7935-4037-84e4-9fae6d7c027e', None), ('d69c4c99-af9c-476f-af7d-7f647c4d9c83', None)])]
    [2013-11-17 18:20:42,522: WARNING/PoolWorker-7] Upload done for: result!
    [2013-11-17 18:20:42,523: WARNING/PoolWorker-5] Upload done for: RESULT!

It seems that Celery doesn't wait on groups. Is there a way to have Celery wait until ALL tasks, including members of groups, are finished executing?