Files
petwash/PetWashControl/Services/MqttClientService.cs

111 lines
3.2 KiB
C#
Raw Normal View History

2026-02-25 15:43:47 +08:00
using MQTTnet;
using MQTTnet.Client;
using System.Text;
using System.Text.Json;
namespace PetWashControl.Services;
public class MqttClientService
{
private readonly IMqttClient _mqttClient;
private readonly MqttClientOptions _options;
private readonly ConfigurationService _config;
public event Action<string, string>? MessageReceived;
public bool IsConnected => _mqttClient.IsConnected;
public MqttClientService(ConfigurationService? config = null)
{
_config = config ?? new ConfigurationService();
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
_options = new MqttClientOptionsBuilder()
.WithTcpServer(_config.MqttBrokerHost, _config.MqttBrokerPort)
.WithClientId(_config.MqttClientId)
2026-03-03 16:49:57 +08:00
//.WithCredentials(_config.MqttId, _config.MqttApiKey)
2026-02-25 15:43:47 +08:00
.WithCleanSession()
.Build();
_mqttClient.ApplicationMessageReceivedAsync += OnMessageReceived;
_mqttClient.DisconnectedAsync += OnDisconnected;
}
public async Task ConnectAsync()
{
if (_mqttClient.IsConnected)
return;
try
{
await _mqttClient.ConnectAsync(_options, CancellationToken.None);
// 订阅设备状态和命令主题
var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter("device/status")
.WithTopicFilter("device/command")
.Build();
await _mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
}
catch (Exception ex)
{
throw new Exception($"MQTT连接失败: {ex.Message}", ex);
}
}
public async Task DisconnectAsync()
{
if (_mqttClient.IsConnected)
{
await _mqttClient.DisconnectAsync();
}
}
public async Task PublishAsync(string topic, object payload)
{
if (!_mqttClient.IsConnected)
{
throw new InvalidOperationException("MQTT客户端未连接");
}
var json = JsonSerializer.Serialize(payload);
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(json)
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
await _mqttClient.PublishAsync(message, CancellationToken.None);
}
private Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs args)
{
var topic = args.ApplicationMessage.Topic;
var payloadBytes = args.ApplicationMessage.PayloadSegment.ToArray();
var payload = Encoding.UTF8.GetString(payloadBytes);
MessageReceived?.Invoke(topic, payload);
return Task.CompletedTask;
}
private async Task OnDisconnected(MqttClientDisconnectedEventArgs args)
{
// 自动重连
if (!args.ClientWasConnected)
return;
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await ConnectAsync();
}
catch
{
// 重连失败,等待下次重试
}
}
}