// EquipmentService.cs 12.1 KB
using HHECS.BllModel;
using HHECS.DAQShared.Models;
using Microsoft.Extensions.Caching.Distributed;
using System.Text.Json;
using System.Text;
using HHECS.DAQShared.Dto;
using HHECS.DAQServer.Dto.Equipment;
using HHECS.DAQServer.DataFlag;

namespace HHECS.DAQServer.Services
{
    public class EquipmentService
    {
        /// <summary>
        /// FreeSql entry point used to open the repositories queried by this service.
        /// </summary>
        private readonly IFreeSql<IOTCloundFlag> _iotCloundFreeSql;

        /// <summary>
        /// Shared in-memory store; this service writes to its equipment record queue and to its
        /// equipment/client status dictionaries.
        /// </summary>
        private readonly DataCacheService _dataCacheService;

        /// <summary>
        /// Helper used for timestamp conversion and for reading the maximum queue size.
        /// </summary>
        private readonly CommonService _commonService;

        /// <summary>
        /// Distributed cache holding the latest real-time record per equipment, keyed by equipment serial number.
        /// </summary>
        private readonly IDistributedCache _cache;

        /// <summary>
        /// Data earlier than this time is treated as invalid. The ORM framework imposes a limit here,
        /// so this value should be kept consistent with the entity class configuration.
        /// </summary>
        private readonly DateTime minStartTime;

        /// <summary>
        /// Creates the service from its injected dependencies.
        /// </summary>
        /// <param name="iotCloundFreeSql">FreeSql instance used for database access.</param>
        /// <param name="dataCacheService">Shared in-memory queues and status dictionaries.</param>
        /// <param name="commonService">Timestamp conversion and configuration helper.</param>
        /// <param name="cache">Distributed cache for real-time equipment data.</param>
        public EquipmentService(IFreeSql<IOTCloundFlag> iotCloundFreeSql, DataCacheService dataCacheService, CommonService commonService, IDistributedCache cache)
        {
            _iotCloundFreeSql = iotCloundFreeSql;
            _dataCacheService = dataCacheService;
            _commonService = commonService;
            _cache = cache;

            // Built from explicit components rather than DateTime.Parse("2024-6-11"): parsing uses the
            // current thread culture and calendar, so the same text can yield a different date
            // (or fail) depending on the server's regional settings.
            minStartTime = new DateTime(2024, 6, 11);
        }

        /// <summary>
        /// Pushes real-time equipment data: refreshes the per-equipment status dictionary and
        /// distributed cache with the newest record of each equipment, then queues the valid
        /// records so they can be stored in the database.
        /// </summary>
        /// <param name="data">Batch of equipment records; must contain at least one item.</param>
        /// <returns>
        /// A success result, or an error result when the batch is empty, contains an invalid
        /// version/timestamp, the record queue is full, or an exception occurs.
        /// </returns>
        public async Task<BllResult> SendEquipmentData(IEnumerable<EquipmentDataDto> data)
        {
            try
            {
                // Materialize once: the batch is validated, grouped and filtered below, and the
                // caller's sequence may be lazy or expensive to enumerate repeatedly.
                var items = data?.ToList();
                if (items == null || items.Count == 0)
                {
                    return BllResultFactory.Error("数据不能为空!");
                }

                if (items.Any(x => x.Version == 0))
                {
                    return BllResultFactory.Error("Vesion不能为“0”");
                }

                // Convert each timestamp exactly once; the result is reused for validation and filtering.
                var timedItems = items
                    .Select(x => (Item: x, LocalTime: _commonService.TimestampConvertToLocalDateTime(x.Timestamp)))
                    .ToList();
                if (timedItems.Any(x => x.LocalTime == default))
                {
                    return BllResultFactory.Error("时间戳“Timestamp”字段数据不正确,请参考 https://tool.lu/timestamp");
                }

                // Cache entry settings
                var options = new DistributedCacheEntryOptions().SetSlidingExpiration(TimeSpan.FromMinutes(10));

                // Cache the real-time data of each equipment
                foreach (var group in items.GroupBy(x => x.EquipmentSN))
                {
                    // Record with the newest timestamp for this equipment
                    var lastItem = group.OrderByDescending(x => x.Timestamp).First();

                    // Currently cached record, if any
                    var cacheDataBytes = await _cache.GetAsync(group.Key);

                    // Equipment status: always keep the newest record. This used to run only when a
                    // cache entry already existed, so the first report of an equipment (or the first
                    // one after the cache entry expired) never reached the status dictionary.
                    _dataCacheService.EquipmentStatusDictionary.AddOrUpdate(group.Key, lastItem,
                        (key, oldValue) => lastItem.Timestamp > oldValue.Timestamp ? lastItem : oldValue);

                    if (cacheDataBytes != null)
                    {
                        // The entry is written with SerializeToUtf8Bytes, so read it back as UTF-8
                        // directly instead of decoding it with the platform-dependent Encoding.Default.
                        var cacheData = JsonSerializer.Deserialize<EquipmentDataDto>(cacheDataBytes);

                        // Keep the cached record when it is at least as new as the incoming one.
                        // A cached JSON "null" deserializes to null and is simply overwritten.
                        if (cacheData != null && cacheData.Timestamp >= lastItem.Timestamp)
                        {
                            continue;
                        }
                    }

                    var encodedCurrentData = JsonSerializer.SerializeToUtf8Bytes(lastItem);
                    await _cache.SetAsync(group.Key, encodedCurrentData, options);
                }

                var maxCacheCount = _commonService.GetMaxCacheCount();

                // When the queue already exceeds the configured size, do not enqueue for now,
                // to lower the risk of losing data. The real-time cache above is still refreshed.
                var currentQueueCount = _dataCacheService.EquipmentDataRecordQueue.Count;
                if (currentQueueCount >= maxCacheCount)
                {
                    return BllResultFactory.Error($"数据队列缓存超过设定值({maxCacheCount}),待当前队列数据({currentQueueCount})处理完成后才能继续");
                }

                // Drop records older than the minimum supported time and enqueue the rest in
                // chronological order so they get stored in the database.
                var records = timedItems
                    .Where(x => x.LocalTime >= minStartTime)
                    .OrderBy(x => x.Item.Timestamp)
                    .Select(x => x.Item);
                foreach (var item in records)
                {
                    _dataCacheService.EquipmentDataRecordQueue.Enqueue(item);
                }

                return BllResultFactory.Success();
            }
            catch (Exception ex)
            {
                return BllResultFactory.Error(ex.Message);
            }
        }

        /// <summary>
        /// Pushes interval-based equipment data (V2). Each item covers the range from its
        /// TimestampStart to its TimestampEnd and is expanded into one record every 3 seconds,
        /// always ending with a record at the end time. The records are queued for storage and
        /// the last one updates the equipment status dictionary.
        /// </summary>
        /// <param name="data">Batch of interval records to expand and queue.</param>
        /// <returns>
        /// A success result, or an error result when the record queue is full or an exception occurs.
        /// </returns>
        public Task<BllResult> SendEquipmentDataV2(IEnumerable<EquipmentDataV2Dto> data)
        {
            // Interval, in seconds, between the records generated for one item.
            const int intervalSeconds = 3;

            // The work below is entirely synchronous, so the result is wrapped in a completed task
            // instead of keeping an artificial "await Task.Delay(1)" that only added latency.
            try
            {
                var maxCacheCount = _commonService.GetMaxCacheCount();
                var currentQueueCount = _dataCacheService.EquipmentDataRecordQueue.Count;
                if (currentQueueCount >= maxCacheCount)
                {
                    return Task.FromResult<BllResult>(BllResultFactory.Error($"数据队列缓存超过设定值[{maxCacheCount}],待当前队列数据[{currentQueueCount}]处理完成后才能继续"));
                }

                foreach (var item in data.OrderBy(x => x.EquipmentSN).ThenBy(x => x.TimestampStart))
                {
                    var startTime = _commonService.TimestampConvertToLocalDateTime(item.TimestampStart);
                    var endTime = _commonService.TimestampConvertToLocalDateTime(item.TimestampEnd);
                    // Skip items with an invalid time
                    if (startTime <= minStartTime || startTime == default || endTime == default)
                    {
                        continue;
                    }

                    // Split the range into records spaced by the interval. The cursor is clamped to
                    // the end time, and the loop stops right after the end-time record is queued.
                    // The previous condition (next <= end + interval) queued the end-time record
                    // twice whenever the range length was an exact multiple of the interval,
                    // including start == end.
                    var cursor = startTime;
                    long lastTimestamp;
                    while (true)
                    {
                        var currentTime = cursor < endTime ? cursor : endTime;
                        DateTimeOffset localTime = DateTime.SpecifyKind(currentTime, DateTimeKind.Local);
                        lastTimestamp = localTime.ToUnixTimeMilliseconds();
                        _dataCacheService.EquipmentDataRecordQueue.Enqueue(CreateRecord(item, lastTimestamp));

                        if (currentTime >= endTime)
                        {
                            break;
                        }
                        cursor = cursor.AddSeconds(intervalSeconds);
                    }

                    // The status dictionary receives its own instance rather than the queued one,
                    // so the two stores do not share a single mutable object.
                    var lastRecord = CreateRecord(item, lastTimestamp);
                    _dataCacheService.EquipmentStatusDictionary.AddOrUpdate(item.EquipmentSN, lastRecord,
                        (key, oldValue) => lastRecord.Timestamp > oldValue.Timestamp ? lastRecord : oldValue);
                }

                return Task.FromResult<BllResult>(BllResultFactory.Success());
            }
            catch (Exception ex)
            {
                return Task.FromResult<BllResult>(BllResultFactory.Error(ex.Message));
            }
        }

        /// <summary>
        /// Builds a single-point record that copies the identifying fields and payload of a V2 item
        /// and carries the given timestamp (Unix milliseconds).
        /// </summary>
        private static EquipmentDataDto CreateRecord(EquipmentDataV2Dto item, long timestamp)
        {
            return new EquipmentDataDto
            {
                Plmeid = item.Plmeid,
                EquipmentSN = item.EquipmentSN,
                Reported = item.Reported,
                Version = item.Version,
                Timestamp = timestamp,
            };
        }

        /// <summary>
        /// Updates the status of a client by recording the current time for it.
        /// </summary>
        /// <param name="clientId">Identifier of the client configuration whose status is refreshed.</param>
        /// <returns>
        /// A success result when the client exists and the time was recorded; an error result when
        /// the client is unknown or an exception occurs.
        /// </returns>
        public async Task<BllResult> UpdateClientStatus(Guid clientId)
        {
            try
            {
                // Taken before the database lookup, so the recorded time excludes the query latency.
                var reportedAt = DateTime.Now;

                using var repository = _iotCloundFreeSql.GetRepository<ClientConfig>();
                var clientExists = await repository.Where(x => x.Id == clientId).AnyAsync();
                if (clientExists)
                {
                    // Stored in the shared dictionary (latest value wins); per the original note the
                    // pending values are then updated together elsewhere.
                    _dataCacheService.ClientStatusDictionary.AddOrUpdate(clientId, reportedAt, (_, _) => reportedAt);
                    return BllResultFactory.Success();
                }

                return BllResultFactory.Error($"客户端标识:{clientId}不存在,请查验后再试!");
            }
            catch (Exception ex)
            {
                return BllResultFactory.Error(ex.Message);
            }
        }

        /// <summary>
        /// Stores a batch of client software logs. The client's project and factory names are
        /// resolved from its configuration, log times are truncated to whole seconds, and entries
        /// that already exist (same message and time) are skipped.
        /// </summary>
        /// <param name="softLog">Client identifier together with the log entries to store.</param>
        /// <returns>
        /// A success result, or an error result when the client is unknown, a log timestamp is
        /// invalid, or an exception occurs.
        /// </returns>
        public async Task<BllResult> SendClientLog(SoftLogDto softLog)
        {
            try
            {
                using var clientConfigRepository = _iotCloundFreeSql.GetRepository<ClientConfig>();
                using var sysSoftLogRepository = _iotCloundFreeSql.GetRepository<SysSoftLog>();
                using var baseProjectRepository = _iotCloundFreeSql.GetRepository<BaseProject>();
                using var baseFactoryRepository = _iotCloundFreeSql.GetRepository<BaseFactory>();
                var client = await clientConfigRepository.Where(x => x.Id == softLog.ClientId).FirstAsync();
                if (client == null)
                {
                    return BllResultFactory.Error($"未找到ClientId“{softLog.ClientId}”对应的客户端配置信息,请检查数据后重试!");
                }

                // Asynchronous lookups, so the request thread is not blocked on the database.
                var projectName = await baseProjectRepository.Where(x => x.ProjectCode == client.ProjectCode).FirstAsync(x => x.ProjectName);
                var factoryName = await baseFactoryRepository.Where(x => x.FactoryCode == client.FactoryCode).FirstAsync(x => x.FactoryName);
                var logs = softLog.Logs.Select(x => new SysSoftLog
                {
                    ClientKey = softLog.ClientId,
                    ProjectCode = client.ProjectCode,
                    ProjectName = projectName,
                    Types = client.Type,
                    Level = x.LogType,
                    Message = x.Msg,
                    Area = factoryName,
                    BackupField1 = x.BackupField1,
                    BackupField2 = x.BackupField2,
                    BackupField3 = x.BackupField3,
                    BackupField4 = x.BackupField4,
                    Remarks = x.Remarks,
                    ErrorTime = _commonService.TimestampConvertToLocalDateTime(x.Timestamp),
                    CreateTime = DateTime.Now,
                }).ToList();
                if (logs.Any(x => x.ErrorTime == default))
                {
                    return BllResultFactory.Error($"日志时间戳“Timestamp”字段数据不正确,请参考 https://tool.lu/timestamp");
                }

                foreach (var log in logs)
                {
                    // Truncate to whole seconds
                    log.ErrorTime = new DateTime(log.ErrorTime.Year, log.ErrorTime.Month, log.ErrorTime.Day, log.ErrorTime.Hour, log.ErrorTime.Minute, log.ErrorTime.Second);
                }

                // Filter out entries that already exist in the database.
                // NOTE(review): the match uses only message and time, not the client key — confirm
                // that identical logs from different clients are meant to be treated as duplicates.
                var newLogs = new List<SysSoftLog>(logs.Count);
                foreach (var log in logs)
                {
                    var alreadyStored = await sysSoftLogRepository
                        .Where(o => o.Message == log.Message && o.ErrorTime == log.ErrorTime)
                        .AnyAsync();
                    if (!alreadyStored)
                    {
                        newLogs.Add(log);
                    }
                }

                // Insert only the filtered entries. The filtered list used to be computed and then
                // ignored, so every entry (duplicates included) was inserted.
                if (newLogs.Count > 0)
                {
                    await sysSoftLogRepository.InsertAsync(newLogs);
                }
                return BllResultFactory.Success();
            }
            catch (Exception ex)
            {
                return BllResultFactory.Error(ex.Message);
            }
        }
    }
}