Persistence & Storage
This guide describes the persistence layer in VibeMQ — how messages, queues, and dead-letter entries survive server restarts.
Overview
By default VibeMQ keeps everything in memory: fast, zero-config, but all state is lost on restart. Starting with version 1.3.0 you can plug in a storage provider so that queues, messages, and DLQ entries are persisted to disk (or to an external database in future versions).
Key design decisions:
Write-ahead pattern: a message is saved to storage before it enters the in-memory queue. On ACK the persisted copy is removed.
Single interface: IStorageProvider covers messages, queue metadata, and DLQ in one contract.
Separate NuGet packages: each provider lives in its own assembly so you only take the dependency you need.
Backward compatible: InMemory is the default; existing code works without changes.
Storage Providers
| Provider | Description |
|---|---|
| InMemory | Default. No persistence, all in-memory. |
| SQLite | SQLite-based. Zero-config, single-file DB. |
| Redis | Redis-backed. Low latency, optional. |
Future providers (RocksDB, PostgreSQL) are planned — see the project roadmap.
Quick Start
InMemory (default)
No configuration required — this is the default behavior:
var broker = BrokerBuilder.Create()
    .UsePort(2925)
    .Build();
SQLite
Install the package:
dotnet add package VibeMQ.Server.Storage.Sqlite
Configure via the fluent builder:
using VibeMQ.Server;
using VibeMQ.Server.Storage.Sqlite;
var broker = BrokerBuilder.Create()
    .UsePort(2925)
    .UseSqliteStorage(options => {
        options.DatabasePath = "vibemq.db";
        options.EnableWal = true;
        options.BusyTimeoutMs = 5000;
    })
    .Build();
await broker.RunAsync(cancellationToken);
Or via Dependency Injection:
using VibeMQ.Server.DependencyInjection;
using VibeMQ.Server.Storage.Sqlite;
services.AddVibeMQSqliteStorage(options => {
    options.DatabasePath = "vibemq.db";
});
services.AddVibeMQBroker(options => {
    options.Port = 2925;
});
Note
Register the storage provider before AddVibeMQBroker so that the broker picks it up from the DI container.
Redis
Install the package:
dotnet add package VibeMQ.Server.Storage.Redis
Configure via the fluent builder:
using VibeMQ.Server;
using VibeMQ.Server.Storage.Redis;
var broker = BrokerBuilder.Create()
    .UsePort(2925)
    .UseRedisStorage("localhost:6379", options => {
        options.Database = 0;
        options.KeyPrefix = "vibemq";
    })
    .Build();
await broker.RunAsync(cancellationToken);
Or via Dependency Injection (connection string or configuration section):
using VibeMQ.Server.DependencyInjection;
using VibeMQ.Server.Storage.Redis;
services.AddVibeMQRedisStorage("localhost:6379", options => {
    options.KeyPrefix = "vibemq";
});
services.AddVibeMQBroker(options => { options.Port = 2925; });
With configuration (e.g. appsettings.json section VibeMQ:Storage:Redis or ConnectionStrings:Redis):
services.AddVibeMQRedisStorage(configuration.GetSection("VibeMQ:Storage:Redis"));
services.AddVibeMQBroker(options => { options.Port = 2925; });
Note
Redis is a network backend; ensure the instance is reachable and consider timeouts (ConnectTimeoutMs, SyncTimeoutMs) for your environment.
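As a sketch of the note above, the two timeouts can be set in the same options callback used in the Redis quick start. The values below are illustrative assumptions, not recommended or default settings, and the example assumes both properties are plain millisecond integers on the options object.

```csharp
using VibeMQ.Server;
using VibeMQ.Server.Storage.Redis;

var broker = BrokerBuilder.Create()
    .UsePort(2925)
    .UseRedisStorage("localhost:6379", options => {
        // Illustrative values only (assumed); tune them for your network.
        options.ConnectTimeoutMs = 5000; // time allowed to establish the connection
        options.SyncTimeoutMs = 2000;    // time allowed per storage operation
    })
    .Build();
```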
SQLite Configuration
SqliteStorageOptions
| Parameter | Default | Description |
|---|---|---|
| DatabasePath | | Path to the SQLite database file |
| EnableWal | | Enable WAL journal mode for better concurrent read/write performance |
| BusyTimeoutMs | | Milliseconds to wait when the DB is locked |
Database Schema
The SQLite provider automatically creates the following tables on first startup:
queues — persisted queue metadata:
CREATE TABLE IF NOT EXISTS queues (
    name TEXT PRIMARY KEY,
    options_json TEXT NOT NULL,
    created_at TEXT NOT NULL
);
messages — persisted messages with foreign key to queues:
CREATE TABLE IF NOT EXISTS messages (
    id TEXT PRIMARY KEY,
    queue_name TEXT NOT NULL REFERENCES queues(name) ON DELETE CASCADE,
    payload_json TEXT,
    timestamp TEXT NOT NULL,
    headers_json TEXT,
    version INTEGER NOT NULL DEFAULT 1,
    priority INTEGER NOT NULL DEFAULT 1,
    delivery_attempts INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS ix_messages_queue_timestamp
    ON messages(queue_name, timestamp);
CREATE INDEX IF NOT EXISTS ix_messages_queue_priority
    ON messages(queue_name, priority DESC, timestamp);
dead_letters — persisted dead-lettered messages:
CREATE TABLE IF NOT EXISTS dead_letters (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id TEXT NOT NULL,
    message_json TEXT NOT NULL,
    reason INTEGER NOT NULL,
    failed_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_dead_letters_failed_at
    ON dead_letters(failed_at);
Note
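Deleting a queue (RemoveQueueAsync) cascade-deletes all its messages via the ON DELETE CASCADE foreign key on messages.queue_name. SQLite enforces foreign keys only when foreign key support is enabled for the connection (PRAGMA foreign_keys).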
Redis Configuration
RedisStorageOptions
| Parameter | Default | Description |
|---|---|---|
| | | Redis connection string (host:port, etc.) |
| Database | | Redis database number |
| KeyPrefix | | Key prefix for all VibeMQ keys |
| ConnectTimeoutMs | | Connection timeout in milliseconds |
| SyncTimeoutMs | | Per-operation sync timeout in milliseconds |
Redis uses LIST for pending message order, HASH for message and queue metadata, and a SET for queue names. Keys follow {KeyPrefix}:q:{queue}:meta, {KeyPrefix}:q:{queue}:pending, {KeyPrefix}:m:{id}, etc.
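To make the key layout concrete, here is a small helper that builds the three key patterns listed above. SketchRedisKeys is a hypothetical name used only for illustration; it is not part of the Redis provider, and the provider may use further key patterns beyond these three.

```csharp
// Hypothetical helper (not part of VibeMQ) that formats the documented key patterns.
public static class SketchRedisKeys
{
    // HASH holding the metadata of one queue: {KeyPrefix}:q:{queue}:meta
    public static string QueueMeta(string prefix, string queue) => $"{prefix}:q:{queue}:meta";

    // LIST holding pending message ids in order: {KeyPrefix}:q:{queue}:pending
    public static string QueuePending(string prefix, string queue) => $"{prefix}:q:{queue}:pending";

    // HASH holding one message: {KeyPrefix}:m:{id}
    public static string Message(string prefix, string id) => $"{prefix}:m:{id}";
}
```

With KeyPrefix set to "vibemq", the pending list for a queue named orders would be vibemq:q:orders:pending, which is convenient to know when inspecting a Redis instance by hand.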
How Persistence Works
Write-Ahead Pattern
When a message is published:
Publisher
│
▼
SaveMessageAsync(message) ← write to storage first
│
▼
queue.Enqueue(message) ← then add to in-memory queue
│
▼
PublishAck → Publisher
If the server crashes after the storage write but before the in-memory enqueue, the message is recovered on next startup.
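The ordering above can be sketched in a few lines. This is a minimal illustration, not the broker's actual code: SketchMessage, SketchStore, and SketchBroker are hypothetical types, and the store is a dictionary standing in for a durable provider.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical minimal message; the real BrokerMessage carries more fields.
public sealed record SketchMessage(string Id, string QueueName, string Payload);

// Hypothetical stand-in for a durable store (a real provider writes to disk or Redis).
public sealed class SketchStore
{
    private readonly ConcurrentDictionary<string, SketchMessage> _rows = new();

    public Task SaveMessageAsync(SketchMessage message)
    {
        _rows[message.Id] = message;
        return Task.CompletedTask;
    }

    public bool Contains(string id) => _rows.ContainsKey(id);
}

public sealed class SketchBroker
{
    private readonly SketchStore _store;
    private readonly ConcurrentQueue<SketchMessage> _queue = new();

    public SketchBroker(SketchStore store) => _store = store;

    public int InMemoryCount => _queue.Count;

    // Returns true to stand in for the PublishAck sent back to the publisher.
    public async Task<bool> PublishAsync(SketchMessage message)
    {
        await _store.SaveMessageAsync(message); // 1. write to storage first
        _queue.Enqueue(message);                // 2. then add to the in-memory queue
        return true;                            // 3. acknowledge the publish
    }
}
```

Because the save completes before the enqueue, a crash between steps 1 and 2 leaves the message in storage, where startup recovery can find it.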
Acknowledgment Flow
When a subscriber acknowledges a message:
Subscriber
│
▼
AckTracker.TryAcknowledge(messageId)
│
▼
RemoveMessageAsync(messageId) ← delete from storage
│
▼
Metrics updated
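A compact sketch of the acknowledgment steps, under the assumption that a duplicate or unknown ACK should leave storage and metrics untouched. SketchAckFlow is a hypothetical type; the real AckTracker and storage provider are separate components.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch combining the tracker, the persisted copies, and a metric.
public sealed class SketchAckFlow
{
    // Message ids delivered to a subscriber and still awaiting an ACK.
    private readonly ConcurrentDictionary<string, byte> _awaitingAck = new();
    // Stand-in for persisted messages, keyed by message id.
    private readonly ConcurrentDictionary<string, string> _persisted = new();
    private long _acknowledged;

    public long AcknowledgedCount => Interlocked.Read(ref _acknowledged);

    public bool IsPersisted(string id) => _persisted.ContainsKey(id);

    // Records a message as persisted and delivered, pending acknowledgment.
    public void TrackDelivery(string id, string payload)
    {
        _persisted[id] = payload;
        _awaitingAck[id] = 0;
    }

    public Task<bool> AcknowledgeAsync(string id)
    {
        // 1. Only the first ACK for a tracked message succeeds.
        if (!_awaitingAck.TryRemove(id, out _))
            return Task.FromResult(false);

        _persisted.TryRemove(id, out _);           // 2. delete the persisted copy
        Interlocked.Increment(ref _acknowledged);  // 3. update metrics
        return Task.FromResult(true);
    }
}
```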
Startup Recovery
On server startup, the broker automatically recovers persisted state:
IStorageProvider.InitializeAsync(): creates the schema if needed
GetAllQueuesAsync(): loads all queue metadata
For each queue, GetPendingMessagesAsync(queueName): replays undelivered messages
Queues are recreated with their original CreatedAt timestamps
BrokerServer.RunAsync()
│
▼
QueueManager.InitializeAsync()
│
├── StorageProvider.InitializeAsync()
│
├── GetAllQueuesAsync()
│ │
│ ▼
│ Recreate MessageQueue for each persisted queue
│
└── GetPendingMessagesAsync(queueName)
│
▼
Enqueue recovered messages
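The recovery sequence in the diagram can be sketched as a single loop. ISketchRecoveryStore, FakeRecoveryStore, and SketchRecovery are hypothetical names, and queues and messages are reduced to strings to keep the example short.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, reduced view of the storage calls used during startup.
public interface ISketchRecoveryStore
{
    Task InitializeAsync();
    Task<IReadOnlyList<string>> GetAllQueuesAsync();
    Task<IReadOnlyList<string>> GetPendingMessagesAsync(string queueName);
}

// Fake store preloaded with two queues, one of which has pending messages.
public sealed class FakeRecoveryStore : ISketchRecoveryStore
{
    private readonly Dictionary<string, List<string>> _data = new()
    {
        ["orders"] = new List<string> { "m1", "m2" },
        ["emails"] = new List<string>(),
    };

    public bool Initialized { get; private set; }

    public Task InitializeAsync()
    {
        Initialized = true;
        return Task.CompletedTask;
    }

    public Task<IReadOnlyList<string>> GetAllQueuesAsync() =>
        Task.FromResult<IReadOnlyList<string>>(new List<string>(_data.Keys));

    public Task<IReadOnlyList<string>> GetPendingMessagesAsync(string queueName) =>
        Task.FromResult<IReadOnlyList<string>>(_data[queueName]);
}

public static class SketchRecovery
{
    public static async Task<Dictionary<string, Queue<string>>> RecoverAsync(ISketchRecoveryStore store)
    {
        await store.InitializeAsync(); // create schema if needed

        var queues = new Dictionary<string, Queue<string>>();
        foreach (var name in await store.GetAllQueuesAsync())
        {
            var queue = new Queue<string>(); // recreate the in-memory queue
            foreach (var id in await store.GetPendingMessagesAsync(name))
                queue.Enqueue(id);           // replay undelivered messages in stored order
            queues[name] = queue;
        }
        return queues;
    }
}
```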
Dead Letter Queue Persistence
Failed messages are persisted to the dead_letters table before being added to the in-memory DLQ:
Message delivery failed (max retries exceeded)
│
▼
SaveDeadLetteredMessageAsync(dlqMessage) ← persist first
│
▼
In-memory DLQ enqueue
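The same persist-first ordering applies to dead letters. The sketch below assumes a simple attempt counter and a made-up reason code; SketchDeadLetter and SketchDeadLetterFlow are hypothetical types, and the class is single-threaded for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical entry shaped loosely after the dead_letters columns.
public sealed record SketchDeadLetter(string MessageId, int Reason, DateTime FailedAt);

public sealed class SketchDeadLetterFlow
{
    // Hypothetical reason code; the real values are not listed in this guide.
    public const int ReasonMaxRetriesExceeded = 1;

    private readonly int _maxAttempts;
    private readonly List<SketchDeadLetter> _persisted = new();     // stand-in for storage
    private readonly Queue<SketchDeadLetter> _inMemoryDlq = new();

    public SketchDeadLetterFlow(int maxAttempts) => _maxAttempts = maxAttempts;

    public int PersistedCount => _persisted.Count;
    public int InMemoryCount => _inMemoryDlq.Count;

    private Task SaveDeadLetteredMessageAsync(SketchDeadLetter entry)
    {
        _persisted.Add(entry);
        return Task.CompletedTask;
    }

    // Returns true when the message was dead-lettered, false when it can still be retried.
    public async Task<bool> OnDeliveryFailedAsync(string messageId, int deliveryAttempts)
    {
        if (deliveryAttempts < _maxAttempts)
            return false;

        var entry = new SketchDeadLetter(messageId, ReasonMaxRetriesExceeded, DateTime.UtcNow);
        await SaveDeadLetteredMessageAsync(entry); // persist first
        _inMemoryDlq.Enqueue(entry);               // then add to the in-memory DLQ
        return true;
    }
}
```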
Storage Management
The IStorageManagement interface provides optional maintenance operations. Not all providers support it — check at runtime:
if (storageProvider is IStorageManagement mgmt)
{
    // Backup
    await mgmt.BackupAsync("/backups/vibemq-backup.db");
    // Restore
    await mgmt.RestoreAsync("/backups/vibemq-backup.db");
    // Compact (VACUUM for SQLite)
    await mgmt.CompactAsync();
    // Statistics
    var stats = await mgmt.GetStatsAsync();
    Console.WriteLine($"Messages: {stats.TotalMessages}");
    Console.WriteLine($"Queues: {stats.TotalQueues}");
    Console.WriteLine($"Dead-lettered: {stats.TotalDeadLettered}");
    Console.WriteLine($"Storage size: {stats.StorageSizeBytes} bytes");
}
StorageStats
| Property | Type | Description |
|---|---|---|
| TotalMessages | | Total persisted messages |
| TotalQueues | | Total persisted queues |
| TotalDeadLettered | | Total dead-lettered entries |
| StorageSizeBytes | | Database file size in bytes |
IStorageProvider Interface
All storage providers implement this interface:
public interface IStorageProvider : IAsyncDisposable
{
    // Lifecycle
    Task InitializeAsync(CancellationToken cancellationToken = default);
    Task<bool> IsAvailableAsync(CancellationToken cancellationToken = default);
    // Messages
    Task SaveMessageAsync(BrokerMessage message, CancellationToken ct = default);
    Task SaveMessagesAsync(IReadOnlyList<BrokerMessage> messages, CancellationToken ct = default);
    Task<BrokerMessage?> GetMessageAsync(string id, CancellationToken ct = default);
    Task<bool> RemoveMessageAsync(string id, CancellationToken ct = default);
    Task<IReadOnlyList<BrokerMessage>> GetPendingMessagesAsync(string queueName, CancellationToken ct = default);
    // Queues
    Task SaveQueueAsync(string name, QueueOptions options, CancellationToken ct = default);
    Task RemoveQueueAsync(string name, CancellationToken ct = default);
    Task<IReadOnlyList<StoredQueue>> GetAllQueuesAsync(CancellationToken ct = default);
    // Dead Letter Queue
    Task SaveDeadLetteredMessageAsync(DeadLetteredMessage message, CancellationToken ct = default);
    Task<IReadOnlyList<DeadLetteredMessage>> GetDeadLetteredMessagesAsync(int count, CancellationToken ct = default);
    Task<bool> RemoveDeadLetteredMessageAsync(string messageId, CancellationToken ct = default);
}
Contract rules:
InitializeAsync must be called before any other operations.
RemoveQueueAsync must cascade-delete all messages in the queue.
SaveMessagesAsync has a default implementation that calls SaveMessageAsync in a loop. Providers should override for batch optimization.
IsAvailableAsync returns true for embedded providers (InMemory, SQLite). Network providers should check connectivity.
All methods must be thread-safe.
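The SaveMessagesAsync rule can be illustrated with a C# default interface method. This is one possible shape, assumed for illustration; the guide does not say how VibeMQ supplies its default. ISketchMessageStore, LoopingStore, and BatchingStore are hypothetical, and messages are reduced to strings.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface ISketchMessageStore
{
    Task SaveMessageAsync(string message, CancellationToken ct = default);

    // Default: one SaveMessageAsync call per message. Providers may replace it with a batch write.
    async Task SaveMessagesAsync(IReadOnlyList<string> messages, CancellationToken ct = default)
    {
        foreach (var message in messages)
        {
            ct.ThrowIfCancellationRequested();
            await SaveMessageAsync(message, ct);
        }
    }
}

// Relies on the default loop.
public sealed class LoopingStore : ISketchMessageStore
{
    public List<string> Saved { get; } = new();

    public Task SaveMessageAsync(string message, CancellationToken ct = default)
    {
        lock (Saved) Saved.Add(message); // lock keeps the sketch thread-safe
        return Task.CompletedTask;
    }
}

// Replaces the default with a single batched write.
public sealed class BatchingStore : ISketchMessageStore
{
    public List<string> Saved { get; } = new();
    public int BatchCalls { get; private set; }

    public Task SaveMessageAsync(string message, CancellationToken ct = default)
    {
        lock (Saved) Saved.Add(message);
        return Task.CompletedTask;
    }

    public Task SaveMessagesAsync(IReadOnlyList<string> messages, CancellationToken ct = default)
    {
        lock (Saved)
        {
            Saved.AddRange(messages); // one write for the whole batch
            BatchCalls++;
        }
        return Task.CompletedTask;
    }
}
```

Note that a default interface method is reachable only through the interface type, so callers of LoopingStore need an ISketchMessageStore reference to use the looped batch save.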
Implementing a Custom Provider
To create your own storage provider:
Create a new class library project referencing VibeMQ.Core.
Implement IStorageProvider (and optionally IStorageManagement).
Add an extension method for BrokerBuilder and/or IServiceCollection:
public static class MyStorageExtensions
{
    public static BrokerBuilder UseMyStorage(
        this BrokerBuilder builder,
        Action<MyStorageOptions>? configure = null)
    {
        var options = new MyStorageOptions();
        configure?.Invoke(options);
        return builder.UseStorageProvider(_ => new MyStorageProvider(options));
    }
    public static IServiceCollection AddVibeMQMyStorage(
        this IServiceCollection services,
        Action<MyStorageOptions>? configure = null)
    {
        var options = new MyStorageOptions();
        configure?.Invoke(options);
        services.AddSingleton<IStorageProvider>(new MyStorageProvider(options));
        return services;
    }
}
Configuration Examples
Development (InMemory)
var broker = BrokerBuilder.Create()
    .UsePort(2925)
    .Build();
Production (SQLite)
var broker = BrokerBuilder.Create()
    .UsePort(2925)
    .UseAuthentication(Environment.GetEnvironmentVariable("VIBEMQ_TOKEN"))
    .UseSqliteStorage(options => {
        options.DatabasePath = "/data/vibemq.db";
        options.EnableWal = true;
    })
    .Build();
Production with DI (SQLite)
services.AddVibeMQSqliteStorage(options => {
    options.DatabasePath = "/data/vibemq.db";
    options.EnableWal = true;
});
services.AddVibeMQBroker(options => {
    options.Port = 2925;
    options.EnableAuthentication = true;
    options.AuthToken = Environment.GetEnvironmentVariable("VIBEMQ_TOKEN");
});
appsettings.json (SQLite)
{
  "VibeMQ": {
    "Port": 2925,
    "EnableAuthentication": true,
    "AuthToken": "my-secret-token"
  }
}
Persistence is configured via UseSqliteStorage / UseRedisStorage on the
builder or via DI (e.g. AddVibeMQSqliteStorage), not via BrokerOptions.
Migration from Previous Versions
If you are upgrading from VibeMQ 1.2.x:
IMessageStore is now deprecated. It still works but will be removed in a future version. Migrate to IStorageProvider.
InMemoryMessageStore is deprecated in favor of InMemoryStorageProvider.
No breaking changes: the default behavior (in-memory) is preserved. Adding persistence is opt-in.
Next Steps
Server Setup — server configuration
Configuration — all configuration parameters
DI Integration — DI integration
Architecture — system architecture