Examples
This guide contains practical examples of using VibeMQ.
Basic Examples
Simple Publisher
using Microsoft.Extensions.Logging;
using VibeMQ.Client;
using var loggerFactory = LoggerFactory.Create(builder => {
builder.SetMinimumLevel(LogLevel.Information).AddConsole();
});
var logger = loggerFactory.CreateLogger<VibeMQClient>();
await using var publisher = await VibeMQClient.ConnectAsync(
"localhost",
8080,
new ClientOptions { AuthToken = "my-token" },
logger
);
Console.WriteLine("Publisher connected. Enter message (Enter to exit):");
while (true) {
var input = Console.ReadLine();
if (string.IsNullOrWhiteSpace(input)) break;
await publisher.PublishAsync("messages", new {
Text = input,
Timestamp = DateTime.Now.ToString("HH:mm:ss")
});
Console.WriteLine("✓ Message sent");
}
Simple Subscriber
using Microsoft.Extensions.Logging;
using VibeMQ.Client;
using var loggerFactory = LoggerFactory.Create(builder => {
builder.SetMinimumLevel(LogLevel.Information).AddConsole();
});
var logger = loggerFactory.CreateLogger<VibeMQClient>();
await using var subscriber = await VibeMQClient.ConnectAsync(
"localhost",
8080,
new ClientOptions { AuthToken = "my-token" },
logger
);
await using var subscription = await subscriber.SubscribeAsync<dynamic>(
"messages",
async msg => {
Console.WriteLine($"📨 {msg.Text} (at {msg.Timestamp})");
}
);
Console.WriteLine("Subscriber started. Press Enter to exit...");
Console.ReadLine();
Server
using Microsoft.Extensions.Logging;
using VibeMQ.Server;
using VibeMQ.Core.Enums;
using var loggerFactory = LoggerFactory.Create(builder => {
builder.SetMinimumLevel(LogLevel.Information).AddConsole();
});
var broker = BrokerBuilder.Create()
.UsePort(8080)
.UseAuthentication("my-token")
.ConfigureQueues(options => {
options.DefaultDeliveryMode = DeliveryMode.RoundRobin;
options.MaxQueueSize = 10_000;
options.EnableAutoCreate = true;
})
.ConfigureHealthChecks(options => {
options.Enabled = true;
options.Port = 8081;
})
.UseLoggerFactory(loggerFactory)
.Build();
Console.WriteLine("Starting VibeMQ server...");
await broker.RunAsync(CancellationToken.None);
Task Queues
Order Processing
Publisher (OrderService.cs):
using VibeMQ.Client;
public class OrderService {
private readonly VibeMQClient _client;
public OrderService(VibeMQClient client) {
_client = client;
}
public async Task CreateOrderAsync(Order order) {
// Save order to database
await SaveOrderAsync(order);
// Publish event
await _client.PublishAsync("orders.created", new {
OrderId = order.Id,
Amount = order.Amount,
CustomerId = order.CustomerId,
CreatedAt = DateTime.UtcNow
});
}
private Task SaveOrderAsync(Order order) {
// Simulate saving
return Task.CompletedTask;
}
}
public class Order {
public string Id { get; set; }
public decimal Amount { get; set; }
public string CustomerId { get; set; }
}
Handler (OrderProcessor.cs):
using VibeMQ.Client;
public class OrderProcessor : BackgroundService {
private readonly IVibeMQClientFactory _clientFactory;
private readonly ILogger<OrderProcessor> _logger;
public OrderProcessor(
IVibeMQClientFactory clientFactory,
ILogger<OrderProcessor> logger) {
_clientFactory = clientFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
await using var client = await _clientFactory.CreateAsync(stoppingToken);
await using var subscription = await client.SubscribeAsync<dynamic>(
"orders.created",
async order => {
_logger.LogInformation(
"Processing order {OrderId} for amount {Amount}",
order.OrderId,
order.Amount
);
try {
await ProcessOrderAsync(order);
_logger.LogInformation("Order {OrderId} processed", order.OrderId);
} catch (Exception ex) {
_logger.LogError(ex, "Error processing order {OrderId}", order.OrderId);
throw; // For retry
}
},
stoppingToken
);
_logger.LogInformation("OrderProcessor started");
try {
await Task.Delay(Timeout.Infinite, stoppingToken);
} catch (OperationCanceledException) {
_logger.LogInformation("OrderProcessor stopped");
}
}
private Task ProcessOrderAsync(dynamic order) {
// Order processing
return Task.CompletedTask;
}
}
DI Registration:
using VibeMQ.Client.DependencyInjection;
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services => {
services.AddVibeMQClient(settings => {
settings.Host = "localhost";
settings.Port = 8080;
settings.ClientOptions.AuthToken = "my-token";
});
services.AddHostedService<OrderProcessor>();
})
.Build();
await host.RunAsync();
Background Tasks with Priorities
using VibeMQ.Client;
using VibeMQ.Core.Enums;
public class TaskQueueService {
private readonly VibeMQClient _client;
public TaskQueueService(VibeMQClient client) {
_client = client;
}
public async Task EnqueueTaskAsync(TaskData task, MessagePriority priority) {
await _client.PublishAsync("tasks", task, options => {
options.Priority = priority;
options.Headers = new Dictionary<string, string> {
["task_type"] = task.Type,
["created_at"] = DateTime.UtcNow.ToString("O")
};
});
}
}
public class TaskData {
public string Id { get; set; }
public string Type { get; set; }
public Dictionary<string, object> Data { get; set; }
}
Usage:
// Normal task
await taskQueue.EnqueueTaskAsync(new TaskData {
Id = "task_1",
Type = "email",
Data = new { To = "user@example.com" }
}, MessagePriority.Normal);
// Critical task
await taskQueue.EnqueueTaskAsync(new TaskData {
Id = "task_2",
Type = "payment",
Data = new { Amount = 100 }
}, MessagePriority.Critical);
Event Bus
EventBus Implementation
using VibeMQ.Client.DependencyInjection;
public interface IEventBus {
Task PublishAsync<T>(string eventType, T eventData, CancellationToken ct = default);
IDisposable SubscribeAsync<T>(string eventType, Func<T, Task> handler);
}
public class VibeMQEventBus : IEventBus {
private readonly IVibeMQClientFactory _clientFactory;
private readonly ILogger<VibeMQEventBus> _logger;
private readonly List<IAsyncDisposable> _subscriptions = new();
public VibeMQEventBus(
IVibeMQClientFactory clientFactory,
ILogger<VibeMQEventBus> logger) {
_clientFactory = clientFactory;
_logger = logger;
}
public async Task PublishAsync<T>(string eventType, T eventData, CancellationToken ct = default) {
await using var client = await _clientFactory.CreateAsync(ct);
await client.PublishAsync($"events.{eventType}", eventData, options => {
options.Headers = new Dictionary<string, string> {
["event_type"] = eventType,
["timestamp"] = DateTime.UtcNow.ToString("O")
};
});
_logger.LogInformation("Event {EventType} published", eventType);
}
public IDisposable SubscribeAsync<T>(string eventType, Func<T, Task> handler) {
// Async subscription
var subscriptionTask = SubscribeInternalAsync<T>(eventType, handler);
return new AsyncDisposableWrapper(subscriptionTask);
}
private async Task<IAsyncDisposable> SubscribeInternalAsync<T>(
string eventType,
Func<T, Task> handler) {
var client = await _clientFactory.CreateAsync();
var subscription = await client.SubscribeAsync<T>(
$"events.{eventType}",
async eventData => {
_logger.LogInformation("Received event {EventType}", eventType);
await handler(eventData);
}
);
_subscriptions.Add(subscription);
return subscription;
}
}
// Wrapper for async disposable
public class AsyncDisposableWrapper : IDisposable {
private readonly Task<IAsyncDisposable> _subscriptionTask;
private IAsyncDisposable? _subscription;
public AsyncDisposableWrapper(Task<IAsyncDisposable> subscriptionTask) {
_subscriptionTask = subscriptionTask;
}
public void Dispose() {
_ = Task.Run(async () => {
if (_subscription == null) {
_subscription = await _subscriptionTask;
}
await _subscription.DisposeAsync();
});
}
}
Registration:
services.AddSingleton<IEventBus, VibeMQEventBus>();
Usage:
public class OrderService {
private readonly IEventBus _eventBus;
public OrderService(IEventBus eventBus) {
_eventBus = eventBus;
}
public async Task CreateOrderAsync(Order order) {
await SaveOrderAsync(order);
await _eventBus.PublishAsync("order.created", new {
OrderId = order.Id,
Amount = order.Amount
});
await _eventBus.PublishAsync("order.validated", new {
OrderId = order.Id,
ValidatedAt = DateTime.UtcNow
});
}
}
public class EmailService {
public EmailService(IEventBus eventBus) {
eventBus.SubscribeAsync<OrderCreated>("order.created", async order => {
await SendOrderConfirmationEmailAsync(order);
});
}
}
Microservices
Data Synchronization Between Services
User Service (Users.cs):
public class UserService {
private readonly IEventBus _eventBus;
public UserService(IEventBus eventBus) {
_eventBus = eventBus;
}
public async Task RegisterUserAsync(User user) {
await SaveUserAsync(user);
await _eventBus.PublishAsync("user.registered", new {
UserId = user.Id,
Email = user.Email,
RegisteredAt = DateTime.UtcNow
});
}
}
Notification Service (Notifications.cs):
public class NotificationService : BackgroundService {
private readonly IEventBus _eventBus;
private readonly ILogger<NotificationService> _logger;
public NotificationService(IEventBus eventBus, ILogger<NotificationService> logger) {
_eventBus = eventBus;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
await _eventBus.SubscribeAsync<dynamic>("user.registered", async data => {
_logger.LogInformation(
"Sending welcome email to user {Email}",
data.Email
);
await SendWelcomeEmailAsync(data.Email);
});
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
Analytics Service (Analytics.cs):
public class AnalyticsService : BackgroundService {
private readonly IEventBus _eventBus;
private readonly ILogger<AnalyticsService> _logger;
public AnalyticsService(IEventBus eventBus, ILogger<AnalyticsService> logger) {
_eventBus = eventBus;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
await _eventBus.SubscribeAsync<dynamic>("user.registered", async data => {
_logger.LogInformation("User registered: {UserId}", data.UserId);
await TrackEventAsync("user_registered", data);
});
await _eventBus.SubscribeAsync<dynamic>("order.created", async data => {
_logger.LogInformation("Order created: {OrderId}", data.OrderId);
await TrackEventAsync("order_created", data);
});
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
CQRS with VibeMQ
Commands:
public class CommandBus {
private readonly VibeMQClient _client;
public CommandBus(VibeMQClient client) {
_client = client;
}
public async Task SendAsync<T>(string commandName, T command) {
await _client.PublishAsync($"commands.{commandName}", command, options => {
options.Headers = new Dictionary<string, string> {
["command_type"] = typeof(T).Name,
["correlation_id"] = Guid.NewGuid().ToString()
};
});
}
}
Queries:
public class QueryBus {
private readonly VibeMQClient _client;
public QueryBus(VibeMQClient client) {
_client = client;
}
public async Task<TResponse> AskAsync<TResponse>(string queryName, object query) {
var correlationId = Guid.NewGuid().ToString();
var tcs = new TaskCompletionSource<TResponse>();
// Temporary subscription for response
var subscription = await _client.SubscribeAsync<TResponse>(
$"queries.{queryName}.response",
msg => {
if (msg.Headers?["correlation_id"] == correlationId) {
tcs.SetResult(msg);
}
return Task.CompletedTask;
}
);
// Send request
await _client.PublishAsync($"queries.{queryName}", query, options => {
options.Headers = new Dictionary<string, string> {
["correlation_id"] = correlationId,
["reply_to"] = $"queries.{queryName}.response"
};
});
// Wait for response with timeout
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
cts.Token.Register(() => {
tcs.TrySetException(new TimeoutException("Query timeout"));
subscription.DisposeAsync();
});
return await tcs.Task;
}
}
Monitoring and Logging
Custom Logger
public class FileLogger : ILogger {
private readonly string _filePath;
private readonly SemaphoreSlim _semaphore = new(1, 1);
public FileLogger(string filePath) {
_filePath = filePath;
}
public IDisposable BeginScope<TState>(TState state) => null;
public bool IsEnabled(LogLevel logLevel) => logLevel >= LogLevel.Information;
public void Log<TState>(
LogLevel logLevel,
EventId eventId,
TState state,
Exception exception,
Func<TState, Exception, string> formatter) {
if (!IsEnabled(logLevel)) return;
_ = Task.Run(async () => {
await _semaphore.WaitAsync();
try {
var message = $"{DateTime.Now:O} [{logLevel}] {formatter(state, exception)}";
await File.AppendAllTextAsync(_filePath, message + Environment.NewLine);
} finally {
_semaphore.Release();
}
});
}
}
public class FileLoggerProvider : ILoggerProvider {
private readonly string _filePath;
public FileLoggerProvider(string filePath) {
_filePath = filePath;
}
public ILogger CreateLogger(string categoryName) => new FileLogger(_filePath);
public void Dispose() { }
}
Usage:
using var loggerFactory = new FileLoggerProvider("logs/vibemq.log");
var broker = BrokerBuilder.Create()
.UsePort(8080)
.UseLoggerFactory(loggerFactory)
.Build();
Application Metrics
public class AppMetrics {
private readonly IBrokerMetrics _brokerMetrics;
private readonly ILogger<AppMetrics> _logger;
private readonly Timer _timer;
public AppMetrics(
IBrokerMetrics brokerMetrics,
ILogger<AppMetrics> logger) {
_brokerMetrics = brokerMetrics;
_logger = logger;
_timer = new Timer(_ => LogMetrics(), null, TimeSpan.Zero, TimeSpan.FromSeconds(30));
}
private void LogMetrics() {
var snapshot = _brokerMetrics.GetSnapshot();
_logger.LogInformation(
"Metrics: Published={Published}, Delivered={Delivered}, " +
"Connections={Connections}, Latency={Latency:F2}ms",
snapshot.TotalMessagesPublished,
snapshot.TotalMessagesDelivered,
snapshot.ActiveConnections,
snapshot.AverageDeliveryLatencyMs
);
}
public void Dispose() => _timer.Dispose();
}
Next Steps
Quick Start — quick start
Client Usage — client usage
DI Integration — DI integration