// MQTTService.cs 5.59 KB
using Microsoft.Extensions.Hosting;
using MQTTnet.Protocol;
using MQTTnet;
using MQTTnet.Server;
using RECS_MQTT.Model;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Data;
using System.Data.SqlClient;
using Org.BouncyCastle.Asn1.X509.Qualified;
using MySqlX.XDevAPI.Relational;

namespace RECS_MQTT
{
    /// <summary>
    /// Hosted service that runs an embedded MQTT broker. Connecting clients are
    /// checked against the credentials in <c>AppSession</c>; every published message
    /// is written to the <c>mqttmsg</c> table and then rejected, so the broker never
    /// forwards it to subscribers.
    /// </summary>
    public class MQTTService : IHostedService
    {
        /// <summary>
        /// Inserts the received message and then trims the table so that only the two
        /// most recent rows (by <c>created</c>) remain for the same client id.
        /// All values are supplied as parameters; nothing from the client is spliced
        /// into the SQL text.
        /// </summary>
        private const string InsertAndTrimSql = @"
insert into mqttmsg (keys, topic, msg, created, clientId)
values (@keys, @topic, @msg, @created, @clientId);

declare @count int;

-- 1. drop the temp table if it is left over
if object_id('tempdb..#tempTabl') is not null drop table #tempTabl;

-- 2. (re)create the temp table with this client's rows
select * into #tempTabl from (
    select keys, clientId, created from mqttmsg where clientId = @clientId
) tt;

-- 3. count this client's rows
select @count = (select count(1) from #tempTabl);

-- 4. keep only the newest two rows
if (@count > 2)
begin
    delete from mqttmsg where clientId = @clientId and keys not in
    (
        select top 2 keys from #tempTabl order by created desc
    );
end";

        // Kept in a field (not a local) so StopAsync can shut the broker down.
        IMqttServer mqttServer = null;

        /// <summary>
        /// Builds the broker options (endpoint, credential check, message interceptor),
        /// creates the broker and starts it.
        /// </summary>
        /// <param name="cancellationToken">Host start-up token; not used by the broker start call.</param>
        /// <returns>The task returned by the broker's start call.</returns>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            var optionsBuilder = new MqttServerOptionsBuilder()
               .WithConnectionBacklog(1000)
               .WithDefaultEndpointPort(AppSession.Port)
               .WithDefaultEndpointBoundIPAddress(AppSession.IP)
               // Client validation: both user name and password must match the configured
               // values; either mismatch yields the same reason code.
               .WithConnectionValidator(c =>
               {
                   bool credentialsMatch = c.Username == AppSession.UserName
                                           && c.Password == AppSession.Password;

                   c.ReasonCode = credentialsMatch
                       ? MqttConnectReasonCode.Success
                       : MqttConnectReasonCode.BadUserNameOrPassword;
               })
               .WithApplicationMessageInterceptor(context =>
               {
                   var payload = context.ApplicationMessage.Payload;
                   if (payload == null)
                   {
                       context.AcceptPublish = false;
                       return;
                   }

                   var topic = context.ApplicationMessage.Topic;
                   var msg = Encoding.UTF8.GetString(payload);
                   Console.WriteLine($"{DateTime.Now.ToString()}:收到消息,客户端:{context.ClientId},主题:{topic},消息:{msg}");

                   try
                   {
                       SaveMessage(context.ClientId, topic, msg);
                   }
                   catch (Exception ex)
                   {
                       // Reached for failures outside the database round trip
                       // (for example an invalid connection string).
                       Console.WriteLine($"缓存数据出现异常:{ex.Message}-{ex.InnerException?.Message}");
                   }

                   // For Huaheng no client ever subscribes, so every publish is blocked here.
                   context.AcceptPublish = false;
               });

            mqttServer = new MqttFactory().CreateMqttServer();
            return mqttServer.StartAsync(optionsBuilder.Build());
        }

        /// <summary>
        /// Logs the shutdown and stops the broker if it was created.
        /// </summary>
        /// <param name="cancellationToken">Host shutdown token; not used by the broker stop call.</param>
        /// <returns>The broker's stop task, or a completed task when no broker exists.</returns>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            Console.WriteLine($"{DateTime.Now}:程序退出");
            return mqttServer?.StopAsync() ?? Task.CompletedTask;
        }

        /// <summary>
        /// Writes one received message to the database and trims older rows for the
        /// same client. Database errors are logged to the console and not rethrown.
        /// </summary>
        /// <param name="clientId">Id of the publishing client; null is stored as an empty string.</param>
        /// <param name="topic">Topic of the message; null is stored as an empty string.</param>
        /// <param name="msg">Payload decoded as UTF-8 text.</param>
        private static void SaveMessage(string clientId, string topic, string msg)
        {
            using (var connection = new SqlConnection(AppSession.Constr))
            {
                try
                {
                    connection.Open();

                    using (var cmd = new SqlCommand(InsertAndTrimSql, connection))
                    {
                        // The key is sent as text, as before, so it converts implicitly
                        // whether the column is character-based or uniqueidentifier.
                        cmd.Parameters.AddWithValue("@keys", Guid.NewGuid().ToString());
                        cmd.Parameters.AddWithValue("@topic", topic ?? string.Empty);
                        cmd.Parameters.AddWithValue("@msg", msg ?? string.Empty);
                        // NOTE(review): assumes mqttmsg.created is a date/time column
                        // (it is used in ORDER BY) — confirm against the schema.
                        cmd.Parameters.AddWithValue("@created", DateTime.Now);
                        cmd.Parameters.AddWithValue("@clientId", clientId ?? string.Empty);

                        int rst = cmd.ExecuteNonQuery();
                        Console.WriteLine(rst > 0 ? "执行成功!" : "执行失败!");
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("异常:" + ex.Message);
                }
            }
        }
    }
}