MQTTService.cs
5.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
using Microsoft.Extensions.Hosting;
using MQTTnet.Protocol;
using MQTTnet;
using MQTTnet.Server;
using RECS_MQTT.Model;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Data;
using System.Data.SqlClient;
using Org.BouncyCastle.Asn1.X509.Qualified;
using MySqlX.XDevAPI.Relational;
namespace RECS_MQTT
{
/// <summary>
/// Hosted service that runs an embedded MQTT server.
/// Clients are authenticated against the credentials in <c>AppSession</c>; every
/// published message is written to the <c>mqttmsg</c> table (only the two newest
/// rows per client are kept) and is then rejected so it is never forwarded.
/// </summary>
public class MQTTService : IHostedService
{
    /// <summary>
    /// Parameterized T-SQL batch that inserts one message and then trims the
    /// sender's history down to its two most recent rows.
    /// </summary>
    private const string StoreMessageSql = @"
        insert into mqttmsg (keys, topic, msg, created, clientId)
        values (@keys, @topic, @msg, @created, @clientId);

        declare @count int;
        -- 1. drop the temp table if a previous batch left one behind
        if object_id('tempdb..#tempTabl') is not null drop table #tempTabl;
        -- 2. snapshot this client's rows into the temp table
        select * into #tempTabl from (
            select keys, clientId, created from mqttmsg where clientId = @clientId
        ) tt;
        -- 3. count them
        select @count = (select count(1) from #tempTabl);
        -- 4. keep only the last two rows
        if (@count > 2)
        begin
            delete from mqttmsg where clientId = @clientId and keys not in
            (
                select top 2 keys from #tempTabl order by created desc
            );
        end";

    /// <summary>The running server instance; null until <see cref="StartAsync"/> has been called.</summary>
    private IMqttServer mqttServer = null;

    /// <summary>
    /// Builds the server options (endpoint, connection validator, message
    /// interceptor), creates the MQTT server and starts it.
    /// </summary>
    /// <param name="cancellationToken">Host start-up token (not used by the server start call).</param>
    /// <returns>The task returned by the server's start operation.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        var optionsBuilder = new MqttServerOptionsBuilder()
            .WithConnectionBacklog(1000)
            .WithDefaultEndpointPort(AppSession.Port)
            .WithDefaultEndpointBoundIPAddress(AppSession.IP)
            // Client authentication: both user name and password must match.
            .WithConnectionValidator(c =>
            {
                bool credentialsOk = c.Username == AppSession.UserName
                                     && c.Password == AppSession.Password;
                c.ReasonCode = credentialsOk
                    ? MqttConnectReasonCode.Success
                    : MqttConnectReasonCode.BadUserNameOrPassword;
            })
            .WithApplicationMessageInterceptor(context =>
            {
                var payload = context.ApplicationMessage.Payload;
                if (payload == null)
                {
                    context.AcceptPublish = false;
                    return;
                }

                var msg = Encoding.UTF8.GetString(payload);
                Console.WriteLine($"{DateTime.Now.ToString()}:收到消息,客户端:{context.ClientId},主题:{context.ApplicationMessage.Topic},消息:{msg}");

                SaveMessage(context.ClientId, context.ApplicationMessage.Topic, msg);

                // For this deployment (Huaheng) no client ever subscribes,
                // so every publish is blocked here instead of being forwarded.
                context.AcceptPublish = false;
            });

        // Assign the field (not a shadowing local) so StopAsync can shut the server down.
        mqttServer = new MqttFactory().CreateMqttServer();
        return mqttServer.StartAsync(optionsBuilder.Build());
    }

    /// <summary>
    /// Logs the shutdown and stops the MQTT server if it was started.
    /// </summary>
    /// <param name="cancellationToken">Host shutdown token (not used).</param>
    /// <returns>The server's stop task, or a completed task when no server exists.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        Console.WriteLine($"{DateTime.Now}:程序退出");
        return mqttServer?.StopAsync() ?? Task.CompletedTask;
    }

    /// <summary>
    /// Persists one received message and prunes older rows of the same client.
    /// Failures are logged to the console and never propagate, so a database
    /// problem cannot break the MQTT interceptor.
    /// </summary>
    /// <param name="clientId">Id of the publishing MQTT client (untrusted input).</param>
    /// <param name="topic">Topic the message was published to (untrusted input).</param>
    /// <param name="msg">UTF-8 decoded payload (untrusted input).</param>
    private static void SaveMessage(string clientId, string topic, string msg)
    {
        try
        {
            using (var connection = new SqlConnection(AppSession.Constr))
            {
                try
                {
                    using (var cmd = new SqlCommand(StoreMessageSql, connection))
                    {
                        // All client-controlled values travel as parameters, never as SQL text.
                        cmd.Parameters.Add("@keys", SqlDbType.NVarChar, 36).Value = Guid.NewGuid().ToString();
                        cmd.Parameters.Add("@topic", SqlDbType.NVarChar, -1).Value = topic ?? string.Empty;
                        cmd.Parameters.Add("@msg", SqlDbType.NVarChar, -1).Value = msg ?? string.Empty;
                        // NOTE(review): assumes mqttmsg.created is a date/time column
                        // (it is used in ORDER BY) - confirm against the schema.
                        cmd.Parameters.Add("@created", SqlDbType.DateTime).Value = DateTime.Now;
                        cmd.Parameters.Add("@clientId", SqlDbType.NVarChar, -1).Value = clientId ?? string.Empty;

                        connection.Open();
                        int affectedRows = cmd.ExecuteNonQuery();
                        Console.WriteLine(affectedRows > 0 ? "执行成功!" : "执行失败!");
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("异常:" + ex.Message);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"缓存数据出现异常:{ex.Message}-{ex.InnerException?.Message}");
        }
    }
}
}