MQTTHelper.cs
3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using RECS_MQTT.Model;
using System;
using System.Data;
using System.Data.SqlClient;
using System.Text;
using System.Threading.Tasks;
namespace RECS_MQTT
{
/// <summary>
/// Factory helper that configures and starts the application's MQTT server.
/// </summary>
class MQTTHelper
{
    /// <summary>
    /// Builds an MQTT server from the <c>AppSession</c> settings (port, bound IP address,
    /// user name and password), starts it, and returns the running instance.
    /// </summary>
    /// <returns>The MQTT server after <c>StartAsync</c> has completed.</returns>
    public static async Task<IMqttServer> GetMqttServer()
    {
        var serverOptions = new MqttServerOptionsBuilder()
            .WithConnectionBacklog(1000)
            .WithDefaultEndpointPort(AppSession.Port)
            .WithDefaultEndpointBoundIPAddress(AppSession.IP)
            // Client authentication: both the user name and the password must match
            // the configured values; either mismatch yields the same reason code.
            .WithConnectionValidator(client =>
            {
                var rejected = client.Username != AppSession.UserName
                    || client.Password != AppSession.Password;

                client.ReasonCode = rejected
                    ? MqttConnectReasonCode.BadUserNameOrPassword
                    : MqttConnectReasonCode.Success;
            })
            // Every published message passes through here before it would be dispatched.
            .WithApplicationMessageInterceptor(publish =>
            {
                var body = publish.ApplicationMessage.Payload;
                if (body != null)
                {
                    var text = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"{DateTime.Now}:收到消息,客户端:{publish.ClientId},主题:{publish.ApplicationMessage.Topic},消息:{text}");

                    try
                    {
                        // NOTE: persistence is currently switched off. The original code opened
                        //   using IDbConnection connection = new SqlConnection(AppSession.Constr);
                        // and called connection.Insert(...) with an entity carrying these same
                        // values. Until that is restored the entity is built but not stored.
                        var record = new MQTTMsgEntity()
                        {
                            Topic = publish.ApplicationMessage.Topic,
                            Msg = text,
                            Created = DateTime.Now,
                            ClientId = publish.ClientId
                        };
                    }
                    catch (Exception error)
                    {
                        Console.WriteLine($"缓存数据出现异常:{error.Message}-{error.InnerException?.Message}");
                    }
                }

                // For the Huaheng deployment no client ever subscribes, so every publish
                // (including ones without a payload) is rejected here.
                publish.AcceptPublish = false;
            });

        var factory = new MqttFactory();
        var server = factory.CreateMqttServer();
        var options = serverOptions.Build();

        await server.StartAsync(options);
        return server;
    }
}
}