// TaskLeaseService.cs
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Rcs.Application.Services;
using Rcs.Domain.Settings;
using StackExchange.Redis;

namespace Rcs.Infrastructure.Services;

/// <summary>
/// Redis-backed, task-level lease service. A lease is a Redis string key holding a
/// random per-acquisition token; the key is created only if absent and carries a TTL
/// that a background loop keeps extending until the lease is disposed.
/// </summary>
public sealed class TaskLeaseService : ITaskLeaseService
{
    // Lua: delete the key only when it still holds the caller's token (compare-and-delete).
    private const string ReleaseScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    // Lua: extend the key's TTL (milliseconds) only when it still holds the caller's token.
    private const string RenewScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end";

    private readonly IConnectionMultiplexer _redis;
    private readonly IOptionsMonitor<AppSettings> _settingsMonitor;
    private readonly ILogger<TaskLeaseService> _logger;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="redis">Redis connection used for all lease operations.</param>
    /// <param name="settingsMonitor">Source of the current dispatch and key-prefix settings.</param>
    /// <param name="logger">Logger shared with the leases this service hands out.</param>
    public TaskLeaseService(
        IConnectionMultiplexer redis,
        IOptionsMonitor<AppSettings> settingsMonitor,
        ILogger<TaskLeaseService> logger)
    {
        _redis = redis;
        _settingsMonitor = settingsMonitor;
        _logger = logger;
    }

    /// <summary>
    /// Attempts to take the lease for <paramref name="taskId"/> by setting the lease key
    /// only if it does not exist yet.
    /// </summary>
    /// <param name="taskId">Task whose lease is requested.</param>
    /// <param name="robotId">Not used by this implementation.</param>
    /// <param name="purpose">Free-form label stored on the returned lease.</param>
    /// <param name="ct">Checked once before Redis is contacted.</param>
    /// <returns>
    /// A result with <c>Success = true</c> and a self-renewing lease, or
    /// <c>Success = false</c> with message <c>task_lease_conflict</c> when the key already exists.
    /// </returns>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was already cancelled.</exception>
    public async Task<ITaskLeaseAcquireResult> TryAcquireAsync(
        Guid taskId,
        Guid? robotId,
        string purpose,
        CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();
        var settings = _settingsMonitor.CurrentValue.TaskDispatch;

        // Enforce sane lower bounds regardless of configuration.
        var ttl = TimeSpan.FromSeconds(Math.Max(2, settings.LeaseTtlSeconds));
        var renewInterval = TimeSpan.FromSeconds(Math.Max(1, settings.LeaseRenewIntervalSeconds));

        // A renew interval that is not shorter than the TTL would let the key expire
        // before the first renewal ever runs; fall back to half the TTL in that case.
        if (renewInterval >= ttl)
        {
            renewInterval = TimeSpan.FromTicks(ttl.Ticks / 2);
        }

        var key = BuildLeaseKey(taskId);
        var token = Guid.NewGuid().ToString("N");
        var success = await _redis.GetDatabase().StringSetAsync(
            key,
            token,
            ttl,
            When.NotExists);
        if (!success)
        {
            return new TaskLeaseAcquireResult
            {
                Success = false,
                Message = "task_lease_conflict"
            };
        }

        return new TaskLeaseAcquireResult
        {
            Success = true,
            Message = "ok",
            Lease = new RedisTaskLease(
                _redis,
                key,
                token,
                ttl,
                renewInterval,
                taskId,
                purpose,
                _logger)
        };
    }

    /// <summary>
    /// Reports whether the lease key for <paramref name="taskId"/> currently exists,
    /// regardless of which holder owns it.
    /// </summary>
    /// <param name="taskId">Task whose lease key is probed.</param>
    /// <param name="ct">Checked once before Redis is contacted.</param>
    /// <returns><c>true</c> when the key exists; otherwise <c>false</c>.</returns>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was already cancelled.</exception>
    public async Task<bool> HasActiveLeaseAsync(Guid taskId, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();
        var key = BuildLeaseKey(taskId);
        return await _redis.GetDatabase().KeyExistsAsync(key);
    }

    /// <summary>
    /// Builds the lease key as "&lt;configured prefix&gt;:&lt;taskId&gt;" from the current settings.
    /// </summary>
    private string BuildLeaseKey(Guid taskId)
    {
        return $"{_settingsMonitor.CurrentValue.Redis.KeyPrefixes.TaskLeasePrefix}:{taskId}";
    }

    /// <summary>
    /// Lease handle that renews its own TTL on a background task and releases the key
    /// (only if it still holds this lease's token) when disposed.
    /// </summary>
    private sealed class RedisTaskLease : ITaskLease
    {
        private readonly IConnectionMultiplexer _redis;
        private readonly string _key;
        private readonly string _token;
        private readonly TimeSpan _ttl;
        private readonly TimeSpan _renewInterval;
        private readonly ILogger _logger;
        private readonly CancellationTokenSource _cts = new();
        private readonly Task _renewTask;
        private int _disposed; // 0 = live, 1 = disposed; flipped atomically in DisposeAsync.

        /// <summary>
        /// Stores the lease parameters and starts the background renewal loop.
        /// </summary>
        public RedisTaskLease(
            IConnectionMultiplexer redis,
            string key,
            string token,
            TimeSpan ttl,
            TimeSpan renewInterval,
            Guid taskId,
            string purpose,
            ILogger logger)
        {
            _redis = redis;
            _key = key;
            _token = token;
            _ttl = ttl;
            _renewInterval = renewInterval;
            _logger = logger;
            TaskId = taskId;
            Purpose = purpose;
            LeaseKey = key;

            // Must be the last statement: the loop reads the fields assigned above.
            _renewTask = Task.Run(RenewLoopAsync);
        }

        /// <summary>Task this lease belongs to.</summary>
        public Guid TaskId { get; }

        /// <summary>Redis key backing this lease.</summary>
        public string LeaseKey { get; }

        /// <summary>Label supplied by the caller at acquisition time.</summary>
        public string Purpose { get; }

        /// <summary>
        /// Stops the renewal loop, waits for it to finish, releases the Redis key and
        /// frees the cancellation source. Safe to call more than once; never throws for
        /// renewal or release failures (they are logged instead).
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            _cts.Cancel();
            try
            {
                await _renewTask.ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Expected when the loop is interrupted by cancellation.
            }
            catch (Exception ex)
            {
                _logger.LogDebug(ex, "[任务租约] 续租循环退出异常: TaskId={TaskId}", TaskId);
            }

            try
            {
                await ReleaseAsync().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "[任务租约] 释放失败: TaskId={TaskId}, Key={LeaseKey}", TaskId, LeaseKey);
            }
            finally
            {
                // The loop has already exited, so nothing observes the token any more.
                _cts.Dispose();
            }
        }

        /// <summary>
        /// Extends the lease every renew interval until cancelled or until Redis reports
        /// that the key no longer holds this lease's token.
        /// </summary>
        private async Task RenewLoopAsync()
        {
            var stopToken = _cts.Token;
            while (!stopToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(_renewInterval, stopToken).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    // Disposal requested while waiting: normal shutdown path.
                    return;
                }

                if (stopToken.IsCancellationRequested)
                {
                    break;
                }

                bool renewed;
                try
                {
                    renewed = await RenewAsync().ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    // A transient Redis failure must not silently end renewal while the
                    // key may still be alive; log it and try again on the next tick.
                    _logger.LogWarning(
                        ex,
                        "[任务租约] 续租异常,将在下个周期重试: TaskId={TaskId}, Key={LeaseKey}",
                        TaskId,
                        LeaseKey);
                    continue;
                }

                if (!renewed)
                {
                    _logger.LogWarning(
                        "[任务租约] 续租失败,提前终止租约: TaskId={TaskId}, Key={LeaseKey}",
                        TaskId,
                        LeaseKey);
                    _cts.Cancel();
                    return;
                }
            }
        }

        /// <summary>
        /// Runs the renew script; returns <c>true</c> only when the script reports a
        /// positive result (key still holds this token and its TTL was reset).
        /// </summary>
        private async Task<bool> RenewAsync()
        {
            var result = await _redis.GetDatabase().ScriptEvaluateAsync(
                RenewScript,
                new RedisKey[] { _key },
                new RedisValue[] { _token, (long)_ttl.TotalMilliseconds }).ConfigureAwait(false);
            return !result.IsNull && (long)result > 0;
        }

        /// <summary>
        /// Runs the release script, which deletes the key only if it still holds this token.
        /// </summary>
        private async Task ReleaseAsync()
        {
            await _redis.GetDatabase().ScriptEvaluateAsync(
                ReleaseScript,
                new RedisKey[] { _key },
                new RedisValue[] { _token }).ConfigureAwait(false);
        }
    }
}