From 80b03000f6054b6fdfd357e6e420cccca3993c8e Mon Sep 17 00:00:00 2001 From: Ibar Date: Wed, 15 Apr 2026 18:02:38 +0700 Subject: [PATCH 1/6] fix thread related errors when running send_queued_mail --- post_office/models.py | 6 +++++ tests/test_commands.py | 50 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/post_office/models.py b/post_office/models.py index 2aad7ce1..e82d6ca0 100644 --- a/post_office/models.py +++ b/post_office/models.py @@ -112,8 +112,14 @@ def __str__(self): def email_message(self): """ Returns Django EmailMessage object for sending. + + The connection is always re-fetched from the thread-local registry so + that worker threads each use their own backend/session rather than + sharing the one that was embedded when prepare_email_message() ran in + the main thread. """ if self._cached_email_message: + self._cached_email_message.connection = connections[self.backend_alias or 'default'] return self._cached_email_message return self.prepare_email_message() diff --git a/tests/test_commands.py b/tests/test_commands.py index 70181d83..cb0946f3 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,5 +1,7 @@ import datetime import os +import threading +from unittest.mock import patch from django.core.files.base import ContentFile from django.core.management import call_command @@ -132,3 +134,51 @@ def test_failed_deliveries_logging(self): ) call_command('send_queued_mail', log_level=2) self.assertEqual(email.logs.count(), 1) + + @override_settings( + POST_OFFICE={ + 'BACKENDS': { + 'default': 'django.core.mail.backends.dummy.EmailBackend', + }, + 'BATCH_SIZE': 10, + 'THREADS_PER_PROCESS': 2, + } + ) + def test_send_queued_mail_threads_use_independent_connections(self): + """ + Worker threads must each obtain their own thread-local connection from the + registry rather than sharing the connection embedded during prepare_email_message(). + """ + for _ in range(3): + Email.objects.create(from_email='from@example.com', to=['to@example.com'], status=STATUS.queued) + + # Map thread_id -> set of connection object ids seen via email_message() + conn_usage: dict[int, set[int]] = {} + usage_lock = threading.Lock() + original_email_message = Email.email_message + + def tracking_email_message(self): + msg = original_email_message(self) + with usage_lock: + conn_usage.setdefault(threading.current_thread().ident, set()).add(id(msg.connection)) + return msg + + with patch.object(Email, 'email_message', tracking_email_message): + call_command('send_queued_mail', processes=1) + + self.assertEqual(Email.objects.filter(status=STATUS.sent).count(), 3) + + # email_message() is only called inside worker threads (dispatch runs in the pool) + self.assertGreater(len(conn_usage), 0, 'No email_message() calls were tracked') + + # Build a map from connection id -> set of threads that used it + conn_to_threads: dict[int, set[int]] = {} + for tid, conn_ids in conn_usage.items(): + for cid in conn_ids: + conn_to_threads.setdefault(cid, set()).add(tid) + + shared = {cid: tids for cid, tids in conn_to_threads.items() if len(tids) > 1} + self.assertFalse( + shared, + f'Backend connections were shared across threads - thread safety violated: {shared}', + ) From 3c86447e4b023d7bff1186a7208965b50dbfef61 Mon Sep 17 00:00:00 2001 From: Ibar Date: Thu, 16 Apr 2026 11:14:51 +0700 Subject: [PATCH 2/6] attach connection on dispatch() instead on prepare_email_message --- post_office/models.py | 18 ++++++------------ tests/test_commands.py | 22 +++++++++++----------- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/post_office/models.py b/post_office/models.py index e82d6ca0..5db663e8 100644 --- a/post_office/models.py +++ b/post_office/models.py @@ -112,14 +112,8 @@ def __str__(self): def email_message(self): """ Returns Django EmailMessage object for sending. - - The connection is always re-fetched from the thread-local registry so - that worker threads each use their own backend/session rather than - sharing the one that was embedded when prepare_email_message() ran in - the main thread. """ if self._cached_email_message: - self._cached_email_message.connection = connections[self.backend_alias or 'default'] return self._cached_email_message return self.prepare_email_message() @@ -145,7 +139,6 @@ def prepare_email_message(self): multipart_template = None html_message = self.html_message - connection = connections[self.backend_alias or 'default'] if isinstance(self.headers, dict) or self.expires_at or self.message_id: headers = dict(self.headers or {}) if self.expires_at: @@ -165,7 +158,6 @@ def prepare_email_message(self): bcc=self.bcc, cc=self.cc, headers=headers, - connection=connection, ) msg.attach_alternative(html_message, 'text/html') else: @@ -177,7 +169,6 @@ def prepare_email_message(self): bcc=self.bcc, cc=self.cc, headers=headers, - connection=connection, ) msg.content_subtype = 'html' if hasattr(multipart_template, 'attach_related'): @@ -192,7 +183,6 @@ def prepare_email_message(self): bcc=self.bcc, cc=self.cc, headers=headers, - connection=connection, ) for attachment in self.attachments.all(): @@ -212,12 +202,16 @@ def prepare_email_message(self): self._cached_email_message = msg return msg - def dispatch(self, log_level=None, disconnect_after_delivery=True, commit=True): + def dispatch(self, log_level=None, disconnect_after_delivery=True, commit=True, connection=None): """ Sends email and log the result. """ try: - self.email_message().send() + msg = self.email_message() + if connection is None: + connection = connections[self.backend_alias or 'default'] + msg.connection = connection + msg.send() status = STATUS.sent message = '' exception_type = '' diff --git a/tests/test_commands.py b/tests/test_commands.py index cb0946f3..ad488e66 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -9,6 +9,7 @@ from django.test.utils import override_settings from django.utils.timezone import now +from post_office.connections import ConnectionHandler from post_office.models import STATUS, Attachment, Email @@ -146,30 +147,29 @@ def test_failed_deliveries_logging(self): ) def test_send_queued_mail_threads_use_independent_connections(self): """ - Worker threads must each obtain their own thread-local connection from the - registry rather than sharing the connection embedded during prepare_email_message(). + Worker threads must each obtain their own thread-local connection from + ConnectionHandler so that no single connection is shared across threads. """ for _ in range(3): Email.objects.create(from_email='from@example.com', to=['to@example.com'], status=STATUS.queued) - # Map thread_id -> set of connection object ids seen via email_message() + # Map thread_id -> set of connection object ids fetched from ConnectionHandler conn_usage: dict[int, set[int]] = {} usage_lock = threading.Lock() - original_email_message = Email.email_message + original_getitem = ConnectionHandler.__getitem__ - def tracking_email_message(self): - msg = original_email_message(self) + def tracking_getitem(self, alias): + conn = original_getitem(self, alias) with usage_lock: - conn_usage.setdefault(threading.current_thread().ident, set()).add(id(msg.connection)) - return msg + conn_usage.setdefault(threading.current_thread().ident, set()).add(id(conn)) + return conn - with patch.object(Email, 'email_message', tracking_email_message): + with patch.object(ConnectionHandler, '__getitem__', tracking_getitem): call_command('send_queued_mail', processes=1) self.assertEqual(Email.objects.filter(status=STATUS.sent).count(), 3) - # email_message() is only called inside worker threads (dispatch runs in the pool) - self.assertGreater(len(conn_usage), 0, 'No email_message() calls were tracked') + self.assertGreater(len(conn_usage), 0, 'No connections were fetched from ConnectionHandler') # Build a map from connection id -> set of threads that used it conn_to_threads: dict[int, set[int]] = {} From 65909b678ff1ece20ab8534dcad55e2a74f01270 Mon Sep 17 00:00:00 2001 From: Ibar Date: Thu, 16 Apr 2026 12:13:25 +0700 Subject: [PATCH 3/6] fix test + revert prepare_email_message + ruff --- post_office/mail.py | 21 +++++++++-------- post_office/models.py | 8 +++++-- tests/test_commands.py | 52 +----------------------------------------- tests/test_mail.py | 50 ++++++++++++++++++++++------------------ tests/test_models.py | 19 ++++++++++++++- 5 files changed, 64 insertions(+), 86 deletions(-) diff --git a/post_office/mail.py b/post_office/mail.py index 97a62a5b..cafc8c04 100644 --- a/post_office/mail.py +++ b/post_office/mail.py @@ -41,11 +41,12 @@ def _send_email(email: Email, log_level: int) -> tuple[bool, Optional[Exception]]: try: - email.dispatch(log_level=log_level, commit=False, disconnect_after_delivery=False) - logger.debug('Successfully sent email #%d' % email.id) + connection = connections[email.backend_alias or 'default'] + email.dispatch(log_level=log_level, commit=False, disconnect_after_delivery=False, connection=connection) + logger.debug(f'Successfully sent email #{email.id}') return True, None except Exception as e: - logger.exception('Failed to send email #%d' % email.id) + logger.exception(f'Failed to send email #{email.id}') return False, e @@ -162,17 +163,17 @@ def send( try: recipients = parse_emails(recipients) except ValidationError as e: - raise ValidationError('recipients: %s' % e.message) + raise ValidationError(f'recipients: {e.message}') try: cc = parse_emails(cc) except ValidationError as e: - raise ValidationError('c: %s' % e.message) + raise ValidationError(f'c: {e.message}') try: bcc = parse_emails(bcc) except ValidationError as e: - raise ValidationError('bcc: %s' % e.message) + raise ValidationError(f'bcc: {e.message}') if sender is None: sender = settings.DEFAULT_FROM_EMAIL @@ -206,7 +207,7 @@ def send( template = get_email_template(template, language) if backend and backend not in get_available_backends().keys(): - raise ValueError('%s is not a valid backend alias' % backend) + raise ValueError(f'{backend} is not a valid backend alias') email = create( sender, @@ -296,7 +297,7 @@ def send_queued(processes: int = 1, log_level: Optional[int] = None) -> tuple[in total_sent, total_failed, total_requeued = 0, 0, 0 total_email = len(queued_emails) - logger.info('Started sending %s emails with %s processes.' % (total_email, processes)) + logger.info(f'Started sending {total_email} emails with {processes} processes.') if log_level is None: log_level = get_log_level() @@ -363,7 +364,7 @@ def _send_bulk( failed_emails = [] # This is a list of two tuples (email, exception) email_count = len(emails) - logger.info('Process started, sending %s emails' % email_count) + logger.info(f'Process started, sending {email_count} emails') emails_to_send = [] @@ -376,7 +377,7 @@ def _send_bulk( email.prepare_email_message() emails_to_send.append(email) except Exception as e: - logger.exception('Failed to prepare email #%d' % email.id) + logger.exception(f'Failed to prepare email #{email.id}') failed_emails.append((email, e)) number_of_threads = min(get_threads_per_process(), email_count) diff --git a/post_office/models.py b/post_office/models.py index 5db663e8..db606367 100644 --- a/post_office/models.py +++ b/post_office/models.py @@ -104,7 +104,7 @@ def __init__(self, *args, **kwargs): self._cached_email_message = None def __repr__(self): - return '<%s: %s>' % (self.__class__.__name__, self.to) + return f'<{self.__class__.__name__}: {self.to}>' def __str__(self): return f'{", ".join(self.to)}' @@ -139,6 +139,7 @@ def prepare_email_message(self): multipart_template = None html_message = self.html_message + connection = connections[self.backend_alias or 'default'] if isinstance(self.headers, dict) or self.expires_at or self.message_id: headers = dict(self.headers or {}) if self.expires_at: @@ -158,6 +159,7 @@ def prepare_email_message(self): bcc=self.bcc, cc=self.cc, headers=headers, + connection=connection, ) msg.attach_alternative(html_message, 'text/html') else: @@ -169,6 +171,7 @@ def prepare_email_message(self): bcc=self.bcc, cc=self.cc, headers=headers, + connection=connection, ) msg.content_subtype = 'html' if hasattr(multipart_template, 'attach_related'): @@ -183,6 +186,7 @@ def prepare_email_message(self): bcc=self.bcc, cc=self.cc, headers=headers, + connection=connection, ) for attachment in self.attachments.all(): @@ -325,7 +329,7 @@ class Meta: ordering = ['name'] def __str__(self): - return '%s %s' % (self.name, self.language) + return f'{self.name} {self.language}' def natural_key(self): return (self.name, self.language, self.default_template) diff --git a/tests/test_commands.py b/tests/test_commands.py index ad488e66..d9b9f5ca 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,15 +1,12 @@ import datetime import os -import threading -from unittest.mock import patch from django.core.files.base import ContentFile from django.core.management import call_command -from django.test import TestCase, TransactionTestCase +from django.test import TestCase from django.test.utils import override_settings from django.utils.timezone import now -from post_office.connections import ConnectionHandler from post_office.models import STATUS, Attachment, Email @@ -135,50 +132,3 @@ def test_failed_deliveries_logging(self): ) call_command('send_queued_mail', log_level=2) self.assertEqual(email.logs.count(), 1) - - @override_settings( - POST_OFFICE={ - 'BACKENDS': { - 'default': 'django.core.mail.backends.dummy.EmailBackend', - }, - 'BATCH_SIZE': 10, - 'THREADS_PER_PROCESS': 2, - } - ) - def test_send_queued_mail_threads_use_independent_connections(self): - """ - Worker threads must each obtain their own thread-local connection from - ConnectionHandler so that no single connection is shared across threads. - """ - for _ in range(3): - Email.objects.create(from_email='from@example.com', to=['to@example.com'], status=STATUS.queued) - - # Map thread_id -> set of connection object ids fetched from ConnectionHandler - conn_usage: dict[int, set[int]] = {} - usage_lock = threading.Lock() - original_getitem = ConnectionHandler.__getitem__ - - def tracking_getitem(self, alias): - conn = original_getitem(self, alias) - with usage_lock: - conn_usage.setdefault(threading.current_thread().ident, set()).add(id(conn)) - return conn - - with patch.object(ConnectionHandler, '__getitem__', tracking_getitem): - call_command('send_queued_mail', processes=1) - - self.assertEqual(Email.objects.filter(status=STATUS.sent).count(), 3) - - self.assertGreater(len(conn_usage), 0, 'No connections were fetched from ConnectionHandler') - - # Build a map from connection id -> set of threads that used it - conn_to_threads: dict[int, set[int]] = {} - for tid, conn_ids in conn_usage.items(): - for cid in conn_ids: - conn_to_threads.setdefault(cid, set()).add(tid) - - shared = {cid: tids for cid, tids in conn_to_threads.items() if len(tids) > 1} - self.assertFalse( - shared, - f'Backend connections were shared across threads - thread safety violated: {shared}', - ) diff --git a/tests/test_mail.py b/tests/test_mail.py index 26f0cbc5..23515f07 100644 --- a/tests/test_mail.py +++ b/tests/test_mail.py @@ -131,31 +131,37 @@ def test_send_bulk(self): self.assertEqual(len(mail.outbox), 1) self.assertEqual(mail.outbox[0].subject, 'send bulk') - @override_settings(EMAIL_BACKEND='tests.test_mail.ConnectionTestingBackend') + @override_settings( + EMAIL_BACKEND='tests.test_mail.ConnectionTestingBackend', + POST_OFFICE={ + 'BACKENDS': { + 'connection_tester': 'tests.test_mail.ConnectionTestingBackend', + }, + 'THREADS_PER_PROCESS': 1, + }, + ) def test_send_bulk_reuses_open_connection(self): """ - Ensure _send_bulk() only opens connection once to send multiple emails. + Ensure _send_bulk() opens one connection per thread, not one per email. + With THREADS_PER_PROCESS=1: main thread opens one during prepare, worker + thread opens one during send — total 2 opens for any number of emails. """ global connection_counter self.assertEqual(connection_counter, 0) - email = Email.objects.create( - to=['to@example.com'], - from_email='bob@example.com', - subject='', - message='', - status=STATUS.queued, - backend_alias='connection_tester', - ) - email_2 = Email.objects.create( - to=['to@example.com'], - from_email='bob@example.com', - subject='', - message='', - status=STATUS.queued, - backend_alias='connection_tester', + email_1, email_2, email_3 = Email.objects.bulk_create( + [ + Email( + to=['to@example.com'], + from_email='bob@example.com', + subject='', + message='', + status=STATUS.queued, + backend_alias='connection_tester', + ) for _ in range(3) + ] ) - _send_bulk([email, email_2]) - self.assertEqual(connection_counter, 1) + _send_bulk([email_1, email_2, email_3]) + self.assertEqual(connection_counter, 2) def test_get_queued(self): """ @@ -381,9 +387,9 @@ def test_create_with_template_and_empty_context(self): email = create(sender='from@example.com', recipients=['to@example.com'], template=template, context=context) today = timezone.datetime.today() current_year = today.year - self.assertEqual(email.subject, 'Subject %d' % current_year) - self.assertEqual(email.message, 'Content %d' % current_year) - self.assertEqual(email.html_message, 'HTML %d' % current_year) + self.assertEqual(email.subject, f'Subject {current_year}') + self.assertEqual(email.message, f'Content {current_year}') + self.assertEqual(email.html_message, f'HTML {current_year}') self.assertEqual(email.context, None) self.assertIsNotNone(email.template) diff --git a/tests/test_models.py b/tests/test_models.py index b1d62974..ca74e0cf 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,7 +1,7 @@ import json import os - from datetime import datetime, timedelta +from unittest.mock import MagicMock from django.conf import settings as django_settings, settings from django.core import mail @@ -111,6 +111,23 @@ def test_dispatch_with_override_recipients(self): self.assertEqual(mail.outbox[0].to, ['override@gmail.com']) settings.POST_OFFICE = previous_settings + def test_dispatch_uses_provided_connection(self): + """ + Ensure dispatch() uses the explicitly passed connection to send the + message instead of fetching one from the registry. + """ + email = Email.objects.create( + to=['to@example.com'], + from_email='from@example.com', + subject='Test provided connection', + message='Message', + backend_alias='locmem', + ) + mock_conn = MagicMock() + mock_conn.send_messages.return_value = 1 + email.dispatch(connection=mock_conn, disconnect_after_delivery=False) + mock_conn.send_messages.assert_called_once() + def test_status_and_log(self): """ Ensure that status and log are set properly on successful sending From ab7812d05cd0b3d9cdf3cbd8096c003bb9b4dc37 Mon Sep 17 00:00:00 2001 From: Ibar Date: Thu, 16 Apr 2026 13:33:01 +0700 Subject: [PATCH 4/6] update --- post_office/models.py | 10 +++++++--- tests/test_models.py | 22 +++++++++++++++------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/post_office/models.py b/post_office/models.py index db606367..302e1962 100644 --- a/post_office/models.py +++ b/post_office/models.py @@ -209,12 +209,16 @@ def prepare_email_message(self): def dispatch(self, log_level=None, disconnect_after_delivery=True, commit=True, connection=None): """ Sends email and log the result. + + If ``connection`` is provided, it overrides the connection embedded in + the email message by ``prepare_email_message()``. This allows callers + (e.g. worker threads) to supply a thread-local connection rather than + reusing one that was opened in a different thread. """ try: msg = self.email_message() - if connection is None: - connection = connections[self.backend_alias or 'default'] - msg.connection = connection + if connection: + msg.connection = connection msg.send() status = STATUS.sent message = '' diff --git a/tests/test_models.py b/tests/test_models.py index ca74e0cf..65f626fb 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,7 +1,7 @@ import json import os from datetime import datetime, timedelta -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch from django.conf import settings as django_settings, settings from django.core import mail @@ -11,6 +11,7 @@ from django.forms.models import modelform_factory from django.test import TestCase from django.utils import timezone +from django.core.mail.backends.locmem import EmailBackend as LocMemEmailBackend from post_office.models import Email, Log, PRIORITY, STATUS, EmailTemplate, Attachment from post_office.mail import send @@ -113,8 +114,8 @@ def test_dispatch_with_override_recipients(self): def test_dispatch_uses_provided_connection(self): """ - Ensure dispatch() uses the explicitly passed connection to send the - message instead of fetching one from the registry. + Ensure dispatch() overrides msg.connection with the explicitly passed + connection, even when prepare_email_message() already embedded one. """ email = Email.objects.create( to=['to@example.com'], @@ -123,10 +124,17 @@ def test_dispatch_uses_provided_connection(self): message='Message', backend_alias='locmem', ) - mock_conn = MagicMock() - mock_conn.send_messages.return_value = 1 - email.dispatch(connection=mock_conn, disconnect_after_delivery=False) - mock_conn.send_messages.assert_called_once() + + mocked_connection = MagicMock() + mocked_connection.send_messages.return_value = 1 + + # sanity check, original connection embedded in email_message() should be a LocMemEmailBackend instance + self.assertTrue(isinstance(email.email_message().connection, LocMemEmailBackend)) + + email.dispatch(connection=mocked_connection) + # message object's connection should be overridden by the provided one + self.assertEqual(email.email_message().connection, mocked_connection) + mocked_connection.send_messages.assert_called_once() def test_status_and_log(self): """ From ac5cd2898e2370b2d6911ce21f64f832398851a2 Mon Sep 17 00:00:00 2001 From: Ibar Date: Thu, 16 Apr 2026 13:59:32 +0700 Subject: [PATCH 5/6] Update models.py --- post_office/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/post_office/models.py b/post_office/models.py index 302e1962..32c8a203 100644 --- a/post_office/models.py +++ b/post_office/models.py @@ -217,7 +217,7 @@ def dispatch(self, log_level=None, disconnect_after_delivery=True, commit=True, """ try: msg = self.email_message() - if connection: + if connection is not None: msg.connection = connection msg.send() status = STATUS.sent From bf28963a26043139cae0fd5044814520a7f0a414 Mon Sep 17 00:00:00 2001 From: Ibar Date: Thu, 16 Apr 2026 14:04:13 +0700 Subject: [PATCH 6/6] fix ruff --- tests/test_mail.py | 3 ++- tests/test_models.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_mail.py b/tests/test_mail.py index 23515f07..9ad7e226 100644 --- a/tests/test_mail.py +++ b/tests/test_mail.py @@ -157,7 +157,8 @@ def test_send_bulk_reuses_open_connection(self): message='', status=STATUS.queued, backend_alias='connection_tester', - ) for _ in range(3) + ) + for _ in range(3) ] ) _send_bulk([email_1, email_2, email_3]) diff --git a/tests/test_models.py b/tests/test_models.py index 65f626fb..d9372064 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,7 +1,7 @@ import json import os from datetime import datetime, timedelta -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock from django.conf import settings as django_settings, settings from django.core import mail @@ -133,7 +133,7 @@ def test_dispatch_uses_provided_connection(self): email.dispatch(connection=mocked_connection) # message object's connection should be overridden by the provided one - self.assertEqual(email.email_message().connection, mocked_connection) + self.assertEqual(email.email_message().connection, mocked_connection) mocked_connection.send_messages.assert_called_once() def test_status_and_log(self):