2026-02-25 15:43:47 +08:00
|
|
|
using MQTTnet;
|
|
|
|
|
using MQTTnet.Client;
|
|
|
|
|
using System.Text;
|
|
|
|
|
using System.Text.Json;
|
|
|
|
|
|
|
|
|
|
namespace PetWashControl.Services;
|
|
|
|
|
|
|
|
|
|
public class MqttClientService
|
|
|
|
|
{
|
|
|
|
|
private readonly IMqttClient _mqttClient;
|
|
|
|
|
private readonly MqttClientOptions _options;
|
|
|
|
|
private readonly ConfigurationService _config;
|
|
|
|
|
|
|
|
|
|
public event Action<string, string>? MessageReceived;
|
|
|
|
|
public bool IsConnected => _mqttClient.IsConnected;
|
|
|
|
|
|
|
|
|
|
public MqttClientService(ConfigurationService? config = null)
|
|
|
|
|
{
|
|
|
|
|
_config = config ?? new ConfigurationService();
|
|
|
|
|
|
|
|
|
|
var factory = new MqttFactory();
|
|
|
|
|
_mqttClient = factory.CreateMqttClient();
|
|
|
|
|
|
|
|
|
|
_options = new MqttClientOptionsBuilder()
|
|
|
|
|
.WithTcpServer(_config.MqttBrokerHost, _config.MqttBrokerPort)
|
|
|
|
|
.WithClientId(_config.MqttClientId)
|
2026-03-03 16:49:57 +08:00
|
|
|
//.WithCredentials(_config.MqttId, _config.MqttApiKey)
|
2026-02-25 15:43:47 +08:00
|
|
|
.WithCleanSession()
|
|
|
|
|
.Build();
|
|
|
|
|
|
|
|
|
|
_mqttClient.ApplicationMessageReceivedAsync += OnMessageReceived;
|
|
|
|
|
_mqttClient.DisconnectedAsync += OnDisconnected;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public async Task ConnectAsync()
|
|
|
|
|
{
|
|
|
|
|
if (_mqttClient.IsConnected)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
await _mqttClient.ConnectAsync(_options, CancellationToken.None);
|
|
|
|
|
|
|
|
|
|
// 订阅设备状态和命令主题
|
|
|
|
|
var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
|
|
|
|
|
.WithTopicFilter("device/status")
|
|
|
|
|
.WithTopicFilter("device/command")
|
|
|
|
|
.Build();
|
|
|
|
|
|
|
|
|
|
await _mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
{
|
|
|
|
|
throw new Exception($"MQTT连接失败: {ex.Message}", ex);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public async Task DisconnectAsync()
|
|
|
|
|
{
|
|
|
|
|
if (_mqttClient.IsConnected)
|
|
|
|
|
{
|
|
|
|
|
await _mqttClient.DisconnectAsync();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public async Task PublishAsync(string topic, object payload)
|
|
|
|
|
{
|
|
|
|
|
if (!_mqttClient.IsConnected)
|
|
|
|
|
{
|
|
|
|
|
throw new InvalidOperationException("MQTT客户端未连接");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var json = JsonSerializer.Serialize(payload);
|
|
|
|
|
var message = new MqttApplicationMessageBuilder()
|
|
|
|
|
.WithTopic(topic)
|
|
|
|
|
.WithPayload(json)
|
|
|
|
|
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
|
|
|
|
|
.Build();
|
|
|
|
|
|
|
|
|
|
await _mqttClient.PublishAsync(message, CancellationToken.None);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs args)
|
|
|
|
|
{
|
|
|
|
|
var topic = args.ApplicationMessage.Topic;
|
|
|
|
|
var payloadBytes = args.ApplicationMessage.PayloadSegment.ToArray();
|
|
|
|
|
var payload = Encoding.UTF8.GetString(payloadBytes);
|
|
|
|
|
|
|
|
|
|
MessageReceived?.Invoke(topic, payload);
|
|
|
|
|
return Task.CompletedTask;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private async Task OnDisconnected(MqttClientDisconnectedEventArgs args)
|
|
|
|
|
{
|
|
|
|
|
// 自动重连
|
|
|
|
|
if (!args.ClientWasConnected)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
await Task.Delay(TimeSpan.FromSeconds(5));
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
await ConnectAsync();
|
|
|
|
|
}
|
|
|
|
|
catch
|
|
|
|
|
{
|
|
|
|
|
// 重连失败,等待下次重试
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|