import asyncio
from datetime import datetime, timedelta, timezone
from typing import Optional

from litellm._logging import verbose_proxy_logger
from litellm.caching import RedisCache
from litellm.constants import (
    SPEND_LOG_CLEANUP_BATCH_SIZE,
    SPEND_LOG_CLEANUP_JOB_NAME,
    SPEND_LOG_RUN_LOOPS,
)
from litellm.litellm_core_utils.duration_parser import duration_in_seconds
from litellm.proxy.utils import PrismaClient


class SpendLogCleanup:
    """
    Handles cleaning up old spend logs based on maximum retention period.
    Deletes logs in batches to prevent timeouts.
    Uses PodLockManager to ensure only one pod runs cleanup in multi-pod deployments.
    """

    def __init__(self, general_settings=None, redis_cache: Optional[RedisCache] = None):
        """
        Set up batch size, settings source and the pod lock manager.

        Args:
            general_settings: Mapping that is read for the
                "maximum_spend_logs_retention_period" key. When falsy, the
                proxy server's module-level ``general_settings`` is used.
            redis_cache: Accepted for interface compatibility; it is not read
                here. The lock manager is taken from ``proxy_logging_obj``.
        """
        self.batch_size = SPEND_LOG_CLEANUP_BATCH_SIZE
        # Populated by _should_delete_spend_logs() once the setting is parsed.
        self.retention_seconds: Optional[int] = None
        # Imported inside the method rather than at module top
        # (presumably to avoid a circular import with proxy_server — TODO confirm).
        from litellm.proxy.proxy_server import general_settings as default_settings

        self.general_settings = general_settings or default_settings
        from litellm.proxy.proxy_server import proxy_logging_obj

        pod_lock_manager = proxy_logging_obj.db_spend_update_writer.pod_lock_manager
        self.pod_lock_manager = pod_lock_manager
        verbose_proxy_logger.info(
            f"SpendLogCleanup initialized with batch size: {self.batch_size}"
        )

    def _should_delete_spend_logs(self) -> bool:
        """
        Determines if logs should be deleted based on the max retention period in settings.

        Side effect: on success, stores the parsed retention period in
        ``self.retention_seconds``.

        Returns:
            True when "maximum_spend_logs_retention_period" is present and
            could be parsed into seconds; False when it is missing or invalid.
        """
        retention_setting = self.general_settings.get(
            "maximum_spend_logs_retention_period"
        )
        verbose_proxy_logger.info(f"Checking retention setting: {retention_setting}")

        if retention_setting is None:
            verbose_proxy_logger.info("No retention setting found")
            return False

        try:
            # The parser is given a string; ints are stringified first.
            if isinstance(retention_setting, int):
                retention_setting = str(retention_setting)
            self.retention_seconds = duration_in_seconds(retention_setting)
            verbose_proxy_logger.info(
                f"Retention period set to {self.retention_seconds} seconds"
            )
            return True
        except (ValueError, TypeError) as e:
            # TypeError is handled as well so that a setting of an unexpected
            # type (e.g. a float or list from config) is reported as invalid
            # instead of escaping from a method that promises a bool.
            verbose_proxy_logger.error(
                f"Invalid maximum_spend_logs_retention_period value: {retention_setting}, error: {str(e)}"
            )
            return False

    async def _delete_old_logs(
        self, prisma_client: PrismaClient, cutoff_date: datetime
    ) -> int:
        """
        Helper method to delete old logs in batches.

        Each iteration looks up at most ``self.batch_size`` rows whose
        ``startTime`` is earlier than ``cutoff_date`` and deletes them by
        ``request_id``. At most ``SPEND_LOG_RUN_LOOPS`` batches are processed
        per call; anything left over is picked up by a later run.

        Args:
            prisma_client: Client whose ``db.litellm_spendlogs`` table is cleaned.
            cutoff_date: Rows with ``startTime`` before this moment are deleted.

        Returns:
            The total number of logs deleted.
        """
        total_deleted = 0
        run_count = 0
        while True:
            # ">=" so that exactly SPEND_LOG_RUN_LOOPS batches run, not one extra.
            if run_count >= SPEND_LOG_RUN_LOOPS:
                verbose_proxy_logger.info(
                    f"Max logs deleted - {total_deleted}, rest of the logs will be deleted in next run"
                )
                break
            # Step 1: Find logs to delete
            logs_to_delete = await prisma_client.db.litellm_spendlogs.find_many(
                where={"startTime": {"lt": cutoff_date}},
                take=self.batch_size,
            )
            verbose_proxy_logger.info(f"Found {len(logs_to_delete)} logs in this batch")

            if not logs_to_delete:
                verbose_proxy_logger.info(
                    f"No more logs to delete. Total deleted: {total_deleted}"
                )
                break

            request_ids = [log.request_id for log in logs_to_delete]

            # Step 2: Delete them in one go
            await prisma_client.db.litellm_spendlogs.delete_many(
                where={"request_id": {"in": request_ids}}
            )

            total_deleted += len(logs_to_delete)
            run_count += 1

            # Add a small sleep to prevent overwhelming the database
            await asyncio.sleep(0.1)

        return total_deleted

    async def cleanup_old_spend_logs(self, prisma_client: PrismaClient) -> None:
        """
        Main cleanup function. Deletes old spend logs in batches.
        If pod_lock_manager is available, ensures only one pod runs cleanup.
        If no pod_lock_manager, runs cleanup without distributed locking.

        Errors are logged and swallowed; this method does not raise for
        failures inside the cleanup itself. The lock is released only when
        this invocation actually acquired it.

        Args:
            prisma_client: Client used to query and delete spend log rows.
        """
        # Tracks whether THIS call obtained the lock, so the finally clause
        # never releases (or claims to release) a lock it does not hold.
        lock_acquired = False
        try:
            verbose_proxy_logger.info(f"Cleanup job triggered at {datetime.now()}")

            if not self._should_delete_spend_logs():
                verbose_proxy_logger.info(
                    "Skipping cleanup — invalid or missing retention setting."
                )
                return

            if self.retention_seconds is None:
                verbose_proxy_logger.error(
                    "Retention seconds is None, cannot proceed with cleanup"
                )
                return

            # If we have a pod lock manager, try to acquire the lock
            if self.pod_lock_manager and self.pod_lock_manager.redis_cache:
                lock_acquired = await self.pod_lock_manager.acquire_lock(
                    cronjob_id=SPEND_LOG_CLEANUP_JOB_NAME,
                )
                verbose_proxy_logger.info(
                    f"Lock acquisition attempt: {'successful' if lock_acquired else 'failed'}  at {datetime.now()}"
                )

                if not lock_acquired:
                    verbose_proxy_logger.info("Another pod is already running cleanup")
                    return

            # Cutoff is computed in UTC; rows older than this are removed.
            cutoff_date = datetime.now(timezone.utc) - timedelta(
                seconds=float(self.retention_seconds)
            )
            verbose_proxy_logger.info(
                f"Deleting logs older than {cutoff_date.isoformat()}"
            )

            # Perform the actual deletion
            total_deleted = await self._delete_old_logs(prisma_client, cutoff_date)
            verbose_proxy_logger.info(f"Deleted {total_deleted} logs")

        except Exception as e:
            verbose_proxy_logger.error(f"Error during cleanup: {str(e)}")
            return  # Return after error handling
        finally:
            # Release only a lock that this invocation acquired.
            if (
                lock_acquired
                and self.pod_lock_manager
                and self.pod_lock_manager.redis_cache
            ):
                await self.pod_lock_manager.release_lock(
                    cronjob_id=SPEND_LOG_CLEANUP_JOB_NAME
                )
                verbose_proxy_logger.info("Released cleanup lock")