Features
This guide describes all capabilities and features of VibeMQ.
Publish/Subscribe (Pub/Sub)
VibeMQ implements the publish/subscribe pattern through message queues.
Basic concepts:
Publisher — client sending messages
Subscriber — client receiving messages
Queue — buffer for storing messages
Broker — server managing queues
Basic example:
// Publisher
await client.PublishAsync("notifications", new {
Title = "Hello",
Body = "World"
});
// Subscriber
await using var subscription = await client.SubscribeAsync<dynamic>(
"notifications",
async msg => {
Console.WriteLine($"{msg.Title}: {msg.Body}");
}
);
Delivery Modes
VibeMQ supports four message delivery modes:
Round-robin
Description: Each message is delivered to one subscriber cyclically.
Publisher → [Queue] → Subscriber 1 (message 1)
→ Subscriber 2 (message 2)
→ Subscriber 1 (message 3)
Configuration:
options.DefaultDeliveryMode = DeliveryMode.RoundRobin;
Use cases:
Task processing by multiple workers
Load balancing
Task queues
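The cyclic hand-off in the diagram above can be sketched in a few lines. This is a minimal illustration rather than VibeMQ's dispatcher: the subscriber names and the `PickNext` helper are hypothetical, and real delivery also has to cope with subscribers joining and leaving.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical subscriber names; the broker's real dispatcher is not shown in this guide.
var subscribers = new List<string> { "Subscriber 1", "Subscriber 2" };
var next = 0;

// Returns the subscriber for the next message and advances the cursor cyclically.
string PickNext()
{
    var chosen = subscribers[next];
    next = (next + 1) % subscribers.Count;
    return chosen;
}

for (var message = 1; message <= 3; message++)
{
    Console.WriteLine($"message {message} -> {PickNext()}");
}
```

With two subscribers, messages 1 and 3 go to Subscriber 1 and message 2 goes to Subscriber 2, matching the diagram.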
Fan-out with Acknowledgment
Description: The message is delivered to all subscribers, and each one must acknowledge receipt.
Publisher → [Queue] → Subscriber 1 (copy + ACK)
→ Subscriber 2 (copy + ACK)
→ Subscriber 3 (copy + ACK)
Configuration:
options.DefaultDeliveryMode = DeliveryMode.FanOutWithAck;
MaxRetryAttempts is not in QueueDefaults; set it in QueueOptions when creating a queue (e.g. client.CreateQueueAsync("name", new QueueOptions { MaxRetryAttempts = 3 })). See Client Usage and Configuration.
Use cases:
Notification broadcasting
Data replication
Audit and logging
Fan-out without Acknowledgment
Description: The message is delivered to all subscribers without confirmation.
Publisher → [Queue] → Subscriber 1 (copy)
→ Subscriber 2 (copy)
→ Subscriber 3 (copy)
Configuration:
options.DefaultDeliveryMode = DeliveryMode.FanOutWithoutAck;
Use cases:
Broadcast messages
Real-time updates
Data streaming
Priority-based
Description: Messages are delivered in priority order, highest priority first.
[Critical] → [High] → [Normal] → [Low]
Priorities:
| Priority | Value | Description |
|---|---|---|
| Critical | 3 | Critical, delivered first |
| High | 2 | High priority |
| Normal | 1 | Normal (default) |
| Low | 0 | Low, delivered last |
Configuration:
options.DefaultDeliveryMode = DeliveryMode.PriorityBased;
Publish message:
await client.PublishAsync("alerts", message);
Publish with priority:
using VibeMQ.Core.Enums;
await client.PublishAsync("alerts", message, new Dictionary<string, string> {
["priority"] = MessagePriority.Critical.ToString()
});
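To make the ordering concrete, here is a minimal sketch of priority-ordered dequeuing built on the standard `PriorityQueue<TElement, TPriority>` from .NET 6. It is not the broker's implementation. The numeric values come from the priority table above; the sequence counter that keeps first-in, first-out order within one priority level is an assumption about desirable behavior, not something this guide states.

```csharp
using System;
using System.Collections.Generic;

// Numeric values copied from the priority table above.
const int Low = 0, Normal = 1, High = 2, Critical = 3;

// PriorityQueue dequeues the smallest key first, so the key is (-priority, sequence):
// a higher priority wins, and the sequence number keeps FIFO order within one level.
var queue = new PriorityQueue<string, (int, long)>();
long sequence = 0;

void Enqueue(string text, int priority) => queue.Enqueue(text, (-priority, sequence++));

Enqueue("nightly report", Low);
Enqueue("user signup", Normal);
Enqueue("disk almost full", Critical);
Enqueue("payment retry", High);
Enqueue("second signup", Normal);

var order = new List<string>();
while (queue.TryDequeue(out var item, out _))
{
    order.Add(item);
}

Console.WriteLine(string.Join(" | ", order));
```

The critical message comes out first and the low-priority one last, while the two normal-priority messages keep their publish order.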
Delivery Guarantees
Acknowledgments (ACK)
VibeMQ uses an acknowledgment mechanism for delivery guarantees:
Broker → Deliver message → Client
│
│ Processing...
│
◀────── ACK ─────────┘
How it works:
Broker sends message to client
ACK wait timer starts
Client processes message and sends ACK
Broker receives ACK and marks message as delivered
Automatic ACKs:
By default, the client automatically sends an ACK after successful processing:
await using var subscription = await client.SubscribeAsync<dynamic>(
"notifications",
async msg => {
await ProcessMessageAsync(msg);
// ACK is sent automatically
}
);
Retry Attempts
If an ACK is not received within the timeout, the message is resent:
Attempt 1 → Timeout → Attempt 2 → Timeout → Attempt 3 → DLQ
Configuration:
options.MaxRetryAttempts = 3;
Exponential backoff:
Exponential delay is used between attempts:
Attempt 1: immediately
Attempt 2: after 1s
Attempt 3: after 2s
Attempt 4: after 4s
…
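The schedule above follows a simple doubling rule, which the sketch below reproduces. The 1-second base is read off the listed delays; the broker's actual constants, and any upper bound on the delay, are not stated here and are treated as assumptions. `DelayBeforeAttempt` is a hypothetical helper name.

```csharp
using System;

// Delay before a given delivery attempt, following the schedule listed above:
// attempt 1 is immediate, attempt 2 waits 1s, and each later attempt doubles the wait.
static TimeSpan DelayBeforeAttempt(int attempt)
{
    if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
    if (attempt == 1) return TimeSpan.Zero;
    return TimeSpan.FromSeconds(Math.Pow(2, attempt - 2));
}

for (var attempt = 1; attempt <= 4; attempt++)
{
    Console.WriteLine($"Attempt {attempt}: wait {DelayBeforeAttempt(attempt).TotalSeconds}s");
}
```

For attempts 1 through 4 this yields waits of 0, 1, 2, and 4 seconds.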
Dead Letter Queue (DLQ)
Messages that cannot be delivered after all retry attempts are moved to the Dead Letter Queue:
options.EnableDeadLetterQueue = true;
options.DeadLetterQueueName = "dead-letters";
options.MaxRetryAttempts = 3;
Reasons for DLQ:
Maximum delivery attempts exceeded
Message TTL expired
Deserialization error
Exception in handler
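The first reason in the list, exceeding the maximum number of delivery attempts, can be illustrated with a small simulation. This is a sketch under stated assumptions, not broker code: `Deliver` and the handler delegate are hypothetical, and a handler returning `false` stands in for an ACK that never arrives.

```csharp
using System;
using System.Collections.Generic;

const int maxRetryAttempts = 3;
var deadLetters = new List<string>();

// Tries the handler up to maxRetryAttempts times; a false return stands in for a missing ACK.
// Returns true when the message was acknowledged, false when it was dead-lettered.
bool Deliver(string message, Func<string, int, bool> handler)
{
    for (var attempt = 1; attempt <= maxRetryAttempts; attempt++)
    {
        if (handler(message, attempt)) return true;
        Console.WriteLine($"{message}: attempt {attempt} not acknowledged");
    }
    deadLetters.Add(message);
    return false;
}

var first = Deliver("order-1", (_, attempt) => attempt == 2); // acknowledged on the second try
var second = Deliver("order-2", (_, _) => false);             // never acknowledged

var deadLetterList = string.Join(", ", deadLetters);
Console.WriteLine($"order-1 delivered: {first}, order-2 delivered: {second}");
Console.WriteLine($"dead letters: {deadLetterList}");
```

Only the message that is never acknowledged ends up in the dead-letter list; the one that succeeds on its second attempt does not.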
DLQ processing:
var dlqMessages = await queueManager.GetDeadLetterMessagesAsync(100);
foreach (var message in dlqMessages) {
// Retry or log
await RetryOrLogAsync(message);
}
Queue Management
Creating a Queue
Automatic creation:
When publishing to a non-existent queue, it’s created automatically:
options.EnableAutoCreate = true;
Manual creation:
await queueManager.CreateQueueAsync("my-queue", new QueueOptions {
Mode = DeliveryMode.RoundRobin,
MaxQueueSize = 10_000,
MessageTtl = TimeSpan.FromHours(1),
});
Deleting a Queue
await queueManager.DeleteQueueAsync("my-queue");
Getting Information
GetQueueInfoAsync returns a complete snapshot of a queue’s current state, including all
configuration settings:
var info = await client.GetQueueInfoAsync("orders");
if (info is null) {
Console.WriteLine("Queue does not exist.");
return;
}
// Runtime state
Console.WriteLine($"Name: {info.Name}");
Console.WriteLine($"Messages: {info.MessageCount}");
Console.WriteLine($"Subscribers: {info.SubscriberCount}");
Console.WriteLine($"Created: {info.CreatedAt:u}");
// Configuration
Console.WriteLine($"Mode: {info.DeliveryMode}");
Console.WriteLine($"Max size: {info.MaxSize}");
Console.WriteLine($"Message TTL: {info.MessageTtl?.ToString() ?? "none"}");
Console.WriteLine($"DLQ enabled: {info.EnableDeadLetterQueue}");
Console.WriteLine($"DLQ name: {info.DeadLetterQueueName ?? "auto"}");
Console.WriteLine($"Overflow: {info.OverflowStrategy}");
Console.WriteLine($"Max retries: {info.MaxRetryAttempts}");
QueueInfo fields:
| Field | Type | Description |
|---|---|---|
| Name | string | Queue name. |
| MessageCount | int | Messages currently waiting in the queue. |
| SubscriberCount | int | Number of active subscribers. |
| CreatedAt | DateTime | UTC timestamp of queue creation. |
| DeliveryMode | DeliveryMode | Delivery mode configured for the queue. |
| MaxSize | int | Maximum message capacity. |
| MessageTtl | TimeSpan? | Per-message time-to-live. null = no TTL. |
| EnableDeadLetterQueue | bool | Whether a DLQ is enabled. |
| DeadLetterQueueName | string? | DLQ name. null = auto-generated. |
| OverflowStrategy | OverflowStrategy | Action taken when the queue is full. |
| MaxRetryAttempts | int | Delivery attempts before DLQ routing. |
List Queues
var queues = await client.ListQueuesAsync();
foreach (var queueName in queues) {
Console.WriteLine(queueName);
}
Queue Declarations
Instead of calling CreateQueueAsync manually, you can declare all queues your application
needs as part of ClientOptions. On every ConnectAsync the client:
Creates any queue that does not yet exist.
Compares the declared settings against the live queue and classifies each difference as Info, Soft, or Hard.
Applies the OnConflict strategy (Ignore, Fail, or Override) for any Soft or Hard differences found.
await using var client = await VibeMQClient.ConnectAsync(
"localhost",
2925,
new ClientOptions()
// Production queue — any drift is a deploy error
.DeclareQueue("orders", q => {
q.Mode = DeliveryMode.FanOutWithAck;
q.MaxQueueSize = 50_000;
q.EnableDeadLetterQueue = true;
q.MessageTtl = TimeSpan.FromHours(24);
}, onConflict: QueueConflictResolution.Fail)
// Analytics — drift is acceptable
.DeclareQueue("analytics-events", q => {
q.MaxQueueSize = 200_000;
q.OverflowStrategy = OverflowStrategy.DropOldest;
})
);
See Client Usage for a full description of conflict resolution, the QueueConflictException
type, and DI integration.
Message Priorities
VibeMQ supports message priorities for important deliveries.
Priority levels:
public enum MessagePriority {
Low = 0, // Low
Normal = 1, // Normal (default)
High = 2, // High
Critical = 3 // Critical
}
Publish messages:
using VibeMQ.Core.Enums;
// Critical message with priority
await client.PublishAsync("alerts", alertData, new Dictionary<string, string> {
["priority"] = MessagePriority.Critical.ToString()
});
// High priority notification
await client.PublishAsync("notifications", data, new Dictionary<string, string> {
["priority"] = MessagePriority.High.ToString()
});
// Normal priority (default) - no headers needed
await client.PublishAsync("logs", logData);
Keep-alive (PING/PONG)
Active connections are maintained using the keep-alive mechanism:
Client Server
│ │
│─── PING (30s) ─────────▶│
│ │
│◄── PONG (immediate) ────│
Client configuration:
var client = await VibeMQClient.ConnectAsync(
"localhost",
2925,
new ClientOptions {
KeepAliveInterval = TimeSpan.FromSeconds(30)
}
);
Automatic Reconnections
The client automatically reconnects when the connection is lost.
Reconnect policy configuration:
var client = await VibeMQClient.ConnectAsync(
"localhost",
2925,
new ClientOptions {
ReconnectPolicy = new ReconnectPolicy {
MaxAttempts = 10,
InitialDelay = TimeSpan.FromSeconds(1),
MaxDelay = TimeSpan.FromMinutes(5),
UseExponentialBackoff = true
}
}
);
Parameters:
| Parameter | Default | Description |
|---|---|---|
| MaxAttempts | int.MaxValue | Maximum number of attempts |
| InitialDelay | 1s | Initial delay |
| MaxDelay | 5min | Maximum delay |
| UseExponentialBackoff | true | Exponential increase |
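To show how these parameters might interact, the sketch below computes a reconnect delay from the defaults in the table. How the client actually combines them is an assumption: here the delay doubles with each attempt and is capped at the maximum delay, and with exponential backoff disabled the initial delay is reused. `ReconnectDelay` is a hypothetical helper, not part of the client API.

```csharp
using System;

// Defaults taken from the parameter table above.
var initialDelay = TimeSpan.FromSeconds(1);
var maxDelay = TimeSpan.FromMinutes(5);

TimeSpan ReconnectDelay(int attempt, bool useExponentialBackoff = true)
{
    if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
    if (!useExponentialBackoff) return initialDelay;

    // Cap the exponent so the multiplication cannot overflow for large attempt numbers.
    var factor = Math.Pow(2, Math.Min(attempt - 1, 30));
    var candidate = TimeSpan.FromTicks((long)(initialDelay.Ticks * factor));
    return candidate < maxDelay ? candidate : maxDelay;
}

foreach (var attempt in new[] { 1, 2, 5, 9, 10 })
{
    Console.WriteLine($"reconnect attempt {attempt}: wait {ReconnectDelay(attempt)}");
}
```

Under these assumptions the ninth attempt waits 256 seconds, and from the tenth attempt onward the delay stays at the 5-minute cap.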
Authentication
Username/password authentication:
On server:
var broker = BrokerBuilder.Create()
.UseAuthorization(options => {
options.SuperuserUsername = "admin";
options.SuperuserPassword = "my-secret-password";
})
.Build();
On client:
var client = await VibeMQClient.ConnectAsync(
"localhost",
2925,
new ClientOptions {
Username = "admin",
Password = "my-secret-password"
}
);
Warning
Use complex passwords (12+ characters) and store them in a secure location.
TLS/SSL Encryption
Transport encryption support:
On server:
.UseTls(options => {
options.Enabled = true;
options.CertificatePath = "/path/to/cert.pfx";
options.CertificatePassword = "cert-password";
})
On client:
var client = await VibeMQClient.ConnectAsync(
"localhost",
2925,
new ClientOptions {
UseTls = true,
SkipCertificateValidation = false // Set to true only in tests!
}
);
Rate Limiting
Overload protection:
Configuration:
.ConfigureRateLimiting(options => {
options.Enabled = true;
options.MaxConnectionsPerIpPerWindow = 20;
options.ConnectionWindow = TimeSpan.FromSeconds(60);
options.MaxMessagesPerClientPerSecond = 1000;
});
Parameters:
| Parameter | Default | Description |
|---|---|---|
| Enabled | true | Enable rate limiting |
| MaxConnectionsPerIpPerWindow | 20 | Max connections per IP per window |
| ConnectionWindow | 60s | Time window (seconds) |
| MaxMessagesPerClientPerSecond | 1000 | Max messages per second per client |
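The per-IP connection limit can be pictured as a counter that resets once per window. The sketch below uses a fixed-window counter with the default values from the table; the algorithm itself is an assumption, since this guide does not say whether the broker uses fixed windows, sliding windows, or something else. `TryAcceptConnection` and the sample IP address are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Limits taken from the parameter table above.
const int maxConnectionsPerIpPerWindow = 20;
var connectionWindow = TimeSpan.FromSeconds(60);

// Per-IP state: when the current window started and how many connections it has seen.
var windows = new Dictionary<string, (DateTime Start, int Count)>();

bool TryAcceptConnection(string ip, DateTime now)
{
    if (!windows.TryGetValue(ip, out var w) || now - w.Start >= connectionWindow)
    {
        w = (now, 0); // first connection from this IP, or the previous window expired
    }
    if (w.Count >= maxConnectionsPerIpPerWindow) return false;

    windows[ip] = (w.Start, w.Count + 1);
    return true;
}

var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var accepted = 0;
for (var i = 0; i < 25; i++)
{
    if (TryAcceptConnection("10.0.0.7", t0.AddSeconds(i))) accepted++;
}

var acceptedAfterReset = TryAcceptConnection("10.0.0.7", t0.AddSeconds(61));
Console.WriteLine($"accepted in first window: {accepted}");
Console.WriteLine($"accepted after window reset: {acceptedAfterReset}");
```

Of 25 connection attempts inside one window, 20 are accepted and 5 rejected; an attempt after the window has elapsed is accepted again.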
Graceful Shutdown
Correct server shutdown without message loss:
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
await broker.RunAsync(cts.Token);
// StopAsync() is called automatically
Shutdown stages:
Stop accepting new connections
Notify clients about shutdown
Wait for in-flight message processing (up to 30s)
Close all connections
Clean up resources
Health Checks
HTTP endpoints for monitoring:
Enable:
.ConfigureHealthChecks(options => {
options.Enabled = true;
options.Port = 2926;
})
Endpoints:
GET /health/: health status (200 OK or 503)
GET /metrics/: broker metrics (JSON)
Example /health/ response:
{
"status": "healthy",
"active_connections": 15,
"queue_count": 5,
"memory_usage_mb": 256
}
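A monitoring script might read that response as follows. The sketch parses the sample body shown above with `System.Text.Json` instead of calling a live broker, so it runs on its own; the 512 MB memory threshold is a hypothetical value chosen only for illustration.

```csharp
using System;
using System.Text.Json;

// Sample body copied from the /health/ example above.
var body = @"{
  ""status"": ""healthy"",
  ""active_connections"": 15,
  ""queue_count"": 5,
  ""memory_usage_mb"": 256
}";

using var doc = JsonDocument.Parse(body);
var root = doc.RootElement;

var status = root.GetProperty("status").GetString();
var connections = root.GetProperty("active_connections").GetInt32();
var memoryMb = root.GetProperty("memory_usage_mb").GetInt32();

// Hypothetical alert threshold, not a VibeMQ setting.
const int memoryLimitMb = 512;
var healthy = status == "healthy" && memoryMb < memoryLimitMb;

Console.WriteLine($"status={status}, connections={connections}, memory={memoryMb} MB, ok={healthy}");
```

In a real check, the same parsing would be applied to the body returned by an HTTP GET against the health port, and the HTTP status code (200 or 503) should be checked as well.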
Example /metrics/ response:
{
"total_messages_published": 125000,
"total_messages_delivered": 124850,
"total_messages_acknowledged": 124800,
"active_connections": 15,
"active_queues": 5,
"memory_usage_bytes": 268435456,
"average_delivery_latency_ms": 2.5,
"timestamp": "2026-02-18T10:30:00Z",
"uptime": "02:15:30.5000000"
}
Metrics
Counters:
TotalMessagesPublished: total published
TotalMessagesDelivered: total delivered
TotalMessagesAcknowledged: total acknowledged
TotalRetries: retry attempts
TotalDeadLettered: in DLQ
TotalErrors: errors
TotalConnectionsAccepted: connections accepted
TotalConnectionsRejected: connections rejected
Gauge metrics:
ActiveConnections: active connections
ActiveQueues: active queues
InFlightMessages: in flight
MemoryUsageBytes: memory usage
Latency:
AverageDeliveryLatencyMs: average delivery latency
Persistence & Storage
VibeMQ supports pluggable storage providers for message durability across server restarts.
Default behavior (InMemory):
All state is kept in memory — zero configuration, maximum performance. All data is lost on restart.
With SQLite storage:
dotnet add package VibeMQ.Server.Storage.Sqlite
var broker = BrokerBuilder.Create()
.UsePort(2925)
.UseSqliteStorage(options => {
options.DatabasePath = "/data/vibemq.db";
})
.Build();
How it works:
When a message is published it is saved to storage before entering the in-memory queue (write-ahead).
When the subscriber acknowledges the message it is removed from storage.
On server restart, all queues and unacknowledged messages are recovered automatically.
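The three steps above can be modeled with two collections, one that survives a restart and one that does not. This is a simplified sketch, not the storage provider interface: `Publish`, `Acknowledge`, and `RecoverAfterRestart` are hypothetical names, and a sorted dictionary stands in for the database.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins for the storage provider and the in-memory queue; both are hypothetical.
var storage = new SortedDictionary<long, string>();     // survives a "restart"
var memoryQueue = new Queue<(long Id, string Body)>();  // lost on a "restart"
long nextId = 1;

long Publish(string body)
{
    var id = nextId++;
    storage[id] = body;               // 1. persist first (write-ahead)
    memoryQueue.Enqueue((id, body));  // 2. then make it deliverable
    return id;
}

// An acknowledged message no longer needs to be recovered, so it leaves storage.
void Acknowledge(long id) => storage.Remove(id);

void RecoverAfterRestart()
{
    memoryQueue.Clear();              // in-memory state is gone after a restart
    foreach (var (id, body) in storage)
    {
        memoryQueue.Enqueue((id, body));
    }
}

var created = Publish("order created");
var paid = Publish("order paid");
Acknowledge(created);
RecoverAfterRestart();

Console.WriteLine($"recovered {memoryQueue.Count} message(s), first: {memoryQueue.Peek().Body}");
```

Because the write to storage happens before the message becomes deliverable, a crash between the two steps can at worst cause a redelivery after recovery, never a silent loss.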
DLQ persistence:
Failed messages are saved to the dead_letters table before being added to the in-memory DLQ so they survive restarts.
Storage providers:
| Provider | Description |
|---|---|
| InMemory | Default. Fast, no durability. |
| SQLite | SQLite-based, single-file DB, zero-config. |
See Persistence & Storage for full configuration reference, database schema, and custom provider guide.
Next Steps
Server Setup — server configuration
Client Usage — client usage
Persistence & Storage — persistence & storage
Monitoring — monitoring