Uses a SQL Server varbinary to store attachments for messages.
Two settings are required as part of the default usage:
- A connection factory that returns an open instance of a SqlConnection. Note that any Exception that occurs during opening the connection should be handled by the factory.
- A default time to keep for attachments.
configuration.EnableAttachments(
connectionFactory: async cancel =>
{
var connection = new SqlConnection(connectionString);
try
{
await connection.OpenAsync(cancel).ConfigureAwait(false);
return connection;
}
catch
{
await connection.DisposeAsync();
throw;
}
},
timeToKeep: _ => TimeSpan.FromDays(7));
Extract the connection factory into a helper method:
async Task<SqlConnection> OpenConnection(Cancel cancel)
{
var connection = new SqlConnection(connectionString);
try
{
await connection.OpenAsync(cancel).ConfigureAwait(false);
return connection;
}
catch
{
await connection.DisposeAsync();
throw;
}
}
Also use the NServiceBus.Attachments.Sql.TimeToKeep.Default method for attachment cleanup.
This usage results in the following:
configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
Attachments can leverage the ambient SQL connectivity from the transport, the persister, or both.
If both UseSynchronizedStorageSessionConnectivity and UseTransportConnectivity are defined, the SynchronizedStorageSession will be used first, followed by the TransportTransaction.
Ambient connectivity applies to attachment writes only. Attachment saves run on the ambient connection/transaction, so the save is atomic with the receive (under SendsAtomicWithReceive) or with the persister's storage session. Attachment reads always run on a fresh connection from the connectionFactory and are not enlisted in the receive transaction. This lets a handler hold an OpenOutgoingAttachment sink open while reading incoming attachments without colliding with the write on a non-MARS connection. Each read call (GetStream, CopyTo, GetBytes, ProcessStream, etc.) opens its own short-lived connection; SQL connection pooling makes this cheap.
To use the ambient SynchronizedStorageSession persister:
var attachments = configuration.EnableAttachments(
OpenConnection,
TimeToKeep.Default);
attachments.UseSynchronizedStorageSessionConnectivity();
This approach attempts to use the SynchronizedStorageSession using the following steps:
- For the current context, attempt to retrieve an instance of SynchronizedStorageSession. If no SynchronizedStorageSession exists, don't continue and fall back to the SqlConnection retrieved by the connectionFactory.
- Attempt to retrieve a property named 'Transaction' that is a SqlTransaction from the SynchronizedStorageSession. If it exists, use it for outgoing attachment operations in the current pipeline.
- Attempt to retrieve a property named 'Connection' that is a SqlConnection from the SynchronizedStorageSession. If it exists, use it for outgoing attachment operations in the current pipeline.
The properties are retrieved using reflection since there is no API in NServiceBus to access SynchronizedStorageSession data via type.
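The reflection lookup described above can be sketched roughly as follows. This is a minimal illustration, not the library's implementation: FakeStorageSession, ReflectionSketch and TryGetProperty are hypothetical names, and the string properties stand in for SqlTransaction and SqlConnection so the sketch runs without a SQL client package.

```csharp
using System;
using System.Reflection;

// Hypothetical session type; the real SynchronizedStorageSession is supplied by the persister.
public class FakeStorageSession
{
    // Stand-ins: in the scenario described above these would be SqlTransaction / SqlConnection.
    public string Transaction { get; set; } = "transaction-stand-in";
    public string Connection { get; set; } = "connection-stand-in";
}

public static class ReflectionSketch
{
    // Looks up a public property by name and returns true only when it exists
    // and currently holds a non-null value of type T.
    public static bool TryGetProperty<T>(object session, string name, out T value)
    {
        PropertyInfo property = session.GetType().GetProperty(name);
        if (property != null && property.GetValue(session) is T typed)
        {
            value = typed;
            return true;
        }

        value = default!;
        return false;
    }
}
```

A caller would try 'Transaction' first and then 'Connection', falling back to the connectionFactory when neither lookup succeeds.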
To use the ambient transport transaction:
var attachments = configuration.EnableAttachments(
OpenConnection,
TimeToKeep.Default);
attachments.UseTransportConnectivity();
This approach attempts to use the transport transaction using the following steps:
- For the current context, attempt to retrieve an instance of TransportTransaction. If no TransportTransaction exists, don't continue and fall back to using the SqlConnection retrieved by the connectionFactory.
- Attempt to retrieve an instance of Transaction from the TransportTransaction. If it exists, use it in SqlConnection.EnlistTransaction with an instance of SqlConnection retrieved by the connectionFactory. Then use that SqlConnection for outgoing attachment operations in the current pipeline.
- Attempt to retrieve an instance of SqlTransaction from the TransportTransaction. If it exists, use it for outgoing attachment operations in the current pipeline.
- Attempt to retrieve an instance of SqlConnection from the TransportTransaction. If it exists, use it for outgoing attachment operations in the current pipeline.
- Any attachments associated with a message send will be deleted after message processing.
To streamline development the attachment installer is, by default, executed at endpoint startup, in the same manner as all other installers.
configuration.EnableInstallers();
var attachments = configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
NOTE: This is also a valid approach for higher level environments.
However, in higher level environments where standard installers are being run but the SQL attachment installation has already been executed as part of a deployment, it may be necessary to explicitly disable the attachment installer while leaving standard installers enabled.
configuration.EnableInstallers();
var attachments = configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
attachments.DisableInstaller();
The default schema and table name is dbo.MessageAttachments. It can be changed with the following:
var attachments = configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default,
table: "CustomAttachmentsTableName");
Attachment cleanup is enabled by default. It can be disabled using the following:
var attachments = configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
attachments.DisableCleanupTask();
When the cleanup task runs, it uses the Expiry column to determine if a given attachment should be deleted. This column is populated when an attachment is written. When adding an attachment to an outgoing message, all methods accept an optional parameter timeToKeep of the type GetTimeToKeep. GetTimeToKeep is defined as:
public delegate TimeSpan GetTimeToKeep(TimeSpan? messageTimeToBeReceived);
Where messageTimeToBeReceived is the value of TimeToBeReceived. If no timeToKeep parameter is defined for a specific attachment, the endpoint level timeToKeep is used.
The result of timeToKeep is then added to the current date and persisted to the Expiry column.
The method TimeToKeep.Default provides a recommended default for attachment lifetime calculation:
- If TimeToBeReceived is defined, keep the attachment for twice that time.
- Otherwise, keep it for 10 days.
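The rule above, together with the Expiry calculation, can be sketched as follows. This is an illustration of the documented behavior only, not the library's source: TimeToKeepSketch and its members are hypothetical names, and the sketch ignores overflow for extremely large TimeToBeReceived values.

```csharp
using System;

// Hypothetical helper mirroring the documented rules; not the actual TimeToKeep class.
public static class TimeToKeepSketch
{
    // Twice TimeToBeReceived when it is defined, otherwise 10 days.
    public static TimeSpan Default(TimeSpan? messageTimeToBeReceived)
    {
        if (messageTimeToBeReceived.HasValue)
        {
            return messageTimeToBeReceived.Value * 2;
        }

        return TimeSpan.FromDays(10);
    }

    // The time to keep is added to the current date to produce the Expiry value.
    public static DateTime Expiry(DateTime now, TimeSpan? messageTimeToBeReceived)
    {
        return now + Default(messageTimeToBeReceived);
    }
}
```

For example, a message with a TimeToBeReceived of 3 hours would yield an attachment lifetime of 6 hours under this rule.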
| API | Use when | Memory behavior |
|---|---|---|
| AddStream | Large payloads or data generated incrementally (recommended for large data) | Streams via System.IO.Pipelines with backpressure. Memory stays bounded regardless of payload size. |
| Add(Stream) | An existing Stream instance is available | Bridges to AddStream internally via CopyToAsync. |
| AddFromIncoming | The outgoing data is produced by transforming an incoming attachment of the current message | Reads from incoming and writes to the outgoing pipe at the same time. No intermediate buffer (unless bufferSource/bufferSink is set). |
| OpenOutgoingAttachment | The handler needs the converter's output values (e.g. a "truncated" flag) before composing the outgoing message | Streams straight to storage during handler execution. No intermediate buffer. |
| AddBytes / AddString | Small payloads already in memory (config, metadata, small documents) | Full payload allocated in memory. |
| Add(AttachmentFactory) | Number of attachments not known at compile time | Dynamic. Each attachment uses the memory model of its content. |
| AddFile | File on disk | Convenience wrapper over AddStream. |
AddStream (using System.IO.Pipelines):
┌──────────┐ ┌───────────┐ ┌──────────────┐ ┌─────────┐
│ Writer │─write─>│ Pipe │─read──>│ Attachments │─read──>│ Storage │
│ Code │ │ (buffer) │ │ Library │ │ (SQL/FS)│
└──────────┘ └───────────┘ └──────────────┘ └─────────┘
Writer and reader run concurrently. Pipe applies backpressure
so the writer pauses if the reader falls behind.
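The flow in the diagram can be approximated with a small standalone sketch. It is not the library's code: PipeSketch and Bridge are hypothetical names, a MemoryStream stands in for SQL or file storage, the threshold values are arbitrary, and the System.IO.Pipelines package is assumed to be referenced.

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class PipeSketch
{
    // Runs a writer delegate and a copy to "storage" concurrently, bridged by a Pipe.
    public static async Task<byte[]> Bridge(Func<Stream, Task> writer)
    {
        // Once 64 KB is unread, writes are paused until the reader drains below 32 KB.
        var pipe = new Pipe(new PipeOptions(
            pauseWriterThreshold: 64 * 1024,
            resumeWriterThreshold: 32 * 1024));
        using var storage = new MemoryStream(); // stand-in for SQL/file storage

        // Start the reader first so it is already draining when the writer fills the pipe.
        var readTask = pipe.Reader.CopyToAsync(storage);
        var writeTask = Write();
        await Task.WhenAll(readTask, writeTask);
        await pipe.Reader.CompleteAsync();
        return storage.ToArray();

        async Task Write()
        {
            try
            {
                await using (var stream = pipe.Writer.AsStream(leaveOpen: true))
                {
                    await writer(stream);
                }
                // Signals end of data so the reader side can finish.
                await pipe.Writer.CompleteAsync();
            }
            catch (Exception exception)
            {
                // Propagates the writer failure to the reader side.
                await pipe.Writer.CompleteAsync(exception);
            }
        }
    }
}
```

Because the writer awaits whenever the pipe is above the pause threshold, the amount of buffered data stays bounded even when the payload is much larger than the threshold.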
While the examples below illustrate adding an attachment to SendOptions, equivalent operations can be performed on PublishOptions and ReplyOptions.
Use AddStream to provide a delegate that writes to a stream. Internally the library uses System.IO.Pipelines.Pipe to bridge the writer with storage, enabling concurrent streaming with backpressure. No intermediate MemoryStream, byte[], or temp file is needed.
class HandlerFactory :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.AddStream(
name: "attachment1",
writer: async stream =>
{
await using var source = File.OpenRead("FilePath.txt");
await source.CopyToAsync(stream);
});
return context.Send(new OtherMessage(), sendOptions);
}
}
For synchronous writers, an Action<Stream> overload is available. The async approach above is preferred as it avoids blocking the thread during I/O.
class HandlerSyncFactory :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.AddStream(
name: "attachment1",
writer: stream =>
{
using var source = File.OpenRead("FilePath.txt");
source.CopyTo(stream);
});
return context.Send(new OtherMessage(), sendOptions);
}
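}
The writer can also be passed as a method group, for example a SaveAsync method that accepts a Stream:
class HandlerStreamWriter :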
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var document = new Document();
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.AddStream(
name: "attachment1",
writer: document.SaveAsync);
return context.Send(new OtherMessage(), sendOptions);
}
}
Use Add when a Stream instance is already available. Internally it bridges to AddStream via CopyToAsync.
class HandlerInstance :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
var stream = File.OpenRead("FilePath.txt");
attachments.Add(
name: "attachment1",
stream: stream,
cleanup: () => File.Delete("FilePath.txt"));
return context.Send(new OtherMessage(), sendOptions);
}
}
Use AddFromIncoming to produce an outgoing attachment by transforming an incoming attachment of the current message. The library opens the incoming read inside the outgoing pipeline so the source and sink streams are live at the same time. No intermediate MemoryStream or temp file is needed.
This is useful when the handler reads an incoming attachment, runs it through a converter, and forwards the converted bytes. The handler does not need to manage the lifetime of the incoming SQL/file stream across the deferred AddStream writer.
class HandlerFromIncoming :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var replyOptions = new ReplyOptions();
var attachments = replyOptions.Attachments();
attachments.AddFromIncoming(
fromName: "input",
toName: "output",
transform: async (source, sink, cancel) =>
{
using var reader = new StreamReader(source, leaveOpen: true);
var content = await reader.ReadToEndAsync(cancel);
await using var writer = new StreamWriter(sink, leaveOpen: true);
await writer.WriteAsync(content.ToUpperInvariant());
});
return context.Reply(new OtherMessage(), replyOptions);
}
}
bufferSource: true buffers the incoming data to a seekable MemoryStream before the transform runs. Use it when the transform requires Length/Position/Seek on its input (e.g. email/MIME parsers).
bufferSink: true runs the transform against a seekable MemoryStream and drains it to storage afterwards. Use it when the transform requires seek operations on its output (e.g. some Aspose libraries).
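The buffering idea behind bufferSource can be sketched in isolation. This is a minimal illustration assuming the buffering amounts to a full copy into memory; BufferSketch and ToSeekable are hypothetical names, not part of the library.

```csharp
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class BufferSketch
{
    // Copies a forward-only stream into a MemoryStream so that
    // Length, Position and Seek become available to the consumer.
    public static async Task<MemoryStream> ToSeekable(
        Stream source,
        CancellationToken cancel = default)
    {
        var buffer = new MemoryStream();
        await source.CopyToAsync(buffer, cancel);
        // Rewind so the consumer starts reading from the beginning.
        buffer.Position = 0;
        return buffer;
    }
}
```

The trade-off is that the whole payload is held in memory, so buffering is best reserved for transforms that genuinely need seek support.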
NOTE: The transform runs during the outgoing pipeline (after context.Reply / context.Send is called). Any value the transform produces (e.g. a "truncated" flag, encoding metadata) cannot influence the outgoing message body, since the body has already been finalized by the caller. Use OpenOutgoingAttachment (below) when the handler needs such values in the outgoing message.
Use OpenOutgoingAttachment when the converter's output values (a "truncated" flag, encoding metadata, page count) need to be in the outgoing message body. The handler writes directly to storage during its execution and is free to read any state from the conversion before composing the reply. The library allocates the outgoing message id, sets it on the options, and registers the saved attachment so the outgoing pipeline emits the Attachments header but skips the deferred save.
class HandlerImmediateWrite :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var replyOptions = new ReplyOptions();
bool truncated;
await using (var sink = await context.OpenOutgoingAttachment(
replyOptions,
"TheAttachmentName"))
{
// Convert directly to the sink. The result (e.g. truncated)
// is available before the reply body is composed.
truncated = MyConverter.Convert(message.Source, sink);
}
await context.Reply(
new OtherMessage
{
Truncated = truncated
},
replyOptions);
}
}
NOTE: If the handler succeeds in saving but the outgoing dispatch fails (or the handler exits without ever sending the message), the saved attachment becomes an orphan. The cleanup task removes orphans by Expiry, but the asymmetry depends on TransportTransactionMode. Under SendsAtomicWithReceive everything is atomic; under ReceiveOnly / None the asymmetry is the same one the deferred APIs already have.
Approaches to using attachments for the current incoming message.
Processes an attachment with a specific name.
class HandlerProcessStream :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStream(
name: "attachment1",
action: async(stream, token) =>
{
// Use the attachment stream. in this example copy to a file
await using var fileToCopyTo = File.Create("FilePath.txt");
await stream.CopyToAsync(fileToCopyTo, token);
},
context.CancellationToken);
}
}
Processes all attachments.
class HandlerProcessStreams :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStreams(
action: async (stream, cancel) =>
{
// Use the attachment stream. in this example copy to a file
await using var file = File.Create($"{stream.Name}.txt");
await stream.CopyToAsync(file, cancel);
},
context.CancellationToken);
}
}
Copy an attachment with a specific name to another stream.
class HandlerCopyTo :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
await using var fileToCopyTo = File.Create("FilePath.txt");
await attachments.CopyTo("attachment1", fileToCopyTo, context.CancellationToken);
}
}
Get a stream for an attachment with a specific name.
class HandlerGetStream :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
await using var stream = await attachments.GetStream("attachment1", context.CancellationToken);
// Use the attachment stream. in this example copy to a file
await using var fileToCopyTo = File.Create("FilePath.txt");
await stream.CopyToAsync(fileToCopyTo, context.CancellationToken);
}
}
Get a byte array for an attachment with a specific name.
WARNING: This should only be used when the data size is known to be small, as it causes the full size of the attachment to be allocated in memory.
class HandlerGetBytes :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
var bytes = await attachments.GetBytes("attachment1", context.CancellationToken);
// use the byte array
}
}
All of the above examples have companion methods that are suffixed with ForMessage. These methods allow a handler or saga to read any attachments as long as the message id for that attachment is known. For example, processing all attachments for a specific message could be done as follows:
class HandlerProcessStreamsForMessage :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStreamsForMessage(
messageId: "theMessageId",
action: async (stream, cancel) =>
{
// Use the attachment stream. in this example copy to a file
await using var toCopyTo = File.Create($"{stream.Name}.txt");
await stream.CopyToAsync(toCopyTo, cancel);
},
context.CancellationToken);
}
}
This can be helpful in a saga that is operating in a Scatter-Gather mode. Instead of storing all binaries inside the saga persister, the saga can store the message ids and then, at a later point in time, access those attachments.
The below examples also use the NServiceBus.Testing extension.
public class Handler :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var options = new SendOptions();
var attachments = options.Attachments();
attachments.AddStream(
"theName",
async stream =>
{
await using var source = File.OpenRead("aFilePath");
await source.CopyToAsync(stream);
});
return context.Send(new OtherMessage(), options);
}
}
[Test]
public async Task TestOutgoingAttachments()
{
//Arrange
var context = new RecordingHandlerContext();
var handler = new Handler();
//Act
await handler.Handle(new(), context);
// Assert
var sentMessage = context.Sent.Single();
var attachments = sentMessage.Options.Attachments();
var attachment = attachments.Items.Single();
await Assert.That(attachment.Name).Contains("theName");
await Assert.That(attachments.HasPendingAttachments).IsTrue();
}
To mock or verify incoming attachments, it is necessary to inject an instance of IMessageAttachments into the current IMessageHandlerContext. This can be done using the MockAttachmentHelper.InjectAttachmentsInstance() extension method, which exists in the NServiceBus.Attachments.Testing namespace.
var context = new RecordingHandlerContext();
var mockMessageAttachments = new MyMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);
The implementation of IMessageHandlerContext can be a custom coded mock or constructed using any of the popular mocking/assertion frameworks.
There is a default implementation of IMessageAttachments named MockMessageAttachments. This implementation stubs out all methods. All members are virtual, so it can be used as a simplified base class for custom mocks.
public class CustomMockMessageAttachments :
MockMessageAttachments
{
public override Task<AttachmentBytes> GetBytes(Cancel cancel = default)
{
GetBytesWasCalled = true;
return Task.FromResult(new AttachmentBytes("name", [5]));
}
public bool GetBytesWasCalled { get; private set; }
}
Putting these parts together allows a handler that uses incoming attachments to be tested.
public class Handler :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachment = context.Attachments();
var bytes = await attachment.GetBytes(context.CancellationToken);
}
}
[Test]
public async Task TestIncomingAttachment()
{
//Arrange
var context = new RecordingHandlerContext();
var handler = new Handler();
var mockMessageAttachments = new CustomMockMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);
//Act
await handler.Handle(new(), context);
//Assert
await Assert.That(mockMessageAttachments.GetBytesWasCalled).IsTrue();
}