// MQTTHelper.cs
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using RECS_MQTT.Model;
using System;
using System.Data;
using System.Data.SqlClient;
using System.Text;
using System.Threading.Tasks;

namespace RECS_MQTT
{
    /// <summary>
    /// Factory helper that configures and starts the application's MQTT server.
    /// </summary>
    internal class MQTTHelper
    {
        /// <summary>
        /// Builds an MQTT server from the settings held in <c>AppSession</c>
        /// (port, bound IP address, user name and password), starts it and
        /// returns the running instance.
        /// </summary>
        /// <remarks>
        /// Every published message is written to the console and then rejected
        /// (<c>AcceptPublish = false</c>), so the server never forwards messages
        /// to other clients.
        /// </remarks>
        /// <returns>The MQTT server after <c>StartAsync</c> has completed.</returns>
        public static async Task<IMqttServer> GetMqttServer()
        {
            // Configure MQTT server.
            var optionsBuilder = new MqttServerOptionsBuilder()
                .WithConnectionBacklog(1000)
                .WithDefaultEndpointPort(AppSession.Port)
                .WithDefaultEndpointBoundIPAddress(AppSession.IP)
                // Client validation: both the user name and the password must
                // match the configured credentials; either mismatch yields the
                // same reason code.
                .WithConnectionValidator(c =>
                {
                    var credentialsMatch = c.Username == AppSession.UserName
                                           && c.Password == AppSession.Password;

                    c.ReasonCode = credentialsMatch
                        ? MqttConnectReasonCode.Success
                        : MqttConnectReasonCode.BadUserNameOrPassword;
                })
                .WithApplicationMessageInterceptor(context =>
                {
                    // No client ever subscribes in this deployment (Huaheng), so
                    // every publish is intercepted and rejected, with or without
                    // a payload.
                    context.AcceptPublish = false;

                    var payload = context.ApplicationMessage.Payload;
                    if (payload == null)
                    {
                        return;
                    }

                    var msg = Encoding.UTF8.GetString(payload);
                    Console.WriteLine($"{DateTime.Now}:收到消息,客户端:{context.ClientId},主题:{context.ApplicationMessage.Topic},消息:{msg}");

                    // NOTE(review): persisting the received message (topic, text,
                    // client id, creation time) as an MQTTMsgEntity row is currently
                    // disabled; the message is only logged. Re-introduce the database
                    // insert here if storage is required again.
                });

            var mqttServer = new MqttFactory().CreateMqttServer();
            await mqttServer.StartAsync(optionsBuilder.Build());
            return mqttServer;
        }
    }
}