IOTCloudBackgroundService.cs 20.2 KB
using HHECS.DAQShared.Models;
using HHECS.DAQWebClient.Model;
using HHECS.DAQWebClient.Models;
using System.Diagnostics;

namespace HHECS.DAQWebClient.Services
{
    public class IOTCloudBackgroundService : BackgroundService
    {
        private readonly IFreeSql _freeSql;
        private readonly CommonService _commonService;
        private readonly IOTCloudService _cloudService;

        public IOTCloudBackgroundService(IFreeSql freeSql, CommonService commonService, IOTCloudService cloudService)
        {
            _freeSql = freeSql;
            _commonService = commonService;
            _cloudService = cloudService;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //缓存时间,分钟
            const int cacheMinutesTime = 3;
            var _lastAExecution = DateTime.MinValue;
            var _lastBExecution = DateTime.MinValue;
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(1000, stoppingToken);
                    //更新客户端状态
                    if ((DateTime.Now - _lastAExecution) > TimeSpan.FromMinutes(1))
                    {
                        _lastAExecution = DateTime.Now;
                        var clientIdValue = _freeSql.Queryable<LocalConfig>().Where(x => x.Code == ConfigType.ClientId.ToString()).First(x => x.Value);
                        _ = Guid.TryParse(clientIdValue, out var clientId);
                        if (clientId == Guid.Empty)
                        {
                            _commonService.PrintLog($"推送客户端状态数据失败:当前客户端未配置有效ClientId", LogLevel.Warning);
                        }
                        else
                        {
                            var updateStatusResult = await _cloudService.UpdateClientStatusAsync(clientId);
                            if (!updateStatusResult.Success)
                            {
                                _commonService.PrintLog($"推送客户端状态数据失败:{updateStatusResult.Msg}", LogLevel.Error);
                            }
                        }
                    }

                    var commitCountValue = _freeSql.Queryable<LocalConfig>().Where(x => x.Code == ConfigType.CommitCount.ToString()).First(x => x.Value);
                    _ = int.TryParse(commitCountValue, out var _commitCount);
                    if (_commitCount == 0)
                    {
                        //未配置,默认100条
                        _commitCount = 100;
                    }

                    var haveRecord = _freeSql.Queryable<EquipmentDataQueue>().Any();
                    if (!_commonService.EquipmentDataQueues.IsEmpty)
                    {
                        var temps = new List<EquipmentDataQueue>();
                        var commitFailureRecords = new List<EquipmentDataQueue>();
                        //启用数据压缩
                        if (_commonService.DataCompression)
                        {
                            while (!_commonService.EquipmentDataQueues.IsEmpty)
                            {
                                var result = _commonService.EquipmentDataQueues.TryDequeue(out var item);
                                if (item != null)
                                {
                                    temps.Add(item);
                                }
                            }
                        }
                        //未启用数据压缩
                        else
                        {
                            for (int i = 0; i < _commitCount; i++)
                            {
                                var result = _commonService.EquipmentDataQueues.TryDequeue(out var item);
                                if (!result) break;//队列已清空
                                if (item != null)
                                {
                                    temps.Add(item);
                                }
                            }
                        }

                        //自动上传启用,且数据库无未上传的数据,则直接推送
                        if (_commonService.AutoCommit && !_commonService.DataCompression && !haveRecord)
                        {
                            var tasks = new List<Task<List<EquipmentDataQueue>>>();
                            foreach (var item in temps.GroupBy(x => x.EquipmentCode))
                            {
                                tasks.Add(Task.Run(async () =>
                                {
                                    Stopwatch stopwatch = Stopwatch.StartNew();
                                    var records = item.OrderBy(x => x.SourceTimestamp).ToList();
                                    var result = await _cloudService.SendEquipmentDataAsync(records);
                                    if (!result.Success)
                                    {
                                        _commonService.PrintLog($"推送设备[{item.Key}]数据失败,{result.Msg},耗时:{stopwatch.ElapsedMilliseconds}ms", LogLevel.Error);
                                        return records;
                                    }
                                    _commonService.PrintLog($"成功推送{records.Count}条设备[{item.Key}]数据,耗时:{stopwatch.ElapsedMilliseconds}ms");
                                    return new List<EquipmentDataQueue>();
                                }));
                            }
                            Task.WaitAll(tasks.ToArray());
                            commitFailureRecords = tasks.SelectMany(x => x.Result).ToList();
                        }
                        //自动上传关闭或数据库存在未上传的记录,则需要存入数据库
                        else
                        {
                            commitFailureRecords.AddRange(temps);
                        }

                        //将上传失败的数据存入数据库
                        if (commitFailureRecords.Count > 0)
                        {
                            //节流模式
                            if (_commonService.DataCompression)
                            {
                                var dictionaryUpdateTemps = new Dictionary<string, EquipmentDataQueue>();
                                var dictionaryAddTemps = new Dictionary<string, List<EquipmentDataQueue>>();
                                foreach (var item in commitFailureRecords.GroupBy(x => x.EquipmentCode))
                                {
                                    var lastRecord = _freeSql.Queryable<EquipmentDataQueue>().Where(x => x.EquipmentCode == item.Key).OrderByDescending(x => x.SourceTimestamp).First();
                                    var addTemps = new List<EquipmentDataQueue>();
                                    dictionaryAddTemps[item.Key] = addTemps;//传递集合引用
                                    foreach (var record in item.OrderBy(x => x.SourceTimestamp).ToList())
                                    {
                                        var lastTemp = lastRecord;
                                        if (addTemps.Count > 0)
                                        {
                                            lastTemp = addTemps.OrderByDescending(x => x.SourceTimestamp).First();
                                        }

                                        if (lastTemp != null)
                                        {
                                            var currentRecordTime = DateTimeOffset.FromUnixTimeMilliseconds(record.SourceTimestamp).LocalDateTime;
                                            var lastRecordTime = (lastTemp.Updated ?? lastTemp.Created!).Value;
                                            if (lastTemp.Reported == record.Reported && currentRecordTime.Date == lastRecordTime.Date
                                            && (currentRecordTime - lastRecordTime).TotalSeconds <= 5)
                                            {
                                                lastTemp.Updated = currentRecordTime;

                                                //是数据库里面的数据,有变化需要更新
                                                if (lastTemp.Id != Guid.Empty && lastTemp.Id == lastRecord.Id)
                                                {
                                                    dictionaryUpdateTemps[item.Key] = lastTemp;
                                                }
                                                continue;
                                            }
                                        }
                                        //其他情况,新增记录
                                        addTemps.Add(record);
                                    }
                                }

                                var allUpdateTemps = dictionaryUpdateTemps.Select(x => x.Value).ToList();
                                if (allUpdateTemps.Count > 0)
                                {
                                    _freeSql.Update<EquipmentDataQueue>().SetSource(allUpdateTemps).UpdateColumns(x => x.Updated).ExecuteAffrows();
                                    _commonService.PrintLog($"更新[{allUpdateTemps.Count}]条设备记录");
                                }

                                var allAddTemps = dictionaryAddTemps.SelectMany(x => x.Value).ToList();
                                if (allAddTemps.Count > 0)
                                {
                                    _freeSql.Insert(allAddTemps).ExecuteAffrows();
                                    _commonService.PrintLog($"新增{allAddTemps.Count}条设备记录");
                                }
                            }
                            //实时存储
                            else
                            {
                                _freeSql.Insert(commitFailureRecords).ExecuteAffrows();
                                _commonService.PrintLog($"新增{commitFailureRecords.Count}条数据记录");
                            }
                            //更新状态
                            haveRecord = _freeSql.Queryable<EquipmentDataQueue>().Any();
                        }
                    }

                    var timeSpan = TimeSpan.FromSeconds(5);//默认5秒一次
                    var equipmentTotal = _freeSql.Queryable<Equipment>().Where(x => !x.Disable).Count();
                    var recordsTotal = _freeSql.Queryable<EquipmentDataQueue>().Count();
                    if (recordsTotal >= equipmentTotal * _commitCount)
                    {
                        //数据较多时,1秒一次
                        timeSpan = TimeSpan.FromSeconds(1);
                    }

                    //推送数据
                    if (_commonService.AutoCommit && (DateTime.Now - _lastBExecution) >= timeSpan && haveRecord)
                    {
                        _lastBExecution = DateTime.Now;
                        var equipmentCodes = _freeSql.Queryable<EquipmentDataQueue>().Distinct().ToList(x => x.EquipmentCode);
                        var commitSuccessRecordIds = new List<Guid>();

                        //是否存在更新时间为空的记录,最后一条记录不算
                        var haveUpdateIsNull = equipmentCodes.Where(equipmentCode =>
                        {
                            //第一条更新时间为空的记录
                            var firstTemp = _freeSql.Queryable<EquipmentDataQueue>().Where(x => x.EquipmentCode == equipmentCode && x.Updated == null).OrderBy(x => x.SourceTimestamp).First(x => x.Id);
                            if (firstTemp == default)
                            {
                                //无记录
                                return false;
                            }
                            //最后一条记录
                            var lastTemp = _freeSql.Queryable<EquipmentDataQueue>().Where(x => x.EquipmentCode == equipmentCode).OrderByDescending(x => x.SourceTimestamp).First(x => x.Id);
                            //第一条与最后一条是同一条,则排除
                            if (firstTemp == lastTemp)
                            {
                                return false;
                            }
                            //存在多条
                            return true;
                        }).Any();

                        //节流模式
                        if (_commonService.DataCompression && !haveUpdateIsNull)
                        {
                            //超过设定时间的数据
                            var records = _freeSql.Queryable<EquipmentDataQueue>().Where(x => DateTime.Now.AddMinutes(-(cacheMinutesTime + 1)) >= x.Created && x.Updated != null).OrderBy(x => x.SourceTimestamp).Take(_commitCount).ToList();
                            if (records.Count == 0)
                            {
                                continue;
                            }
                            var stopwatch = Stopwatch.StartNew();
                            var result = await _cloudService.SendEquipmentDataV2Async(records);
                            var currentEquipmentCodes = records.Select(x => x.EquipmentCode).Distinct().ToList();
                            if (!result.Success)
                            {
                                _commonService.PrintLog($"推送设备[{string.Join(',', currentEquipmentCodes)}]数据失败,{result.Msg},耗时:{stopwatch.ElapsedMilliseconds}ms", LogLevel.Error);
                                continue;
                            }
                            _commonService.PrintLog($"成功推送{records.Count}条设备[{string.Join(',', currentEquipmentCodes)}]数据,耗时:{stopwatch.ElapsedMilliseconds}ms");
                            commitSuccessRecordIds = records.Select(x => x.Id).ToList();
                            _freeSql.Delete<EquipmentDataQueue>().Where(x => commitSuccessRecordIds.Contains(x.Id)).ExecuteAffrows();
                            continue;
                        }

                        //推送成功的数据集合
                        var tasks = new List<Task<List<EquipmentDataQueue>>>();
                        foreach (var equipmentCode in equipmentCodes)
                        {
                            tasks.Add(Task.Run(async () =>
                            {
                                Stopwatch stopwatch = Stopwatch.StartNew();
                                var successRecords = new List<EquipmentDataQueue>();
                                var records = _freeSql.Queryable<EquipmentDataQueue>().Where(x => x.EquipmentCode == equipmentCode && !x.IsCommit).OrderBy(x => x.SourceTimestamp).Take(_commitCount).ToList();
                                if (records.Count == 0)
                                {
                                    return successRecords;
                                }
                                var temp1s = new List<List<EquipmentDataQueue>>();
                                var temp2s = new List<EquipmentDataQueue>();
                                foreach (var record in records)
                                {
                                    //是最后一条记录,且启用了节流模式
                                    if (_commonService.DataCompression && record == records.Last())
                                    {
                                        //创建时间距离现在的时间未超过设定时间,则不推送
                                        if ((DateTime.Now - record.Created!.Value).TotalMinutes < cacheMinutesTime)
                                        {
                                            break;
                                        }

                                        //队列还有缓存的数据,且持续时间未达到设定值,则不推送
                                        if (!_commonService.EquipmentDataQueues.IsEmpty && record.Updated != null
                                        && (record.Updated - record.Created).Value.TotalMinutes < cacheMinutesTime)
                                        {
                                            break;
                                        }
                                    }

                                    if (temp2s.Count == 0)
                                    {
                                        temp2s.Add(record);
                                        continue;
                                    }

                                    var lastItem = temp2s.Last();
                                    var lastType = lastItem.Updated > lastItem.Created;
                                    var currentType = record.Updated > record.Created;
                                    if (lastType == currentType)
                                    {
                                        temp2s.Add(record);
                                    }
                                    else
                                    {
                                        temp1s.Add(temp2s);
                                        temp2s = new List<EquipmentDataQueue>
                                            {
                                                    record
                                            };
                                    }
                                }
                                if (temp2s.Count > 0)
                                {
                                    temp1s.Add(temp2s);
                                    temp2s = new List<EquipmentDataQueue>();
                                }

                                foreach (var item in temp1s)
                                {
                                    //某段时间的数据
                                    if (item.All(x => x.Updated > x.Created))
                                    {
                                        var result = await _cloudService.SendEquipmentDataV2Async(item);
                                        if (!result.Success)
                                        {
                                            _commonService.PrintLog($"推送设备[{equipmentCode}]数据失败,{result.Msg},耗时:{stopwatch.ElapsedMilliseconds}ms", LogLevel.Error);
                                            break;//推送失败,后续的数据也暂停推送
                                        }
                                        _commonService.PrintLog($"成功推送{records.Count}条设备[{equipmentCode}]数据,耗时:{stopwatch.ElapsedMilliseconds}ms");
                                        successRecords.AddRange(item);
                                        continue;
                                    }

                                    //一秒一条记录的数据
                                    var result2 = await _cloudService.SendEquipmentDataAsync(item);
                                    if (!result2.Success)
                                    {
                                        _commonService.PrintLog($"推送设备[{equipmentCode}]数据失败,{result2.Msg},耗时:{stopwatch.ElapsedMilliseconds}ms", LogLevel.Error);
                                        break;//推送失败,后续的数据也暂停推送
                                    }
                                    _commonService.PrintLog($"成功推送1条设备[{equipmentCode}]数据,耗时:{stopwatch.ElapsedMilliseconds}ms");
                                    successRecords.AddRange(item);
                                }
                                return successRecords;
                            }));
                        }
                        Task.WaitAll(tasks.ToArray());
                        commitSuccessRecordIds = tasks.SelectMany(x => x.Result).Select(x => x.Id).ToList();
                        _freeSql.Delete<EquipmentDataQueue>().Where(x => commitSuccessRecordIds.Contains(x.Id)).ExecuteAffrows();
                    }
                }
                catch (Exception ex)
                {
                    _commonService.PrintLog($"数据上传线程异常:{ex.Message}", LogLevel.Debug);
                }
            }
        }
    }
}