Embed an IMAP email client into a Celery worker by extending Celery with bootsteps.[source]

Register the email client subsystem in the application boot steps. module

class, email=False, **kwargs)[source]

Bases: EmailBootStep

Run the global email receiver in background thread.

name = 'email client'
stop(consumer)[source] module

Definitions of custom Celery signals related to emails.

These signals allow us to keep the VOEvent validation code decoupled from the email client itself. = <Signal: email_received providing_args={'rfc822'}>

Fired whenever an email message is received.


rfc822 (bytes) – The RFC 822 contents of the message.


Register an email listener like this:

import email
import email.policy

def on_email_received(rfc822, **kwargs):
    # Parse the RFC822 email.
    message = email.message_from_bytes(rfc822, policy=email.policy.default)
    # Print some of the message headers.
    print('Subject:', message['Subject'])
    print('From:', message['From'])
    # Print the plain-text message body.
    body = message.get_body(['plain']).get_content()