package com.example.redis.lock;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

/**
 * Redis-backed distributed locks built on Jedis.
 *
 * <p>Demonstrates three things: a basic {@code SET NX EX} lock ({@link SimpleLock}),
 * a reentrant lock whose state lives in a Redis hash (this class), and a watchdog
 * that keeps extending the TTL of the reentrant lock while it is held.
 *
 * <p>Every access this class makes to the supplied {@link Jedis} connection is
 * wrapped in {@code synchronized (jedis)}, because the watchdog thread and the
 * calling thread share that one connection. Code outside this class that uses the
 * same connection while a watchdog is active should synchronize on it as well.
 */
public class RedisDistributedLock {

    /** Pause between acquisition attempts while waiting for a contended lock. */
    private static final long RETRY_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(50);

    private final Jedis jedis;
    private final String lockKey;
    /** Random token identifying this lock instance as the owner. */
    private final String lockValue;
    private final int defaultTtlSeconds;
    /** Hold count last reported by Redis for the calling thread; 0 when not held. */
    private final ThreadLocal<Integer> reentrantCount = ThreadLocal.withInitial(() -> 0);

    // Watchdog state; the three fields below are only read or written while
    // holding watchdogMonitor.
    private final Object watchdogMonitor = new Object();
    private volatile boolean watchdogRunning = false;
    private ScheduledExecutorService watchdogExecutor;
    private ScheduledFuture<?> watchdogTask;

    // Lua scripts

    /** Deletes a string-valued lock key, but only if it still holds the caller's token. */
    private static final String RELEASE_SCRIPT = 
        "if redis.call('GET', KEYS[1]) == ARGV[1] then " +
        "    return redis.call('DEL', KEYS[1]) " +
        "else " +
        "    return 0 " +
        "end";

    /** Resets the TTL of a string-valued lock key, but only if the caller still owns it. */
    private static final String RENEW_SCRIPT = 
        "if redis.call('GET', KEYS[1]) == ARGV[1] then " +
        "    return redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
        "else " +
        "    return 0 " +
        "end";

    /**
     * Resets the TTL of the hash-valued reentrant lock if its 'value' field still
     * matches the caller's token. The reentrant lock is a hash, so GET (as used by
     * RENEW_SCRIPT) would fail with WRONGTYPE; HGET is required here.
     */
    private static final String REENTRANT_RENEW_SCRIPT = 
        "if redis.call('HGET', KEYS[1], 'value') == ARGV[1] then " +
        "    return redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
        "else " +
        "    return 0 " +
        "end";

    /**
     * Acquires or re-enters the hash-valued lock. Returns the resulting hold count
     * (1 for a fresh acquisition) or 0 when another owner holds the lock.
     */
    private static final String REENTRANT_ACQUIRE_SCRIPT = 
        "local key = KEYS[1] " +
        "local value = ARGV[1] " +
        "local ttl = tonumber(ARGV[2]) " +
        "local threadId = ARGV[3] " +
        "" +
        "local current = redis.call('HMGET', key, 'value', 'threadId', 'count') " +
        "" +
        "if current[1] == false then " +
        "    redis.call('HMSET', key, 'value', value, 'threadId', threadId, 'count', 1) " +
        "    redis.call('EXPIRE', key, ttl) " +
        "    return 1 " +
        "elseif current[1] == value and current[2] == threadId then " +
        "    local newCount = tonumber(current[3]) + 1 " +
        "    redis.call('HMSET', key, 'count', newCount) " +
        "    redis.call('EXPIRE', key, ttl) " +
        "    return newCount " +
        "else " +
        "    return 0 " +
        "end";

    /**
     * Releases one hold on the hash-valued lock. Returns the remaining hold count,
     * 0 when the lock was fully released or did not exist, and -1 when the lock is
     * held by a different owner.
     */
    private static final String REENTRANT_RELEASE_SCRIPT = 
        "local key = KEYS[1] " +
        "local value = ARGV[1] " +
        "local threadId = ARGV[2] " +
        "" +
        "local current = redis.call('HMGET', key, 'value', 'threadId', 'count') " +
        "" +
        "if current[1] == false then " +
        "    return 0 " +
        "elseif current[1] == value and current[2] == threadId then " +
        "    local count = tonumber(current[3]) " +
        "    if count > 1 then " +
        "        redis.call('HMSET', key, 'count', count - 1) " +
        "        return count - 1 " +
        "    else " +
        "        redis.call('DEL', key) " +
        "        return 0 " +
        "    end " +
        "else " +
        "    return -1 " +
        "end";

    /**
     * Creates a reentrant lock handle; nothing is written to Redis until
     * {@link #tryLock()} is called.
     *
     * @param jedis             connection used for every Redis call made by this lock
     * @param lockKey           Redis key that represents the lock
     * @param defaultTtlSeconds TTL applied on acquisition and on every watchdog renewal
     * @throws NullPointerException     if {@code jedis} or {@code lockKey} is null
     * @throws IllegalArgumentException if {@code defaultTtlSeconds} is not positive
     */
    public RedisDistributedLock(Jedis jedis, String lockKey, int defaultTtlSeconds) {
        this.jedis = java.util.Objects.requireNonNull(jedis, "jedis");
        this.lockKey = java.util.Objects.requireNonNull(lockKey, "lockKey");
        this.lockValue = UUID.randomUUID().toString();
        // EXPIRE with a non-positive TTL deletes the key, which would leave the
        // caller believing it holds a lock that no longer exists.
        this.defaultTtlSeconds = requirePositiveTtl(defaultTtlSeconds);
    }

    /** Returns {@code ttlSeconds} unchanged, or throws if it is not strictly positive. */
    private static int requirePositiveTtl(int ttlSeconds) {
        if (ttlSeconds <= 0) {
            throw new IllegalArgumentException("TTL must be positive, got " + ttlSeconds + " seconds");
        }
        return ttlSeconds;
    }

    /** Returns true when a script result is the integer 1. */
    private static boolean isOne(Object scriptResult) {
        return scriptResult instanceof Long && (Long) scriptResult == 1L;
    }

    /** Identifier stored in the lock hash for the calling thread. */
    private static String currentThreadId() {
        return String.valueOf(Thread.currentThread().getId());
    }

    /**
     * Simple, non-reentrant lock based on {@code SET key value NX EX ttl}.
     */
    public static class SimpleLock {
        private final Jedis jedis;
        private final String lockKey;
        private final String lockValue;
        private final int ttlSeconds;

        /**
         * @param jedis      connection used for every Redis call made by this lock
         * @param lockKey    Redis key that represents the lock
         * @param ttlSeconds expiry applied when the lock is acquired
         * @throws NullPointerException     if {@code jedis} or {@code lockKey} is null
         * @throws IllegalArgumentException if {@code ttlSeconds} is not positive
         */
        public SimpleLock(Jedis jedis, String lockKey, int ttlSeconds) {
            this.jedis = java.util.Objects.requireNonNull(jedis, "jedis");
            this.lockKey = java.util.Objects.requireNonNull(lockKey, "lockKey");
            this.lockValue = UUID.randomUUID().toString();
            this.ttlSeconds = requirePositiveTtl(ttlSeconds);
        }

        /**
         * Makes a single attempt to acquire the lock.
         *
         * @return true if the key was set, i.e. the lock was acquired
         */
        public boolean tryLock() {
            SetParams params = new SetParams()
                .nx()             // set only when the key does not exist
                .ex(ttlSeconds);  // expiry in seconds

            String reply;
            // The connection may be shared with a RedisDistributedLock whose
            // watchdog thread uses it concurrently.
            synchronized (jedis) {
                reply = jedis.set(lockKey, lockValue, params);
            }
            return "OK".equals(reply);
        }

        /**
         * Tries to acquire the lock, retrying until the timeout elapses. At least
         * one attempt is always made, even for a zero or negative timeout.
         *
         * @param timeout maximum time to wait
         * @param unit    unit of {@code timeout}
         * @return true if the lock was acquired within the timeout
         * @throws InterruptedException if interrupted while sleeping between attempts
         */
        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            long timeoutNanos = Math.max(0L, unit.toNanos(timeout));
            long startNanos = System.nanoTime();

            while (true) {
                if (tryLock()) {
                    return true;
                }

                // Elapsed-time arithmetic on nanoTime is monotonic and cannot
                // overflow the way "now + timeout" on the wall clock can.
                long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
                if (remainingNanos <= 0) {
                    return false;
                }

                // Short pause before retrying, never sleeping past the deadline.
                TimeUnit.NANOSECONDS.sleep(Math.min(remainingNanos, RETRY_INTERVAL_NANOS));
            }
        }

        /**
         * Releases the lock if this instance still owns it; otherwise prints a
         * warning to standard error.
         */
        public void unlock() {
            Object result;
            synchronized (jedis) {
                result = jedis.eval(
                    RELEASE_SCRIPT, 
                    Collections.singletonList(lockKey), 
                    Collections.singletonList(lockValue)
                );
            }

            if (!isOne(result)) {
                System.err.println("Warning: Failed to release lock or lock was not owned by current thread");
            }
        }

        /**
         * Resets the lock's TTL if this instance still owns it.
         *
         * @param newTtlSeconds new TTL in seconds
         * @return true if the TTL was updated
         * @throws IllegalArgumentException if {@code newTtlSeconds} is not positive
         */
        public boolean renewLock(int newTtlSeconds) {
            // A non-positive EXPIRE would delete the key while reporting success.
            requirePositiveTtl(newTtlSeconds);

            Object result;
            synchronized (jedis) {
                result = jedis.eval(
                    RENEW_SCRIPT,
                    Collections.singletonList(lockKey),
                    java.util.Arrays.asList(lockValue, String.valueOf(newTtlSeconds))
                );
            }

            return isOne(result);
        }
    }

    /**
     * Reentrant lock: makes a single, non-waiting attempt to acquire or re-enter
     * the lock.
     *
     * @return true if the calling thread now holds the lock
     */
    public boolean tryLock() {
        return tryLock(0, TimeUnit.MILLISECONDS);
    }

    /**
     * Tries to acquire or re-enter the lock, retrying until the timeout elapses.
     * At least one attempt is always made. On success the watchdog is started if
     * it is not already running.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is restored
     * and {@code false} is returned.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return true if the calling thread now holds the lock
     */
    public boolean tryLock(long timeout, TimeUnit unit) {
        long timeoutNanos = Math.max(0L, unit.toNanos(timeout));
        long startNanos = System.nanoTime();
        java.util.List<String> scriptArgs = java.util.Arrays.asList(
            lockValue, String.valueOf(defaultTtlSeconds), currentThreadId());

        while (true) {
            Object result;
            synchronized (jedis) {
                result = jedis.eval(
                    REENTRANT_ACQUIRE_SCRIPT,
                    Collections.singletonList(lockKey),
                    scriptArgs
                );
            }

            if (result instanceof Long) {
                long count = (Long) result;
                if (count > 0) {
                    reentrantCount.set((int) count);

                    // No-op when already running; on re-entry this also restarts
                    // a watchdog that stopped itself after a failed renewal.
                    startWatchdog();

                    return true;
                }
            }

            long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
            if (remainingNanos <= 0) {
                return false;
            }

            try {
                TimeUnit.NANOSECONDS.sleep(Math.min(remainingNanos, RETRY_INTERVAL_NANOS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /**
     * Releases one hold on the lock. When the last hold is released (or the lock
     * no longer exists in Redis) the watchdog is stopped.
     *
     * @throws IllegalStateException if the lock is currently held by a different owner
     */
    public void unlock() {
        Object result;
        synchronized (jedis) {
            result = jedis.eval(
                REENTRANT_RELEASE_SCRIPT,
                Collections.singletonList(lockKey),
                java.util.Arrays.asList(lockValue, currentThreadId())
            );
        }

        if (!(result instanceof Long)) {
            return;
        }

        long remainingCount = (Long) result;
        if (remainingCount < 0) {
            throw new IllegalStateException("Cannot release lock that is not owned by current thread");
        }

        if (remainingCount == 0) {
            // Fully released: drop the thread-local entry instead of pinning a 0.
            reentrantCount.remove();
            stopWatchdog();
        } else {
            // Still held by this thread with a lower hold count.
            reentrantCount.set((int) remainingCount);
        }
    }

    /**
     * Watchdog: starts the background task that periodically renews the lock's
     * TTL. Does nothing if a watchdog is already running.
     */
    private void startWatchdog() {
        synchronized (watchdogMonitor) {
            if (watchdogRunning) {
                return;
            }

            final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread thread = new Thread(r, "Redis-Lock-Watchdog");
                thread.setDaemon(true);
                return thread;
            });

            // Renew at one third of the TTL so the key cannot expire between
            // renewals (a fixed 10 s period is too slow for TTLs of 10 s or less).
            long periodMillis = Math.max(1L, defaultTtlSeconds * 1000L / 3);

            watchdogExecutor = executor;
            watchdogTask = executor.scheduleAtFixedRate(
                () -> renewLockInternal(executor),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS
            );
            watchdogRunning = true;
        }

        System.out.println("Watchdog started for lock: " + lockKey);
    }

    /** Stops the running watchdog, waiting briefly for an in-flight renewal to finish. */
    private void stopWatchdog() {
        stopWatchdog(null, true);
    }

    /**
     * Stops the watchdog.
     *
     * @param expected         when non-null, the stop only takes effect if this is
     *                         still the active executor, so a late task from an old
     *                         watchdog cannot stop a newer one
     * @param awaitTermination whether to wait up to one second for the executor to
     *                         finish; must be false when called from the watchdog
     *                         thread, which would otherwise wait on itself
     */
    private void stopWatchdog(ScheduledExecutorService expected, boolean awaitTermination) {
        ScheduledExecutorService executor;
        synchronized (watchdogMonitor) {
            if (!watchdogRunning || (expected != null && expected != watchdogExecutor)) {
                return;
            }
            watchdogRunning = false;

            if (watchdogTask != null) {
                watchdogTask.cancel(false);
                watchdogTask = null;
            }

            executor = watchdogExecutor;
            watchdogExecutor = null;
        }

        // Shut down outside the monitor so the watchdog thread is never blocked
        // on it while this thread waits for termination.
        if (executor != null) {
            executor.shutdown();
            if (awaitTermination) {
                try {
                    if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
        }

        System.out.println("Watchdog stopped for lock: " + lockKey);
    }

    /**
     * Watchdog task body: renews the lock's TTL and stops the watchdog if the lock
     * is no longer owned by this instance or the renewal fails.
     *
     * @param owner the executor this task was scheduled on
     */
    private void renewLockInternal(ScheduledExecutorService owner) {
        synchronized (watchdogMonitor) {
            if (!watchdogRunning || watchdogExecutor != owner) {
                return;
            }
        }

        try {
            Object result;
            synchronized (jedis) {
                result = jedis.eval(
                    REENTRANT_RENEW_SCRIPT,
                    Collections.singletonList(lockKey),
                    java.util.Arrays.asList(lockValue, String.valueOf(defaultTtlSeconds))
                );
            }

            if (isOne(result)) {
                System.out.println("Lock renewed successfully: " + lockKey);
            } else {
                System.err.println("Failed to renew lock, stopping watchdog: " + lockKey);
                stopWatchdog(owner, false);
            }
        } catch (Exception e) {
            // Boundary of the scheduled task: an escaping exception would silently
            // cancel all future runs, so report it and stop explicitly.
            System.err.println("Error renewing lock: " + e.getMessage());
            stopWatchdog(owner, false);
        }
    }

    /**
     * Reads the current state of the lock from Redis.
     *
     * @return a snapshot of the lock; if Redis cannot be queried the snapshot is
     *         {@code exists=false, owned=false, ttl=-1}
     */
    public LockInfo getLockInfo() {
        try {
            java.util.List<String> owner;
            Long ttl;
            synchronized (jedis) {
                // The lock is stored as a hash, so its fields must be read with
                // HMGET; a plain GET fails with WRONGTYPE.
                owner = jedis.hmget(lockKey, "value", "threadId");
                ttl = jedis.ttl(lockKey);
            }

            String ownerValue = owner.get(0);
            String ownerThreadId = owner.get(1);

            return new LockInfo(
                ownerValue != null,
                lockValue.equals(ownerValue) && currentThreadId().equals(ownerThreadId),
                ttl != null ? ttl.intValue() : -1
            );
        } catch (Exception e) {
            // Best-effort diagnostic call: report the failure but do not propagate it.
            System.err.println("Error reading lock info: " + e.getMessage());
            return new LockInfo(false, false, -1);
        }
    }

    /**
     * Immutable snapshot of a lock's state as returned by {@link #getLockInfo()}.
     */
    public static class LockInfo {
        /** Whether the lock key currently has an owner recorded in Redis. */
        public final boolean exists;
        /** Whether the recorded owner is the querying lock instance and thread. */
        public final boolean ownedByCurrentThread;
        /** TTL in seconds as reported by Redis, or -1 if it could not be read. */
        public final int ttl;

        public LockInfo(boolean exists, boolean ownedByCurrentThread, int ttl) {
            this.exists = exists;
            this.ownedByCurrentThread = ownedByCurrentThread;
            this.ttl = ttl;
        }

        @Override
        public String toString() {
            return String.format("LockInfo{exists=%s, owned=%s, ttl=%d}", 
                exists, ownedByCurrentThread, ttl);
        }
    }

    /**
     * Demo entry point: runs the simple-lock, reentrant-lock and watchdog demos
     * against a Redis server on localhost:6379.
     */
    public static void main(String[] args) throws InterruptedException {
        // try-with-resources closes the connection even if a demo throws.
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            System.out.println("=== Redis分布式锁Java实现演示 ===");

            // Simple lock demo
            testSimpleLock(jedis);

            // Reentrant lock demo
            testReentrantLock(jedis);

            // Watchdog demo
            testWatchdogMechanism(jedis);
        }
    }

    /** Acquires a {@link SimpleLock}, renews it once and releases it. */
    private static void testSimpleLock(Jedis jedis) throws InterruptedException {
        System.out.println("\n=== 简单锁测试 ===");

        SimpleLock lock = new SimpleLock(jedis, "simple_test_lock", 30);

        if (!lock.tryLock()) {
            System.out.println("✗ 获取简单锁失败");
            return;
        }

        System.out.println("✓ 获取简单锁成功");
        try {
            // Simulate business processing
            Thread.sleep(1000);

            // Renewal check
            if (lock.renewLock(60)) {
                System.out.println("✓ 锁续期成功");
            }
        } finally {
            // Release even if the sleep above is interrupted.
            lock.unlock();
        }
        System.out.println("✓ 释放简单锁成功");
    }

    /** Exercises reentrancy by acquiring the same lock at three recursion depths. */
    private static void testReentrantLock(Jedis jedis) throws InterruptedException {
        System.out.println("\n=== 可重入锁测试 ===");

        RedisDistributedLock lock = new RedisDistributedLock(jedis, "reentrant_test_lock", 30);

        // Recursive calls re-enter the lock on the same thread
        testReentrantRecursive(lock, 1, 3);
    }

    /** Acquires the lock, recurses until {@code maxDepth}, then releases on the way back. */
    private static void testReentrantRecursive(RedisDistributedLock lock, int depth, int maxDepth) {
        System.out.println("  递归深度 " + depth + ": 尝试获取锁");

        if (!lock.tryLock()) {
            System.out.println("  ✗ 深度 " + depth + ": 获取锁失败");
            return;
        }

        try {
            LockInfo info = lock.getLockInfo();
            System.out.println("  ✓ 深度 " + depth + ": 获取锁成功, " + info);

            if (depth < maxDepth) {
                testReentrantRecursive(lock, depth + 1, maxDepth);
            } else {
                System.out.println("  到达最大深度，开始返回");
            }
        } finally {
            lock.unlock();
        }
        System.out.println("  ✓ 深度 " + depth + ": 释放锁成功");
    }

    /** Holds a 15-second lock for about 24 seconds so the watchdog has to renew it. */
    private static void testWatchdogMechanism(Jedis jedis) throws InterruptedException {
        System.out.println("\n=== 看门狗机制测试 ===");

        RedisDistributedLock lock = new RedisDistributedLock(jedis, "watchdog_test_lock", 15);

        if (!lock.tryLock()) {
            System.out.println("✗ 获取看门狗锁失败");
            return;
        }

        System.out.println("✓ 获取看门狗锁成功");
        try {
            // Simulate long-running business processing
            for (int i = 0; i < 3; i++) {
                Thread.sleep(8000);
                LockInfo info = lock.getLockInfo();
                System.out.println("业务处理中... " + info);
            }

            System.out.println("业务处理完成");
        } finally {
            // Release (and stop the watchdog) even if interrupted.
            lock.unlock();
        }
        System.out.println("✓ 看门狗锁释放成功");
    }
}
