using MQTTnet; using MQTTnet.Server; using System.Text; using System.Text.Json; namespace PetWash.Api.Services; public class MqttService : IHostedService { private readonly MqttServer _mqttServer; private readonly ILogger _logger; public MqttService(ILogger logger) { _logger = logger; var optionsBuilder = new MqttServerOptionsBuilder() .WithDefaultEndpoint() .WithDefaultEndpointPort(1883); _mqttServer = new MqttFactory().CreateMqttServer(optionsBuilder.Build()); _mqttServer.ClientConnectedAsync += OnClientConnected; _mqttServer.ClientDisconnectedAsync += OnClientDisconnected; } public async Task StartAsync(CancellationToken cancellationToken) { await _mqttServer.StartAsync(); _logger.LogInformation("MQTT Broker started on port 1883"); } public async Task StopAsync(CancellationToken cancellationToken) { await _mqttServer.StopAsync(); _logger.LogInformation("MQTT Broker stopped"); } private Task OnClientConnected(ClientConnectedEventArgs args) { _logger.LogInformation($"Client connected: {args.ClientId}"); return Task.CompletedTask; } private Task OnClientDisconnected(ClientDisconnectedEventArgs args) { _logger.LogInformation($"Client disconnected: {args.ClientId}"); return Task.CompletedTask; } public async Task PublishAsync(string topic, object payload) { var json = JsonSerializer.Serialize(payload); var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(json) .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce) .WithRetainFlag(false) .Build(); await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) { SenderClientId = "PetWash.Api" }); _logger.LogInformation($"Published to {topic}: {json}"); } }