Examples

This guide contains practical examples of using VibeMQ.

Basic Examples

Simple Publisher

using Microsoft.Extensions.Logging;
using VibeMQ.Client;

// Console logging at Information level and above.
using var loggerFactory = LoggerFactory.Create(
    logging => logging.SetMinimumLevel(LogLevel.Information).AddConsole());

var logger = loggerFactory.CreateLogger<VibeMQClient>();
var credentials = new ClientOptions { Username = "admin", Password = "my-password" };

// The client is disposed asynchronously when the program leaves this scope.
await using var publisher = await VibeMQClient.ConnectAsync("localhost", 2925, credentials, logger);

Console.WriteLine("Publisher connected. Enter message (Enter to exit):");

// Publish every line typed by the user; an empty or whitespace-only line
// (or end of input) ends the loop.
for (var line = Console.ReadLine(); !string.IsNullOrWhiteSpace(line); line = Console.ReadLine()) {
    var payload = new {
        Text = line,
        Timestamp = DateTime.Now.ToString("HH:mm:ss")
    };

    await publisher.PublishAsync("messages", payload);

    Console.WriteLine("✓ Message sent");
}

Simple Subscriber

using Microsoft.Extensions.Logging;
using VibeMQ.Client;

// Console logging at Information level and above.
using var loggerFactory = LoggerFactory.Create(
    logging => logging.SetMinimumLevel(LogLevel.Information).AddConsole());

var logger = loggerFactory.CreateLogger<VibeMQClient>();
var credentials = new ClientOptions { Username = "admin", Password = "my-password" };

await using var subscriber = await VibeMQClient.ConnectAsync("localhost", 2925, credentials, logger);

// Print every message delivered to the "messages" queue. The payload is read
// through dynamic member access, matching the anonymous object sent by the publisher example.
await using var subscription = await subscriber.SubscribeAsync<dynamic>(
    "messages",
    async msg => {
        Console.WriteLine($"📨 {msg.Text} (at {msg.Timestamp})");
    }
);

// Keep the process (and therefore the subscription) alive until the user presses Enter.
Console.WriteLine("Subscriber started. Press Enter to exit...");
Console.ReadLine();

Server

using Microsoft.Extensions.Logging;
using VibeMQ.Server;
using VibeMQ.Enums;

// Console logging at Information level and above.
using var loggerFactory = LoggerFactory.Create(
    logging => logging.SetMinimumLevel(LogLevel.Information).AddConsole());

// Broker on port 2925 with a superuser account, queue defaults and
// health checks enabled on port 2926.
var broker = BrokerBuilder.Create()
    .UsePort(2925)
    .UseAuthorization(auth => {
        auth.SuperuserUsername = "admin";
        auth.SuperuserPassword = "my-password";
    })
    .ConfigureQueues(queues => {
        queues.DefaultDeliveryMode = DeliveryMode.RoundRobin;
        queues.MaxQueueSize = 10_000;
        queues.EnableAutoCreate = true;
    })
    .ConfigureHealthChecks(health => {
        health.Enabled = true;
        health.Port = 2926;
    })
    .UseLoggerFactory(loggerFactory)
    .Build();

Console.WriteLine("Starting VibeMQ server...");

// No cancellation token is supplied, so this script never requests a stop itself.
await broker.RunAsync(CancellationToken.None);

Task Queues

Order Processing

Publisher (OrderService.cs):

using VibeMQ.Client;

/// <summary>
/// Persists an order and then announces it on the "orders.created" queue.
/// </summary>
public class OrderService {
    private readonly VibeMQClient _client;

    public OrderService(VibeMQClient client) => _client = client;

    /// <summary>
    /// Saves <paramref name="order"/> and publishes an event describing it.
    /// </summary>
    /// <param name="order">The order to store and announce.</param>
    public async Task CreateOrderAsync(Order order) {
        // Persist first so the event is only published for a stored order.
        await SaveOrderAsync(order);

        var orderCreated = new {
            OrderId = order.Id,
            order.Amount,
            order.CustomerId,
            CreatedAt = DateTime.UtcNow
        };

        await _client.PublishAsync("orders.created", orderCreated);
    }

    // Placeholder for real persistence; completes immediately.
    private Task SaveOrderAsync(Order order) => Task.CompletedTask;
}

/// <summary>
/// Order data used by the order-processing examples.
/// </summary>
public class Order {
    /// <summary>Order identifier; defaults to an empty string rather than null.</summary>
    public string Id { get; set; } = "";

    /// <summary>Order total.</summary>
    public decimal Amount { get; set; }

    /// <summary>Identifier of the customer who placed the order; defaults to an empty string.</summary>
    public string CustomerId { get; set; } = "";
}

Handler (OrderProcessor.cs):

using VibeMQ.Client;

/// <summary>
/// Hosted background service that subscribes to the "orders.created" queue
/// and processes each received order until the host requests shutdown.
/// </summary>
public class OrderProcessor : BackgroundService {
    private readonly IVibeMQClientFactory _clientFactory;
    private readonly ILogger<OrderProcessor> _logger;

    public OrderProcessor(
        IVibeMQClientFactory clientFactory,
        ILogger<OrderProcessor> logger) {
        _clientFactory = clientFactory;
        _logger = logger;
    }

    /// <summary>
    /// Creates a client, subscribes to "orders.created" and then waits until
    /// <paramref name="stoppingToken"/> is cancelled. The subscription and the
    /// client are disposed when the method exits.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
        await using var client = await _clientFactory.CreateAsync(stoppingToken);

        await using var subscription = await client.SubscribeAsync<dynamic>(
            "orders.created",
            async order => {
                // LogInformation/LogError are extension methods, and extension methods
                // cannot be called with dynamic arguments (compiler error CS1973).
                // Reading the values into object locals makes the calls statically bound.
                object orderId = order.OrderId;
                object amount = order.Amount;

                _logger.LogInformation(
                    "Processing order {OrderId} for amount {Amount}",
                    orderId,
                    amount
                );

                try {
                    await ProcessOrderAsync(order);
                    _logger.LogInformation("Order {OrderId} processed", orderId);
                } catch (Exception ex) {
                    _logger.LogError(ex, "Error processing order {OrderId}", orderId);
                    throw;  // For retry
                }
            },
            stoppingToken
        );

        _logger.LogInformation("OrderProcessor started");

        try {
            // Keep the subscription alive until shutdown is requested.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        } catch (OperationCanceledException) {
            _logger.LogInformation("OrderProcessor stopped");
        }
    }

    // Placeholder for real order processing; completes immediately.
    private Task ProcessOrderAsync(dynamic order) {
        // Order processing
        return Task.CompletedTask;
    }
}

DI Registration:

using VibeMQ.Client.DependencyInjection;

var builder = Host.CreateDefaultBuilder(args);

builder.ConfigureServices(services => {
    // Connection settings for the VibeMQ client registered in the container.
    services.AddVibeMQClient(settings => {
        settings.Host = "localhost";
        settings.Port = 2925;
        settings.ClientOptions.Username = "admin";
        settings.ClientOptions.Password = "my-password";
    });

    // Run OrderProcessor as a hosted service for the lifetime of the host.
    services.AddHostedService<OrderProcessor>();
});

var host = builder.Build();

await host.RunAsync();

Class-based Subscriptions

Using class-based handlers with automatic subscription:

using VibeMQ.Attributes;
using VibeMQ.Interfaces;
using VibeMQ.Client.DependencyInjection;

// Define message types

/// <summary>
/// Message type handled by the class-based handler example.
/// </summary>
public class OrderCreated {
    /// <summary>Order identifier; defaults to an empty string rather than null.</summary>
    public string OrderId { get; set; } = "";

    /// <summary>Order total.</summary>
    public decimal Amount { get; set; }

    /// <summary>Identifier of the ordering customer; defaults to an empty string.</summary>
    public string CustomerId { get; set; } = "";
}

// Define handler with Queue attribute

/// <summary>
/// Handles <see cref="OrderCreated"/> messages; the <c>Queue</c> attribute names
/// the "orders.created" queue for this handler.
/// </summary>
[Queue("orders.created")]
public class OrderHandler : IMessageHandler<OrderCreated> {
    private readonly IOrderRepository _repository;
    private readonly ILogger<OrderHandler> _logger;

    public OrderHandler(IOrderRepository repository, ILogger<OrderHandler> logger) {
        (_repository, _logger) = (repository, logger);
    }

    /// <summary>
    /// Passes the message to the repository, logging before and after.
    /// </summary>
    /// <param name="message">The received order message.</param>
    /// <param name="cancellationToken">Forwarded to the repository call.</param>
    public async Task HandleAsync(OrderCreated message, CancellationToken cancellationToken) {
        _logger.LogInformation("Processing order {OrderId}", message.OrderId);

        try {
            await _repository.ProcessOrderAsync(message, cancellationToken);
        } catch (Exception ex) {
            _logger.LogError(ex, "Error processing order {OrderId}", message.OrderId);
            throw;  // Retry delivery
        }

        _logger.LogInformation("Order {OrderId} processed successfully", message.OrderId);
    }
}

// Register handler and enable automatic subscriptions
var builder = Host.CreateDefaultBuilder(args);

builder.ConfigureServices(services => {
    // Connection settings for the VibeMQ client.
    services.AddVibeMQClient(settings => {
        settings.Host = "localhost";
        settings.Port = 2925;
        settings.ClientOptions.Username = "admin";
        settings.ClientOptions.Password = "my-password";
    });

    // Dependency required by OrderHandler's constructor.
    services.AddScoped<IOrderRepository, OrderRepository>();

    services.AddMessageHandler<OrderCreated, OrderHandler>()
        .AddMessageHandlerSubscriptions();  // Auto-subscribe on startup
});

var host = builder.Build();

await host.RunAsync();

Background Tasks with Priorities

using VibeMQ.Client;
using VibeMQ.Enums;

/// <summary>
/// Publishes background tasks to the "tasks" queue, attaching the priority,
/// task type and creation time as string headers.
/// </summary>
public class TaskQueueService {
    private readonly VibeMQClient _client;

    public TaskQueueService(VibeMQClient client) => _client = client;

    /// <summary>
    /// Publishes <paramref name="task"/> with headers describing it.
    /// </summary>
    /// <param name="task">Task payload; its <c>Type</c> is copied into the "task_type" header.</param>
    /// <param name="priority">Written to the "priority" header using the enum member name.</param>
    public async Task EnqueueTaskAsync(TaskData task, MessagePriority priority) {
        var headers = new Dictionary<string, string> {
            ["priority"] = priority.ToString(),
            ["task_type"] = task.Type,
            // Round-trip ("O") format of the current UTC time.
            ["created_at"] = DateTime.UtcNow.ToString("O")
        };

        await _client.PublishAsync("tasks", task, headers);
    }
}

/// <summary>
/// Payload describing a background task.
/// </summary>
public class TaskData {
    /// <summary>Task identifier; defaults to an empty string rather than null.</summary>
    public string Id { get; set; } = "";

    /// <summary>Task type (for example "email" or "payment"); defaults to an empty string.</summary>
    public string Type { get; set; } = "";

    /// <summary>Free-form task arguments; defaults to an empty dictionary so it is safe to read or add to.</summary>
    public Dictionary<string, object> Data { get; set; } = new();
}

Usage:

// Normal task
// TaskData.Data is a Dictionary<string, object>, so the arguments are supplied
// with a dictionary initializer (an anonymous object is not convertible to it).
await taskQueue.EnqueueTaskAsync(new TaskData {
    Id = "task_1",
    Type = "email",
    Data = new Dictionary<string, object> { ["To"] = "user@example.com" }
}, MessagePriority.Normal);

// Critical task
await taskQueue.EnqueueTaskAsync(new TaskData {
    Id = "task_2",
    Type = "payment",
    Data = new Dictionary<string, object> { ["Amount"] = 100 }
}, MessagePriority.Critical);

Event Bus

EventBus Implementation

using VibeMQ.Client.DependencyInjection;

/// <summary>
/// Minimal publish/subscribe abstraction used by the event-bus examples.
/// </summary>
public interface IEventBus {
    /// <summary>Publishes <paramref name="eventData"/> under the name <paramref name="eventType"/>.</summary>
    /// <param name="eventType">Event name, e.g. "order.created".</param>
    /// <param name="eventData">Payload to publish.</param>
    /// <param name="ct">Optional cancellation token.</param>
    Task PublishAsync<T>(string eventType, T eventData, CancellationToken ct = default);

    /// <summary>Registers <paramref name="handler"/> for events named <paramref name="eventType"/>.</summary>
    /// <param name="eventType">Event name to subscribe to.</param>
    /// <param name="handler">Callback invoked with each received payload.</param>
    /// <param name="ct">Optional cancellation token.</param>
    /// <returns>An asynchronously disposable handle for the subscription.</returns>
    Task<IAsyncDisposable> SubscribeAsync<T>(string eventType, Func<T, Task> handler, CancellationToken ct = default);
}

/// <summary>
/// <see cref="IEventBus"/> implementation on top of VibeMQ. Events are published to
/// and consumed from queues named <c>events.{eventType}</c>.
/// </summary>
/// <remarks>
/// Every subscription owns the client it was created on. Disposing the handle returned by
/// <see cref="SubscribeAsync{T}"/> releases both the subscription and that client; disposing
/// the bus releases every subscription that is still open.
/// </remarks>
public class VibeMQEventBus : IEventBus, IAsyncDisposable {
    private readonly IVibeMQClientFactory _clientFactory;
    private readonly ILogger<VibeMQEventBus> _logger;

    // Guards _subscriptions: the bus is registered as a singleton, so subscribe and
    // dispose calls may arrive from different threads.
    private readonly object _gate = new();
    private readonly List<IAsyncDisposable> _subscriptions = new();

    public VibeMQEventBus(
        IVibeMQClientFactory clientFactory,
        ILogger<VibeMQEventBus> logger) {
        _clientFactory = clientFactory;
        _logger = logger;
    }

    /// <summary>
    /// Publishes <paramref name="eventData"/> to <c>events.{eventType}</c> with
    /// "event_type" and "timestamp" headers.
    /// </summary>
    /// <param name="eventType">Event name; also used as the queue suffix.</param>
    /// <param name="eventData">Payload to publish.</param>
    /// <param name="ct">Forwarded to client creation and to the publish call.</param>
    public async Task PublishAsync<T>(string eventType, T eventData, CancellationToken ct = default) {
        // NOTE(review): a new client is created and disposed for every publish.
        await using var client = await _clientFactory.CreateAsync(ct);

        await client.PublishAsync($"events.{eventType}", eventData, new Dictionary<string, string> {
            ["event_type"] = eventType,
            ["timestamp"] = DateTime.UtcNow.ToString("O")
        }, ct);

        _logger.LogInformation("Event {EventType} published", eventType);
    }

    /// <summary>
    /// Subscribes <paramref name="handler"/> to <c>events.{eventType}</c> on a dedicated client.
    /// </summary>
    /// <param name="eventType">Event name; also used as the queue suffix.</param>
    /// <param name="handler">Callback invoked for each received payload.</param>
    /// <param name="ct">Forwarded to client creation and to the subscribe call.</param>
    /// <returns>A handle whose disposal ends the subscription and disposes its client.</returns>
    public async Task<IAsyncDisposable> SubscribeAsync<T>(string eventType, Func<T, Task> handler, CancellationToken ct = default) {
        var client = await _clientFactory.CreateAsync(ct);

        IAsyncDisposable subscription;
        try {
            subscription = await client.SubscribeAsync<T>(
                $"events.{eventType}",
                async eventData => {
                    _logger.LogInformation("Received event {EventType}", eventType);
                    await handler(eventData);
                },
                ct
            );
        } catch {
            // Do not leak the client when subscribing fails.
            await client.DisposeAsync();
            throw;
        }

        var handle = new SubscriptionHandle(this, async () => {
            try {
                await subscription.DisposeAsync();
            } finally {
                await client.DisposeAsync();
            }
        });

        lock (_gate) {
            _subscriptions.Add(handle);
        }

        return handle;
    }

    /// <summary>
    /// Disposes every subscription that has not been disposed by its owner yet.
    /// </summary>
    public async ValueTask DisposeAsync() {
        IAsyncDisposable[] open;
        lock (_gate) {
            open = _subscriptions.ToArray();
            _subscriptions.Clear();
        }

        foreach (var handle in open) {
            await handle.DisposeAsync();
        }
    }

    // Couples a subscription with the client it runs on and makes disposal run at most once.
    private sealed class SubscriptionHandle : IAsyncDisposable {
        private readonly VibeMQEventBus _owner;
        private readonly Func<Task> _release;
        private int _disposed;

        public SubscriptionHandle(VibeMQEventBus owner, Func<Task> release) {
            _owner = owner;
            _release = release;
        }

        public async ValueTask DisposeAsync() {
            // Only the first caller performs the release.
            if (Interlocked.Exchange(ref _disposed, 1) != 0) {
                return;
            }

            lock (_owner._gate) {
                _owner._subscriptions.Remove(this);
            }

            await _release();
        }
    }
}

Registration:

// Register the event bus as a singleton so every consumer resolves the same instance.
services.AddSingleton<IEventBus, VibeMQEventBus>();

Usage:

/// <summary>
/// Saves an order and then publishes "order.created" followed by "order.validated".
/// </summary>
public class OrderService {
    private readonly IEventBus _eventBus;

    public OrderService(IEventBus eventBus) => _eventBus = eventBus;

    /// <summary>
    /// Stores <paramref name="order"/> and publishes the two order events in sequence.
    /// </summary>
    /// <param name="order">The order to store and announce.</param>
    public async Task CreateOrderAsync(Order order) {
        await SaveOrderAsync(order);

        var created = new {
            OrderId = order.Id,
            order.Amount
        };
        await _eventBus.PublishAsync("order.created", created);

        var validated = new {
            OrderId = order.Id,
            ValidatedAt = DateTime.UtcNow
        };
        await _eventBus.PublishAsync("order.validated", validated);
    }
}

/// <summary>
/// Subscribes to "order.created" when constructed and sends a confirmation
/// e-mail for every received order.
/// </summary>
public class EmailService : IAsyncDisposable {
    // The in-flight (or completed) subscribe operation. Keeping the task instead of a
    // nullable field lets DisposeAsync wait for a subscription that is still being set up.
    private readonly Task<IAsyncDisposable> _subscriptionTask;

    /// <param name="eventBus">Bus used to subscribe to "order.created".</param>
    public EmailService(IEventBus eventBus) {
        // Constructors cannot await, so the subscribe call is started in the background.
        _subscriptionTask = Task.Run(() => eventBus.SubscribeAsync<OrderCreated>(
            "order.created",
            async order => {
                await SendOrderConfirmationEmailAsync(order);
            }));
    }

    /// <summary>
    /// Waits for the subscribe operation to finish and disposes the subscription.
    /// </summary>
    public async ValueTask DisposeAsync() {
        IAsyncDisposable subscription;
        try {
            subscription = await _subscriptionTask;
        } catch (Exception) {
            // Subscribing never succeeded, so there is nothing to release;
            // disposal itself should not throw.
            return;
        }

        await subscription.DisposeAsync();
    }
}

Microservices

Data Synchronization Between Services

User Service (Users.cs):

/// <summary>
/// Stores a new user and announces the registration on the event bus.
/// </summary>
public class UserService {
    private readonly IEventBus _eventBus;

    public UserService(IEventBus eventBus) => _eventBus = eventBus;

    /// <summary>
    /// Saves <paramref name="user"/> and publishes a "user.registered" event.
    /// </summary>
    /// <param name="user">The user being registered.</param>
    public async Task RegisterUserAsync(User user) {
        await SaveUserAsync(user);

        var registered = new {
            UserId = user.Id,
            user.Email,
            RegisteredAt = DateTime.UtcNow
        };

        await _eventBus.PublishAsync("user.registered", registered);
    }
}

Notification Service (Notifications.cs):

/// <summary>
/// Hosted background service that sends a welcome e-mail for every
/// "user.registered" event.
/// </summary>
public class NotificationService : BackgroundService {
    private readonly IEventBus _eventBus;
    private readonly ILogger<NotificationService> _logger;

    public NotificationService(IEventBus eventBus, ILogger<NotificationService> logger) {
        _eventBus = eventBus;
        _logger = logger;
    }

    /// <summary>
    /// Subscribes to "user.registered" and waits until <paramref name="stoppingToken"/>
    /// is cancelled; the subscription is disposed when the method exits.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
        // Keep the returned handle so the subscription is released on shutdown.
        await using var subscription = await _eventBus.SubscribeAsync<dynamic>(
            "user.registered",
            async data => {
                // LogInformation is an extension method and cannot be called with a
                // dynamic argument (compiler error CS1973), so bind the value statically.
                object email = data.Email;

                _logger.LogInformation(
                    "Sending welcome email to user {Email}",
                    email
                );

                await SendWelcomeEmailAsync(data.Email);
            },
            stoppingToken);

        try {
            await Task.Delay(Timeout.Infinite, stoppingToken);
        } catch (OperationCanceledException) {
            _logger.LogInformation("NotificationService stopped");
        }
    }
}

Analytics Service (Analytics.cs):

/// <summary>
/// Hosted background service that tracks "user.registered" and "order.created" events.
/// </summary>
public class AnalyticsService : BackgroundService {
    private readonly IEventBus _eventBus;
    private readonly ILogger<AnalyticsService> _logger;

    public AnalyticsService(IEventBus eventBus, ILogger<AnalyticsService> logger) {
        _eventBus = eventBus;
        _logger = logger;
    }

    /// <summary>
    /// Subscribes to both event types and waits until <paramref name="stoppingToken"/>
    /// is cancelled; both subscriptions are disposed when the method exits.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
        // Keep the returned handles so the subscriptions are released on shutdown.
        await using var userSubscription = await _eventBus.SubscribeAsync<dynamic>(
            "user.registered",
            async data => {
                // Extension methods such as LogInformation cannot take dynamic
                // arguments (compiler error CS1973), so bind the value statically.
                object userId = data.UserId;
                _logger.LogInformation("User registered: {UserId}", userId);
                await TrackEventAsync("user_registered", data);
            },
            stoppingToken);

        await using var orderSubscription = await _eventBus.SubscribeAsync<dynamic>(
            "order.created",
            async data => {
                object orderId = data.OrderId;
                _logger.LogInformation("Order created: {OrderId}", orderId);
                await TrackEventAsync("order_created", data);
            },
            stoppingToken);

        try {
            await Task.Delay(Timeout.Infinite, stoppingToken);
        } catch (OperationCanceledException) {
            _logger.LogInformation("AnalyticsService stopped");
        }
    }
}

CQRS with VibeMQ

Commands:

/// <summary>
/// Sends commands by publishing them to <c>commands.{commandName}</c>.
/// </summary>
public class CommandBus {
    private readonly VibeMQClient _client;

    public CommandBus(VibeMQClient client) => _client = client;

    /// <summary>
    /// Publishes <paramref name="command"/> with a "command_type" header (the name of
    /// <typeparamref name="T"/>) and a freshly generated "correlation_id" header.
    /// </summary>
    /// <param name="commandName">Queue suffix identifying the command.</param>
    /// <param name="command">Command payload.</param>
    public async Task SendAsync<T>(string commandName, T command) {
        var headers = new Dictionary<string, string> {
            ["command_type"] = typeof(T).Name,
            ["correlation_id"] = Guid.NewGuid().ToString()
        };

        await _client.PublishAsync($"commands.{commandName}", command, headers);
    }
}

Queries:

/// <summary>
/// Request/response helper: publishes a query to <c>queries.{queryName}</c> and waits
/// for a matching reply on <c>queries.{queryName}.response</c>.
/// </summary>
public class QueryBus {
    private readonly VibeMQClient _client;

    public QueryBus(VibeMQClient client) {
        _client = client;
    }

    /// <summary>
    /// Sends <paramref name="query"/> and returns the data of the first response whose
    /// <c>CorrelationId</c> equals the id generated for this call.
    /// </summary>
    /// <param name="queryName">Query name used to build the request and response queue names.</param>
    /// <param name="query">Query payload.</param>
    /// <returns>The <c>Data</c> of the matching <see cref="QueryResponse{T}"/>.</returns>
    /// <exception cref="TimeoutException">No matching response arrived within 30 seconds.</exception>
    public async Task<TResponse> AskAsync<TResponse>(string queryName, object query)
        where TResponse : class {
        var correlationId = Guid.NewGuid().ToString();

        // Continuations must not run inline on the thread that completes the task:
        // that thread is the subscription's message handler (or the timeout timer), and the
        // continuation of this method disposes that very subscription.
        var tcs = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Temporary subscription for response (handler receives payload only; include CorrelationId in response type)
        await using var subscription = await _client.SubscribeAsync<QueryResponse<TResponse>>(
            $"queries.{queryName}.response",
            msg => {
                // Ignore responses that belong to other requests.
                if (msg.CorrelationId == correlationId) {
                    tcs.TrySetResult(msg.Data);
                }
                return Task.CompletedTask;
            }
        );

        // Send request
        await _client.PublishAsync($"queries.{queryName}", query, new Dictionary<string, string> {
            ["correlation_id"] = correlationId,
            ["reply_to"] = $"queries.{queryName}.response"
        });

        // Wait for response with timeout. The registration is disposed with the
        // method so the callback cannot outlive this call.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        using var timeoutRegistration = cts.Token.Register(
            () => tcs.TrySetException(new TimeoutException("Query timeout")));

        return await tcs.Task;
    }
}

/// <summary>
/// Envelope for a query reply: the response data plus the correlation id
/// used to match it to the request.
/// </summary>
/// <typeparam name="T">Type of the response data.</typeparam>
public class QueryResponse<T> {
    /// <summary>Correlation id of the request; empty by default.</summary>
    public string CorrelationId { get; set; } = string.Empty;

    /// <summary>Response data; may be absent.</summary>
    public T? Data { get; set; }
}

Monitoring and Logging

Custom Logger

/// <summary>
/// Minimal <see cref="ILogger"/> that appends Information-and-above entries to a text file.
/// Writes happen on background tasks and are serialised by a semaphore.
/// </summary>
public class FileLogger : ILogger {
    private readonly string _filePath;
    private readonly SemaphoreSlim _semaphore = new(1, 1);
    private bool _directoryReady;

    /// <param name="filePath">Path of the log file to append to.</param>
    public FileLogger(string filePath) {
        _filePath = filePath;
    }

    /// <summary>Scopes are not supported; a no-op disposable is returned instead of null.</summary>
    public IDisposable BeginScope<TState>(TState state) => NullScope.Instance;

    /// <summary>Only Information and higher levels are written.</summary>
    public bool IsEnabled(LogLevel logLevel) => logLevel >= LogLevel.Information;

    /// <summary>
    /// Formats the entry immediately and queues it for appending to the file.
    /// </summary>
    public void Log<TState>(
        LogLevel logLevel,
        EventId eventId,
        TState state,
        Exception exception,
        Func<TState, Exception, string> formatter) {

        if (!IsEnabled(logLevel)) return;

        // Format on the calling thread so the timestamp reflects when the event was
        // logged, not when the background write eventually ran.
        var line = $"{DateTime.Now:O} [{logLevel}] {formatter(state, exception)}";
        if (exception != null) {
            line += Environment.NewLine + exception;
        }

        // Fire-and-forget: logging never blocks or throws into the caller.
        _ = Task.Run(async () => {
            await _semaphore.WaitAsync();
            try {
                EnsureDirectory();
                await File.AppendAllTextAsync(_filePath, line + Environment.NewLine);
            } finally {
                _semaphore.Release();
            }
        });
    }

    // Creates the log file's directory once; always called while holding the semaphore.
    private void EnsureDirectory() {
        if (_directoryReady) return;

        var directory = Path.GetDirectoryName(Path.GetFullPath(_filePath));
        if (!string.IsNullOrEmpty(directory)) {
            Directory.CreateDirectory(directory);
        }
        _directoryReady = true;
    }

    // Disposable that does nothing, returned from BeginScope.
    private sealed class NullScope : IDisposable {
        public static readonly NullScope Instance = new();
        public void Dispose() { }
    }
}

/// <summary>
/// Provider that hands out a single shared <see cref="FileLogger"/> for every category,
/// so all writes to the file go through the same semaphore.
/// </summary>
public class FileLoggerProvider : ILoggerProvider {
    private readonly FileLogger _logger;

    /// <param name="filePath">Path of the log file shared by all loggers.</param>
    public FileLoggerProvider(string filePath) {
        _logger = new FileLogger(filePath);
    }

    /// <summary>Returns the shared logger; the category name is not used.</summary>
    public ILogger CreateLogger(string categoryName) => _logger;

    public void Dispose() { }
}

Usage:

// Route all log output through the file logger provider.
using var loggerFactory = LoggerFactory.Create(logging => {
    logging.AddProvider(new FileLoggerProvider("logs/vibemq.log"));
});

// Broker on port 2925 that logs through the factory above.
var broker = BrokerBuilder
    .Create()
    .UsePort(2925)
    .UseLoggerFactory(loggerFactory)
    .Build();

Application Metrics

/// <summary>
/// Logs a snapshot of the broker metrics immediately and then every 30 seconds
/// until disposed.
/// </summary>
public class AppMetrics : IDisposable {
    private readonly IBrokerMetrics _brokerMetrics;
    private readonly ILogger<AppMetrics> _logger;
    private readonly Timer _timer;

    /// <param name="brokerMetrics">Source of the metric snapshots.</param>
    /// <param name="logger">Receives one Information entry per snapshot.</param>
    public AppMetrics(
        IBrokerMetrics brokerMetrics,
        ILogger<AppMetrics> logger) {
        _brokerMetrics = brokerMetrics;
        _logger = logger;

        // Created last: the first callback is due immediately (TimeSpan.Zero) and
        // needs the fields above to be assigned.
        _timer = new Timer(_ => LogMetrics(), null, TimeSpan.Zero, TimeSpan.FromSeconds(30));
    }

    // Timer callback. Exceptions are caught and logged here because nothing
    // upstream of a timer callback can handle them.
    private void LogMetrics() {
        try {
            var snapshot = _brokerMetrics.GetSnapshot();

            _logger.LogInformation(
                "Metrics: Published={Published}, Delivered={Delivered}, " +
                "Connections={Connections}, Latency={Latency:F2}ms",
                snapshot.TotalMessagesPublished,
                snapshot.TotalMessagesDelivered,
                snapshot.ActiveConnections,
                snapshot.AverageDeliveryLatencyMs
            );
        } catch (Exception ex) {
            _logger.LogError(ex, "Failed to log broker metrics");
        }
    }

    /// <summary>Stops the periodic logging.</summary>
    public void Dispose() => _timer.Dispose();
}

Next Steps