using MQTTnet; using MQTTnet.Client; using System.Text; using System.Text.Json; namespace PetWashControl.Services; public class MqttClientService { private readonly IMqttClient _mqttClient; private readonly MqttClientOptions _options; private readonly ConfigurationService _config; public event Action? MessageReceived; public bool IsConnected => _mqttClient.IsConnected; public MqttClientService(ConfigurationService? config = null) { _config = config ?? new ConfigurationService(); var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); _options = new MqttClientOptionsBuilder() .WithTcpServer(_config.MqttBrokerHost, _config.MqttBrokerPort) .WithClientId(_config.MqttClientId) .WithCleanSession() .Build(); _mqttClient.ApplicationMessageReceivedAsync += OnMessageReceived; _mqttClient.DisconnectedAsync += OnDisconnected; } public async Task ConnectAsync() { if (_mqttClient.IsConnected) return; try { await _mqttClient.ConnectAsync(_options, CancellationToken.None); // 订阅设备状态和命令主题 var subscribeOptions = new MqttClientSubscribeOptionsBuilder() .WithTopicFilter("device/status") .WithTopicFilter("device/command") .Build(); await _mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None); } catch (Exception ex) { throw new Exception($"MQTT连接失败: {ex.Message}", ex); } } public async Task DisconnectAsync() { if (_mqttClient.IsConnected) { await _mqttClient.DisconnectAsync(); } } public async Task PublishAsync(string topic, object payload) { if (!_mqttClient.IsConnected) { throw new InvalidOperationException("MQTT客户端未连接"); } var json = JsonSerializer.Serialize(payload); var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(json) .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce) .Build(); await _mqttClient.PublishAsync(message, CancellationToken.None); } private Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs args) { var topic = args.ApplicationMessage.Topic; var payloadBytes = args.ApplicationMessage.PayloadSegment.ToArray(); var payload = Encoding.UTF8.GetString(payloadBytes); MessageReceived?.Invoke(topic, payload); return Task.CompletedTask; } private async Task OnDisconnected(MqttClientDisconnectedEventArgs args) { // 自动重连 if (!args.ClientWasConnected) return; await Task.Delay(TimeSpan.FromSeconds(5)); try { await ConnectAsync(); } catch { // 重连失败,等待下次重试 } } }