Client Usage

This guide covers connecting, publishing, subscribing, creating queues, configuring the client, and troubleshooting with the VibeMQ client.

Using with Dependency Injection (IVibeMQClient)

In ASP.NET Core or a Worker Service, you can inject IVibeMQClient and use it without calling ConnectAsync or managing disposal. The client is registered as a singleton, so one instance is shared, and it connects lazily on the first PublishAsync or SubscribeAsync call. See DI Integration for registration details and examples.

using VibeMQ.Client;
using VibeMQ.Client.DependencyInjection;

// Registration (e.g. in Program.cs)
services.AddLogging();
services.AddVibeMQClient(settings => {
    settings.Host = "localhost";
    settings.Port = 8080;
    settings.ClientOptions.AuthToken = "my-token";
});

// In any service
public class MyService {
    private readonly IVibeMQClient _vibeMQ;
    public MyService(IVibeMQClient vibeMQ) => _vibeMQ = vibeMQ;

    public async Task SendAsync() {
        await _vibeMQ.PublishAsync("queue", new { Text = "Hello" });
    }
}

The concrete type VibeMQClient implements IVibeMQClient. For manual connection and full control, use VibeMQClient.ConnectAsync as in the sections below.
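
The lazy, shared connection described at the start of this section can be pictured with a small stand-alone sketch. It does not use VibeMQ at all: ConnectAsync and PublishAsync here are hypothetical placeholders, and the use of Lazy<Task<T>> is an assumption about one common way to get this behavior, not a description of the library's internals.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Counts how many times the simulated connection is opened.
var connectCalls = 0;

// Stand-in for the real connect step (hypothetical, not the VibeMQ API).
async Task<string> ConnectAsync() {
    connectCalls++;
    await Task.Delay(50);   // simulate network latency
    return "connection";
}

// Lazy<Task<T>> runs the factory once, on first access, even with concurrent callers.
var lazyConnection = new Lazy<Task<string>>(ConnectAsync, LazyThreadSafetyMode.ExecutionAndPublication);

// Every operation awaits the same shared task before doing its work.
async Task<string> PublishAsync(string queue) {
    var connection = await lazyConnection.Value;
    return $"published to {queue} over {connection}";
}

// Nothing has connected yet; the three calls below trigger exactly one connect.
var results = await Task.WhenAll(PublishAsync("a"), PublishAsync("b"), PublishAsync("c"));
Console.WriteLine($"{results.Length} publishes, {connectCalls} connect call(s)");
// prints "3 publishes, 1 connect call(s)"
```

One caveat of this pattern is that a failed connect task stays cached in the Lazy instance, so a production client would need its own retry or reset logic.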

Connecting to Server

Basic Connection

using VibeMQ.Client;

await using var client = await VibeMQClient.ConnectAsync(
    "localhost",
    8080
);

Console.WriteLine($"Connected: {client.IsConnected}");

Connection with Authentication

var client = await VibeMQClient.ConnectAsync(
    "localhost",
    8080,
    new ClientOptions {
        AuthToken = "my-secret-token"
    }
);

Connection with Logging

using Microsoft.Extensions.Logging;

using var loggerFactory = LoggerFactory.Create(builder => {
    builder.SetMinimumLevel(LogLevel.Information).AddConsole();
});

var logger = loggerFactory.CreateLogger<VibeMQClient>();

var client = await VibeMQClient.ConnectAsync(
    "localhost",
    8080,
    new ClientOptions { AuthToken = "my-token" },
    logger
);

Publishing Messages

Basic Publishing

await client.PublishAsync("notifications", new {
    Title = "Hello",
    Body = "This is a test message",
    Timestamp = DateTime.UtcNow  // UTC avoids time-zone ambiguity between services
});

Publishing with Headers

You can attach custom headers to messages, including priority:

using VibeMQ.Core.Enums;

// Publish with custom headers
await client.PublishAsync("orders", orderData, new Dictionary<string, string> {
    ["correlationId"] = Guid.NewGuid().ToString(),
    ["source"] = "order-service",
    ["version"] = "1.0"
});

// Publish with priority via headers
await client.PublishAsync("alerts", alertData, new Dictionary<string, string> {
    ["priority"] = MessagePriority.Critical.ToString()
});

// Publish with both priority and custom headers
await client.PublishAsync("orders", orderData, new Dictionary<string, string> {
    ["priority"] = MessagePriority.High.ToString(),
    ["correlationId"] = Guid.NewGuid().ToString(),
    ["source"] = "order-service"
});

Typed Publishing

Create a class for the message:

public class OrderCreated {
    public string OrderId { get; set; }
    public decimal Amount { get; set; }
    public DateTime CreatedAt { get; set; }
}

Use it:

await client.PublishAsync("orders.created", new OrderCreated {
    OrderId = "ORD-123",
    Amount = 99.99m,
    CreatedAt = DateTime.UtcNow
});

Subscribing to Messages

Basic Subscription

await using var subscription = await client.SubscribeAsync<dynamic>(
    "notifications",
    async msg => {
        Console.WriteLine($"Received: {msg.Title} - {msg.Body}");
    }
);

Typed Subscription

public class Notification {
    public string Title { get; set; }
    public string Body { get; set; }
}

await using var subscription = await client.SubscribeAsync<Notification>(
    "notifications",
    async notification => {
        Console.WriteLine($"{notification.Title}: {notification.Body}");
        await ProcessNotificationAsync(notification);
    }
);

Subscription with Error Handling

await using var subscription = await client.SubscribeAsync<Notification>(
    "notifications",
    async notification => {
        try {
            await ProcessNotificationAsync(notification);
        } catch (Exception ex) {
            Console.WriteLine($"Processing error: {ex.Message}");
            throw;  // Broker will retry delivery
        }
    }
);

Class-based Subscriptions

Instead of using lambda handlers, you can create handler classes implementing IMessageHandler<T>:

using Microsoft.Extensions.Logging;
using VibeMQ.Core.Interfaces;

// Define message handler
public class OrderHandler : IMessageHandler<OrderCreated> {
    private readonly ILogger<OrderHandler> _logger;

    public OrderHandler(ILogger<OrderHandler> logger) {
        _logger = logger;
    }

    public async Task HandleAsync(OrderCreated message, CancellationToken cancellationToken) {
        _logger.LogInformation("Processing order {OrderId}", message.OrderId);
        await ProcessOrderAsync(message, cancellationToken);
    }

    private Task ProcessOrderAsync(OrderCreated order, CancellationToken ct) {
        // Process order
        return Task.CompletedTask;
    }
}

// Subscribe using handler class
await using var subscription = await client.SubscribeAsync<OrderCreated, OrderHandler>("orders.created");

Handler classes are easier to unit test, can receive dependencies through their constructor (as the ILogger above does), and keep message-handling logic out of startup code.

Multiple Subscriptions

var subscriptions = new List<IAsyncDisposable>();

// Subscribe to multiple queues
subscriptions.Add(await client.SubscribeAsync<Order>(
    "orders.created",
    async order => await HandleOrderAsync(order)
));

subscriptions.Add(await client.SubscribeAsync<Payment>(
    "payments.completed",
    async payment => await HandlePaymentAsync(payment)
));

subscriptions.Add(await client.SubscribeAsync<Notification>(
    "notifications",
    async notification => await ShowNotificationAsync(notification)
));

// Release resources
foreach (var subscription in subscriptions) {
    await subscription.DisposeAsync();
}
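
The cleanup loop above stops at the first DisposeAsync call that throws, which would leave the remaining subscriptions active. A minimal sketch of a more defensive variant follows. DisposeAllAsync is a hypothetical helper, not part of VibeMQ, and the demo uses MemoryStream instances purely as stand-ins for subscriptions because they also implement IAsyncDisposable.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper (not part of VibeMQ): disposes every item,
// continues past failures, and reports all of them together at the end.
static async Task DisposeAllAsync(IEnumerable<IAsyncDisposable> subscriptions) {
    var errors = new List<Exception>();
    foreach (var subscription in subscriptions) {
        try {
            await subscription.DisposeAsync();
        } catch (Exception ex) {
            errors.Add(ex);   // remember the failure, keep releasing the rest
        }
    }
    if (errors.Count > 0) {
        throw new AggregateException("One or more subscriptions failed to dispose.", errors);
    }
}

// Demo with stand-in disposables; real code would pass the subscriptions list.
var demo = new List<IAsyncDisposable> { new MemoryStream(), new MemoryStream() };
await DisposeAllAsync(demo);
Console.WriteLine("All stand-in subscriptions disposed");
```

With the list built in the example above, the call would be await DisposeAllAsync(subscriptions).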

Unsubscribing from Queue

Automatic Unsubscribe

With await using, the subscription is disposed automatically when it goes out of scope, which unsubscribes from the queue:

await using var subscription = await client.SubscribeAsync<dynamic>(
    "notifications",
    async msg => { /* processing */ }
);
// DisposeAsync() is called automatically

Manual Unsubscribe

var subscription = await client.SubscribeAsync<dynamic>(
    "notifications",
    async msg => { /* processing */ }
);

// When you need to unsubscribe
await subscription.DisposeAsync();

Alternatively, unsubscribe by queue name through the client:

await client.UnsubscribeAsync("notifications");

Creating Queues

You can create queues explicitly before publishing or subscribing:

using VibeMQ.Core.Configuration;
using VibeMQ.Core.Enums;

// Create queue with default options
await client.CreateQueueAsync("my-queue");

// Create queue with custom options
var options = new QueueOptions {
    Mode = DeliveryMode.FanOutWithAck,
    MaxQueueSize = 5000,
    EnableDeadLetterQueue = true,
    MaxRetryAttempts = 5
};
await client.CreateQueueAsync("my-queue", options);

Note

Queue management methods (DeleteQueueAsync, GetQueueInfoAsync, ListQueuesAsync) are available on the server side via IQueueManager, not on the client. Queues are typically created automatically when publishing to a non-existent queue (if EnableAutoCreate is enabled) or configured on the server side.

Client Settings

ClientOptions

var options = new ClientOptions {
    // Authentication
    AuthToken = "my-secret-token",

    // Keep-alive
    KeepAliveInterval = TimeSpan.FromSeconds(30),

    // Command timeout
    CommandTimeout = TimeSpan.FromSeconds(10),

    // TLS
    UseTls = false,
    SkipCertificateValidation = false,

    // Reconnect policy
    ReconnectPolicy = new ReconnectPolicy {
        MaxAttempts = 10,
        InitialDelay = TimeSpan.FromSeconds(1),
        MaxDelay = TimeSpan.FromMinutes(5),
        UseExponentialBackoff = true
    }
};

ReconnectPolicy

Configure the reconnection policy through the ReconnectPolicy property of ClientOptions:

ReconnectPolicy = new ReconnectPolicy {
    MaxAttempts = int.MaxValue,      // Max attempts
    InitialDelay = TimeSpan.FromSeconds(1),  // Initial delay
    MaxDelay = TimeSpan.FromMinutes(5),      // Maximum delay
    UseExponentialBackoff = true     // Exponential increase
}

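How it works with the settings above (the delay doubles after each failed attempt until it reaches MaxDelay):

  • Attempt 1: immediately

  • Attempt 2: after 1s

  • Attempt 3: after 2s

  • Attempt 4: after 4s

  • Attempt 5: after 8s

  • Later attempts: the delay keeps doubling, capped at 5min (MaxDelay)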
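
The schedule listed above can be reproduced with a few lines of arithmetic. The sketch below assumes a plain doubling rule capped at the maximum delay; DelayBeforeAttempt is a hypothetical function written for illustration, and the library's actual calculation (for example, whether it adds jitter) may differ.

```csharp
using System;

// Illustrative only: an assumed doubling rule, not VibeMQ's actual implementation.
// attempt is 1-based; the first attempt is made immediately.
static TimeSpan DelayBeforeAttempt(int attempt, TimeSpan initialDelay, TimeSpan maxDelay) {
    if (attempt <= 1) return TimeSpan.Zero;
    // Delay before attempt n (n >= 2) is initialDelay * 2^(n - 2).
    var ticks = initialDelay.Ticks * Math.Pow(2, attempt - 2);
    // Compare as double before casting, so very large values cannot overflow.
    return ticks >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks((long)ticks);
}

var initial = TimeSpan.FromSeconds(1);
var max = TimeSpan.FromMinutes(5);
for (var attempt = 1; attempt <= 12; attempt++) {
    var delay = DelayBeforeAttempt(attempt, initial, max);
    Console.WriteLine($"Attempt {attempt}: after {delay.TotalSeconds}s");
}
```

Under this rule the cap is first reached at attempt 11, where the uncapped delay would be 512 seconds.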

TLS/SSL Connection

Connection with TLS:

var client = await VibeMQClient.ConnectAsync(
    "localhost",
    8080,
    new ClientOptions {
        UseTls = true,
        AuthToken = "my-token"
    }
);

Warning

For production, use valid certificates. Set SkipCertificateValidation = true only in tests.

Handling Disconnections

Automatic Reconnection

The client automatically reconnects when the connection is lost:

var client = await VibeMQClient.ConnectAsync(
    "localhost",
    8080,
    new ClientOptions {
        ReconnectPolicy = new ReconnectPolicy {
            MaxAttempts = 10,
            UseExponentialBackoff = true
        }
    }
);

// Subscription will be restored after reconnect
await using var subscription = await client.SubscribeAsync<dynamic>(
    "notifications",
    async msg => { /* processing */ }
);

Status Check

if (client.IsConnected) {
    await client.PublishAsync("queue", data);
} else {
    Console.WriteLine("Client disconnected");
}

Events (Optional)

To track connection status, you can poll IsConnected periodically:

_ = Task.Run(async () => {
    while (true) {
        await Task.Delay(5000);
        Console.WriteLine($"Status: {(client.IsConnected ? "Connected" : "Disconnected")}");
    }
});
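
The polling loop above runs forever and prints on every tick. A sketch of a variant that can be stopped and that reports only status changes is shown below. MonitorAsync is a hypothetical helper, not part of VibeMQ; it takes the status check as a delegate, so in real code you would pass () => client.IsConnected. It relies on PeriodicTimer, which requires .NET 6 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: polls a status callback until cancelled and reports only changes.
static async Task MonitorAsync(Func<bool> isConnected, TimeSpan interval,
                               Action<bool> onChanged, CancellationToken ct) {
    bool? last = null;
    using var timer = new PeriodicTimer(interval);
    try {
        do {
            var current = isConnected();
            if (current != last) {
                onChanged(current);   // first reading and every later change
                last = current;
            }
        } while (await timer.WaitForNextTickAsync(ct));
    } catch (OperationCanceledException) {
        // Cancellation is the normal way to stop the monitor.
    }
}

// Demo: a constant stand-in status, stopped automatically after 300 ms.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
var connected = true;
await MonitorAsync(
    () => connected,
    TimeSpan.FromMilliseconds(50),
    state => Console.WriteLine($"Status: {(state ? "Connected" : "Disconnected")}"),
    cts.Token);
```

Passing a CancellationToken lets the host shut the monitor down cleanly instead of abandoning a background task.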

Disconnecting

Graceful Disconnect

await client.DisconnectAsync();

Using await using

await using var client = await VibeMQClient.ConnectAsync("localhost", 8080);

// Work with client

// DisposeAsync() is called automatically

To release all resources explicitly:

await client.DisposeAsync();

Usage Examples

Simple Publisher

using VibeMQ.Client;

await using var publisher = await VibeMQClient.ConnectAsync("localhost", 8080, new ClientOptions {
    AuthToken = "my-token"
});

Console.WriteLine("Publisher connected. Enter message (Enter to exit):");

while (true) {
    var input = Console.ReadLine();
    if (string.IsNullOrWhiteSpace(input)) break;

    await publisher.PublishAsync("messages", new {
        Text = input,
        Timestamp = DateTime.Now
    });

    Console.WriteLine("✓ Message sent");
}

Simple Subscriber

using VibeMQ.Client;

await using var subscriber = await VibeMQClient.ConnectAsync("localhost", 8080, new ClientOptions {
    AuthToken = "my-token"
});

await using var subscription = await subscriber.SubscribeAsync<dynamic>(
    "messages",
    async msg => {
        Console.WriteLine($"📨 {msg.Text} (at {msg.Timestamp})");
    }
);

Console.WriteLine("Subscriber started. Press Enter to exit...");
Console.ReadLine();

Task Processing Worker

using VibeMQ.Client;

public class OrderProcessor {
    private readonly VibeMQClient _client;

    public OrderProcessor(VibeMQClient client) {
        _client = client;
    }

    public async Task StartAsync(CancellationToken cancellationToken) {
        await using var subscription = await _client.SubscribeAsync<Order>(
            "orders.process",
            async order => {
                try {
                    await ProcessOrderAsync(order);
                    Console.WriteLine($"✓ Order {order.Id} processed");
                } catch (Exception ex) {
                    Console.WriteLine($"✗ Error: {ex.Message}");
                    throw;  // For retry
                }
            },
            cancellationToken
        );

        await Task.Delay(Timeout.Infinite, cancellationToken);
    }

    private Task ProcessOrderAsync(Order order) {
        // Order processing
        return Task.CompletedTask;
    }
}

public class Order {
    public string Id { get; set; }
    public decimal Amount { get; set; }
    public string Customer { get; set; }
}

Event Bus for Microservices

using Microsoft.Extensions.Logging;
using VibeMQ.Client;

public class EventBus {
    private readonly VibeMQClient _client;
    private readonly ILogger<EventBus> _logger;

    public EventBus(VibeMQClient client, ILogger<EventBus> logger) {
        _client = client;
        _logger = logger;
    }

    public async Task PublishAsync<T>(string eventType, T eventData) {
        await _client.PublishAsync($"events.{eventType}", eventData, new Dictionary<string, string> {
            ["event_type"] = eventType,
            ["timestamp"] = DateTime.UtcNow.ToString("O")
        });

        _logger.LogInformation("Event {EventType} published", eventType);
    }

    // Returns the subscription so the caller can dispose it to unsubscribe.
    public async Task<IAsyncDisposable> SubscribeAsync<T>(string eventType, Func<T, Task> handler) {
        return await _client.SubscribeAsync<T>(
            $"events.{eventType}",
            async eventData => {
                _logger.LogInformation("Received event {EventType}", eventType);
                await handler(eventData);
            }
        );
    }
}

Troubleshooting

Connection Error

Error: Connection refused

Causes:

  • Server not running

  • Wrong port

  • Firewall blocking connection

Solution:

// Check connection parameters
var client = await VibeMQClient.ConnectAsync(
    "localhost",  // Or correct host
    8080,         // Or correct port
    new ClientOptions { ... }
);

Authentication Error

Error: Authentication failed

Solution: Make sure the client's AuthToken matches the token configured on the server:

// Server
.UseAuthentication("my-token")

// Client
new ClientOptions { AuthToken = "my-token" }

Connection Timeout

Error: Connection timeout

Solution: Increase the command timeout:

new ClientOptions {
    CommandTimeout = TimeSpan.FromSeconds(30)
}

Frequent Disconnections

Cause: Network or server issues

Solution: Configure the reconnect policy to allow more attempts:

new ClientOptions {
    ReconnectPolicy = new ReconnectPolicy {
        MaxAttempts = 50,  // Increase attempts
        InitialDelay = TimeSpan.FromSeconds(2),
        MaxDelay = TimeSpan.FromMinutes(1)
    }
}

Next Steps