Source code for gwcelery.email.bootsteps
from threading import Thread
from celery import bootsteps
from celery.utils.log import get_logger
from imapclient import IMAPClient
from safe_netrc import netrc
from .signals import email_received
__all__ = ('Receiver',)
log = get_logger(__name__)
class EmailBootStep(bootsteps.ConsumerStep):
"""Generic boot step to limit us to appropriate kinds of workers.
Only include this bootstep in workers that are started with the
``--email`` command line option.
"""
def __init__(self, consumer, email=False, **kwargs):
self.enabled = bool(email)
def start(self, consumer):
log.info('Starting %s', self.name)
def stop(self, consumer):
log.info('Stopping %s', self.name)
[docs]class Receiver(EmailBootStep):
"""Run the global email receiver in background thread."""
name = 'email client'
def _runloop(self):
username, _, password = netrc().authenticators(self._host)
with IMAPClient(self._host, use_uid=True) as conn:
conn.login(username, password)
conn.select_folder('inbox')
while self._running:
messages = conn.search()
for msgid, data in conn.fetch(messages, ['RFC822']).items():
email_received.send(None, rfc822=data[b'RFC822'])
conn.delete_messages(msgid)
conn.idle()
responses = []
while self._running and not responses:
responses = conn.idle_check(timeout=5)
conn.idle_done()
[docs] def create(self, consumer):
super().create(consumer)
self._host = consumer.app.conf['email_host']
self._running = True
self._thread = Thread(target=self._runloop)
[docs] def start(self, consumer):
super().start(consumer)
self._thread.start()
[docs] def stop(self, consumer):
super().stop(consumer)
self._running = False
self._thread.join()