MqttRpcProxyController.cs 5.23 KB
using HHECS.BllModel;
using HHECS.DAQShared.Dto;
using Microsoft.AspNetCore.Mvc;
using MQTTnet;
using MQTTnet.Extensions.Rpc;
using MQTTnet.Protocol;
using System.Text;
using System.Text.Json;

namespace HHECS.DAQServer.Controllers
{
    /// <summary>
    /// MqttRpc代理
    /// </summary>
    [Route("api/[controller]/[action]")]
    [ApiController]
    public class MqttRpcProxyController : ControllerBase
    {
        private readonly ILogger<MqttRpcProxyController> _logger;

        public MqttRpcProxyController(ILogger<MqttRpcProxyController> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// 读取数据
        /// </summary>
        /// <param name="clientId">采集端唯一标识</param>
        /// <param name="equipmentAddressDatas">地址数据</param>
        /// <param name="timeoutSeconds">超时时间(秒)</param>
        /// <returns></returns>
        [HttpPost("{clientId:guid}")]
        public async Task<BllResult<List<EquipmentAddressData>>> ReadAsync(Guid clientId, List<EquipmentAddressData> equipmentAddressDatas, int timeoutSeconds = 10)
        {
            try
            {
                var mqttFactory = new MqttClientFactory();
                using var client = mqttFactory.CreateMqttClient();
                var mqttClientOptions = GetMqttClientOptions(HttpContext.Request);
                var connectResult = await client.ConnectAsync(mqttClientOptions);
                if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
                {
                    return BllResultFactory.Error<List<EquipmentAddressData>>("Mqtt服务连接失败");
                }
                var methodName = $"{clientId}_Read";
                using var mqttRpcClient = mqttFactory.CreateMqttRpcClient(client);
                var payload = JsonSerializer.Serialize(equipmentAddressDatas);
                var requestResult = await mqttRpcClient.ExecuteAsync(TimeSpan.FromSeconds(timeoutSeconds), methodName, payload, MqttQualityOfServiceLevel.AtLeastOnce);
                var result = JsonSerializer.Deserialize<BllResult<List<EquipmentAddressData>>>(Encoding.Default.GetString(requestResult));
                return result;
            }
            catch (Exception ex)
            {
                _logger.LogError("An error occurred while reading data: {Message}", ex.Message);
                return BllResultFactory.Error<List<EquipmentAddressData>>(ex.Message);
            }
        }

        /// <summary>
        /// 写入数据
        /// </summary>
        /// <param name="clientId">采集端唯一标识</param>
        /// <param name="equipmentAddressDatas">地址数据</param>
        /// <param name="timeoutSeconds">超时时间(秒)</param>
        /// <returns></returns>
        [HttpPost("{clientId:guid}")]
        public async Task<BllResult> WriteAsync(Guid clientId, List<EquipmentAddressData> equipmentAddressDatas, int timeoutSeconds = 10)
        {
            try
            {
                var mqttFactory = new MqttClientFactory();
                using var client = mqttFactory.CreateMqttClient();
                var mqttClientOptions = GetMqttClientOptions(HttpContext.Request);
                var connectResult = await client.ConnectAsync(mqttClientOptions);
                if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
                {
                    return BllResultFactory.Error<List<EquipmentAddressData>>("Mqtt服务连接失败");
                }
                var methodName = $"{clientId}_Write";
                using var mqttRpcClient = mqttFactory.CreateMqttRpcClient(client);
                var payload = JsonSerializer.Serialize(equipmentAddressDatas);
                var requestResult = await mqttRpcClient.ExecuteAsync(TimeSpan.FromSeconds(timeoutSeconds), methodName, payload, MqttQualityOfServiceLevel.AtLeastOnce);
                var result = JsonSerializer.Deserialize<BllResult>(Encoding.Default.GetString(requestResult));
                return result;
            }
            catch (Exception ex)
            {
                _logger.LogError("An error occurred while writing data: {Message}", ex.Message);
                return BllResultFactory.Error(ex.Message);
            }
        }

        private static MqttClientOptions GetMqttClientOptions(HttpRequest httpRequest)
        {
            var uri = string.Empty;
            if (httpRequest.IsHttps)
            {
                uri = $"wss://{httpRequest.Host}/mqtt";
            }
            else
            {
                uri = $"ws://{httpRequest.Host}/mqtt";
            }
            var mqttClientOptions = new MqttClientOptionsBuilder()
                //.WithWebSocketServer(o => o.WithUri("wss://broker.emqx.io:8084/mqtt"))
                .WithWebSocketServer(o => o.WithUri(uri))
                .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
                .WithTlsOptions(o =>
                {
                    // The used public broker sometimes has invalid certificates. This sample accepts all
                    // certificates. This should not be used in live environments.
                    o.WithCertificateValidationHandler(_ => true);
                }).Build();
            return mqttClientOptions;
        }
    }
}