"""
This file contains the transformation logic for the Gemini realtime API.
"""

import json
import os
import uuid
from typing import Any, Dict, List, Optional, Union, cast

from litellm import verbose_logger
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.llms.base_llm.realtime.transformation import BaseRealtimeConfig
from litellm.llms.vertex_ai.gemini.vertex_and_google_ai_studio_gemini import (
    VertexGeminiConfig,
)
from litellm.responses.litellm_completion_transformation.transformation import (
    LiteLLMCompletionResponsesConfig,
)
from litellm.types.llms.gemini import (
    AutomaticActivityDetection,
    BidiGenerateContentRealtimeInput,
    BidiGenerateContentRealtimeInputConfig,
    BidiGenerateContentServerContent,
    BidiGenerateContentServerMessage,
    BidiGenerateContentSetup,
)
from litellm.types.llms.openai import (
    OpenAIRealtimeContentPartDone,
    OpenAIRealtimeConversationItemCreated,
    OpenAIRealtimeDoneEvent,
    OpenAIRealtimeEvents,
    OpenAIRealtimeEventTypes,
    OpenAIRealtimeOutputItemDone,
    OpenAIRealtimeResponseAudioDone,
    OpenAIRealtimeResponseContentPartAdded,
    OpenAIRealtimeResponseDelta,
    OpenAIRealtimeResponseDoneObject,
    OpenAIRealtimeResponseTextDone,
    OpenAIRealtimeStreamResponseBaseObject,
    OpenAIRealtimeStreamResponseOutputItemAdded,
    OpenAIRealtimeStreamSession,
    OpenAIRealtimeStreamSessionEvents,
    OpenAIRealtimeTurnDetection,
)
from litellm.types.llms.vertex_ai import (
    GeminiResponseModalities,
    HttpxBlobType,
    HttpxContentType,
)
from litellm.types.realtime import (
    ALL_DELTA_TYPES,
    RealtimeModalityResponseTransformOutput,
    RealtimeResponseTransformInput,
    RealtimeResponseTypedDict,
)
from litellm.utils import get_empty_usage

from ..common_utils import encode_unserializable_types

MAP_GEMINI_FIELD_TO_OPENAI_EVENT: Dict[str, OpenAIRealtimeEventTypes] = {
    "setupComplete": OpenAIRealtimeEventTypes.SESSION_CREATED,
    "serverContent.generationComplete": OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE,
    "serverContent.turnComplete": OpenAIRealtimeEventTypes.RESPONSE_DONE,
    "serverContent.interrupted": OpenAIRealtimeEventTypes.RESPONSE_DONE,
}


class GeminiRealtimeConfig(BaseRealtimeConfig):
    def validate_environment(
        self, headers: dict, model: str, api_key: Optional[str] = None
    ) -> dict:
        return headers

    def get_complete_url(
        self, api_base: Optional[str], model: str, api_key: Optional[str] = None
    ) -> str:
        """
        Example output:
        "BACKEND_WS_URL = "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent"";
        """
        if api_base is None:
            api_base = "wss://generativelanguage.googleapis.com"
        if api_key is None:
            api_key = os.environ.get("GEMINI_API_KEY")
        if api_key is None:
            raise ValueError("api_key is required for Gemini API calls")
        api_base = api_base.replace("https://", "wss://")
        api_base = api_base.replace("http://", "ws://")
        return f"{api_base}/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent?key={api_key}"

    def map_model_turn_event(
        self, model_turn: HttpxContentType
    ) -> OpenAIRealtimeEventTypes:
        """
        Map the model turn event to the OpenAI realtime events.

        Returns either:
        - response.text.delta - model_turn: {"parts": [{"text": "..."}]}
        - response.audio.delta - model_turn: {"parts": [{"inlineData": {"mimeType": "audio/pcm", "data": "..."}}]}

        Assumes parts is a single element list.
        """
        if "parts" in model_turn:
            parts = model_turn["parts"]
            if len(parts) != 1:
                verbose_logger.warning(
                    f"Realtime: Expected 1 part, got {len(parts)} for Gemini model turn event."
                )
            part = parts[0]
            if "text" in part:
                return OpenAIRealtimeEventTypes.RESPONSE_TEXT_DELTA
            elif "inlineData" in part:
                return OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DELTA
            else:
                raise ValueError(f"Unexpected part type: {part}")
        raise ValueError(f"Unexpected model turn event, no 'parts' key: {model_turn}")

    def map_generation_complete_event(
        self, delta_type: Optional[ALL_DELTA_TYPES]
    ) -> OpenAIRealtimeEventTypes:
        if delta_type == "text":
            return OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE
        elif delta_type == "audio":
            return OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DONE
        else:
            raise ValueError(f"Unexpected delta type: {delta_type}")

    def get_audio_mime_type(self, input_audio_format: str = "pcm16"):
        mime_types = {
            "pcm16": "audio/pcm",
            "g711_ulaw": "audio/pcmu",
            "g711_alaw": "audio/pcma",
        }

        return mime_types.get(input_audio_format, "application/octet-stream")

    def map_automatic_turn_detection(
        self, value: OpenAIRealtimeTurnDetection
    ) -> AutomaticActivityDetection:
        automatic_activity_dection = AutomaticActivityDetection()
        if "create_response" in value and isinstance(value["create_response"], bool):
            automatic_activity_dection["disabled"] = not value["create_response"]
        else:
            automatic_activity_dection["disabled"] = True
        if "prefix_padding_ms" in value and isinstance(value["prefix_padding_ms"], int):
            automatic_activity_dection["prefixPaddingMs"] = value["prefix_padding_ms"]
        if "silence_duration_ms" in value and isinstance(
            value["silence_duration_ms"], int
        ):
            automatic_activity_dection["silenceDurationMs"] = value[
                "silence_duration_ms"
            ]
        return automatic_activity_dection

    def get_supported_openai_params(self, model: str) -> List[str]:
        return [
            "instructions",
            "temperature",
            "max_response_output_tokens",
            "modalities",
            "tools",
            "input_audio_transcription",
            "turn_detection",
        ]

    def map_openai_params(
        self, optional_params: dict, non_default_params: dict
    ) -> dict:
        if "generationConfig" not in optional_params:
            optional_params["generationConfig"] = {}
        for key, value in non_default_params.items():
            if key == "instructions":
                optional_params["systemInstruction"] = HttpxContentType(
                    role="user", parts=[{"text": value}]
                )
            elif key == "temperature":
                optional_params["generationConfig"]["temperature"] = value
            elif key == "max_response_output_tokens":
                optional_params["generationConfig"]["maxOutputTokens"] = value
            elif key == "modalities":
                optional_params["generationConfig"]["responseModalities"] = [
                    modality.upper() for modality in cast(List[str], value)
                ]
            elif key == "tools":
                from litellm.llms.vertex_ai.gemini.vertex_and_google_ai_studio_gemini import (
                    VertexGeminiConfig,
                )

                vertex_gemini_config = VertexGeminiConfig()
                vertex_gemini_config._map_function(value)
                optional_params["generationConfig"][
                    "tools"
                ] = vertex_gemini_config._map_function(value)
            elif key == "input_audio_transcription" and value is not None:
                optional_params["inputAudioTranscription"] = {}
            elif key == "turn_detection":
                value_typed = cast(OpenAIRealtimeTurnDetection, value)
                transformed_audio_activity_config = self.map_automatic_turn_detection(
                    value_typed
                )
                if (
                    len(transformed_audio_activity_config) > 0
                ):  # if the config is not empty, add it to the optional params
                    optional_params[
                        "realtimeInputConfig"
                    ] = BidiGenerateContentRealtimeInputConfig(
                        automaticActivityDetection=transformed_audio_activity_config
                    )
        if len(optional_params["generationConfig"]) == 0:
            optional_params.pop("generationConfig")
        return optional_params

    def transform_realtime_request(
        self,
        message: str,
        model: str,
        session_configuration_request: Optional[str] = None,
    ) -> List[str]:
        realtime_input_dict: BidiGenerateContentRealtimeInput = {}
        try:
            json_message = json.loads(message)
        except json.JSONDecodeError:
            if isinstance(message, bytes):
                message_str = message.decode("utf-8", errors="replace")
            else:
                message_str = str(message)
            raise ValueError(f"Invalid JSON message: {message_str}")

        ## HANDLE SESSION UPDATE ##
        messages: List[str] = []
        if "type" in json_message and json_message["type"] == "session.update":
            client_session_configuration_request = self.map_openai_params(
                optional_params={}, non_default_params=json_message["session"]
            )
            client_session_configuration_request["model"] = f"models/{model}"

            messages.append(
                json.dumps(
                    {
                        "setup": client_session_configuration_request,
                    }
                )
            )
        # elif session_configuration_request is None:
        #     default_session_configuration_request = self.session_configuration_request(model)
        #     messages.append(default_session_configuration_request)

        ## HANDLE INPUT AUDIO BUFFER ##
        if (
            "type" in json_message
            and json_message["type"] == "input_audio_buffer.append"
        ):
            realtime_input_dict["audio"] = HttpxBlobType(
                mimeType=self.get_audio_mime_type(), data=json_message["audio"]
            )
        else:
            realtime_input_dict["text"] = message

        if len(realtime_input_dict) != 1:
            raise ValueError(
                f"Only one argument can be set, got {len(realtime_input_dict)}:"
                f" {list(realtime_input_dict.keys())}"
            )

        realtime_input_dict = cast(
            BidiGenerateContentRealtimeInput,
            encode_unserializable_types(cast(Dict[str, object], realtime_input_dict)),
        )

        messages.append(json.dumps({"realtime_input": realtime_input_dict}))
        return messages

    def transform_session_created_event(
        self,
        model: str,
        logging_session_id: str,
        session_configuration_request: Optional[str] = None,
    ) -> OpenAIRealtimeStreamSessionEvents:
        if session_configuration_request:
            session_configuration_request_dict: BidiGenerateContentSetup = json.loads(
                session_configuration_request
            ).get("setup", {})
        else:
            session_configuration_request_dict = {}

        _model = session_configuration_request_dict.get("model") or model
        generation_config = (
            session_configuration_request_dict.get("generationConfig", {}) or {}
        )
        gemini_modalities = generation_config.get("responseModalities", ["TEXT"])
        _modalities = [
            modality.lower() for modality in cast(List[str], gemini_modalities)
        ]
        _system_instruction = session_configuration_request_dict.get(
            "systemInstruction"
        )
        session = OpenAIRealtimeStreamSession(
            id=logging_session_id,
            modalities=_modalities,
        )
        if _system_instruction is not None and isinstance(_system_instruction, str):
            session["instructions"] = _system_instruction
        if _model is not None and isinstance(_model, str):
            session["model"] = _model.strip(
                "models/"
            )  # keep it consistent with how openai returns the model name

        return OpenAIRealtimeStreamSessionEvents(
            type="session.created",
            session=session,
            event_id=str(uuid.uuid4()),
        )

    def _is_new_content_delta(
        self,
        previous_messages: Optional[List[OpenAIRealtimeEvents]] = None,
    ) -> bool:
        if previous_messages is None or len(previous_messages) == 0:
            return True
        if "type" in previous_messages[-1] and previous_messages[-1]["type"].endswith(
            "delta"
        ):
            return False
        return True

    def return_new_content_delta_events(
        self,
        response_id: str,
        output_item_id: str,
        conversation_id: str,
        delta_type: ALL_DELTA_TYPES,
        session_configuration_request: Optional[str] = None,
    ) -> List[OpenAIRealtimeEvents]:
        if session_configuration_request is None:
            raise ValueError(
                "session_configuration_request is required for Gemini API calls"
            )

        session_configuration_request_dict: BidiGenerateContentSetup = json.loads(
            session_configuration_request
        ).get("setup", {})
        generation_config = session_configuration_request_dict.get(
            "generationConfig", {}
        )
        gemini_modalities = generation_config.get("responseModalities", ["TEXT"])
        _modalities = [
            modality.lower() for modality in cast(List[str], gemini_modalities)
        ]

        _temperature = generation_config.get("temperature")
        _max_output_tokens = generation_config.get("maxOutputTokens")

        response_items: List[OpenAIRealtimeEvents] = []

        ## - return response.created
        response_created = OpenAIRealtimeStreamResponseBaseObject(
            type="response.created",
            event_id="event_{}".format(uuid.uuid4()),
            response={
                "object": "realtime.response",
                "id": response_id,
                "status": "in_progress",
                "output": [],
                "conversation_id": conversation_id,
                "modalities": _modalities,
                "temperature": _temperature,
                "max_output_tokens": _max_output_tokens,
            },
        )
        response_items.append(response_created)

        ## - return response.output_item.added ← adds ‘item_id’ same for all subsequent events
        response_output_item_added = OpenAIRealtimeStreamResponseOutputItemAdded(
            type="response.output_item.added",
            response_id=response_id,
            output_index=0,
            item={
                "id": output_item_id,
                "object": "realtime.item",
                "type": "message",
                "status": "in_progress",
                "role": "assistant",
                "content": [],
            },
        )
        response_items.append(response_output_item_added)
        ## - return conversation.item.created
        conversation_item_created = OpenAIRealtimeConversationItemCreated(
            type="conversation.item.created",
            event_id="event_{}".format(uuid.uuid4()),
            item={
                "id": output_item_id,
                "object": "realtime.item",
                "type": "message",
                "status": "in_progress",
                "role": "assistant",
                "content": [],
            },
        )
        response_items.append(conversation_item_created)
        ## - return response.content_part.added
        response_content_part_added = OpenAIRealtimeResponseContentPartAdded(
            type="response.content_part.added",
            content_index=0,
            output_index=0,
            event_id="event_{}".format(uuid.uuid4()),
            item_id=output_item_id,
            part={
                "type": "text",
                "text": "",
            }
            if delta_type == "text"
            else {
                "type": "audio",
                "transcript": "",
            },
            response_id=response_id,
        )
        response_items.append(response_content_part_added)
        return response_items

    def transform_content_delta_events(
        self,
        message: BidiGenerateContentServerContent,
        output_item_id: str,
        response_id: str,
        delta_type: ALL_DELTA_TYPES,
    ) -> OpenAIRealtimeResponseDelta:
        delta = ""
        try:
            if "modelTurn" in message and "parts" in message["modelTurn"]:
                for part in message["modelTurn"]["parts"]:
                    if "text" in part:
                        delta += part["text"]
                    elif "inlineData" in part:
                        delta += part["inlineData"]["data"]
        except Exception as e:
            raise ValueError(
                f"Error transforming content delta events: {e}, got message: {message}"
            )

        return OpenAIRealtimeResponseDelta(
            type="response.text.delta"
            if delta_type == "text"
            else "response.audio.delta",
            content_index=0,
            event_id="event_{}".format(uuid.uuid4()),
            item_id=output_item_id,
            output_index=0,
            response_id=response_id,
            delta=delta,
        )

    def transform_content_done_event(
        self,
        delta_chunks: Optional[List[OpenAIRealtimeResponseDelta]],
        current_output_item_id: Optional[str],
        current_response_id: Optional[str],
        delta_type: ALL_DELTA_TYPES,
    ) -> Union[OpenAIRealtimeResponseTextDone, OpenAIRealtimeResponseAudioDone]:
        if delta_chunks:
            delta = "".join([delta_chunk["delta"] for delta_chunk in delta_chunks])
        else:
            delta = ""
        if current_output_item_id is None or current_response_id is None:
            raise ValueError(
                "current_output_item_id and current_response_id cannot be None for a 'done' event."
            )
        if delta_type == "text":
            return OpenAIRealtimeResponseTextDone(
                type="response.text.done",
                content_index=0,
                event_id="event_{}".format(uuid.uuid4()),
                item_id=current_output_item_id,
                output_index=0,
                response_id=current_response_id,
                text=delta,
            )
        elif delta_type == "audio":
            return OpenAIRealtimeResponseAudioDone(
                type="response.audio.done",
                content_index=0,
                event_id="event_{}".format(uuid.uuid4()),
                item_id=current_output_item_id,
                output_index=0,
                response_id=current_response_id,
            )

    def return_additional_content_done_events(
        self,
        current_output_item_id: Optional[str],
        current_response_id: Optional[str],
        delta_done_event: Union[
            OpenAIRealtimeResponseTextDone, OpenAIRealtimeResponseAudioDone
        ],
        delta_type: ALL_DELTA_TYPES,
    ) -> List[OpenAIRealtimeEvents]:
        """
        - return response.content_part.done
        - return response.output_item.done
        """
        if current_output_item_id is None or current_response_id is None:
            raise ValueError(
                "current_output_item_id and current_response_id cannot be None for a 'done' event."
            )
        returned_items: List[OpenAIRealtimeEvents] = []

        delta_done_event_text = cast(Optional[str], delta_done_event.get("text"))
        # response.content_part.done
        response_content_part_done = OpenAIRealtimeContentPartDone(
            type="response.content_part.done",
            content_index=0,
            event_id="event_{}".format(uuid.uuid4()),
            item_id=current_output_item_id,
            output_index=0,
            part={"type": "text", "text": delta_done_event_text}
            if delta_done_event_text and delta_type == "text"
            else {
                "type": "audio",
                "transcript": "",  # gemini doesn't return transcript for audio
            },
            response_id=current_response_id,
        )
        returned_items.append(response_content_part_done)
        # response.output_item.done
        response_output_item_done = OpenAIRealtimeOutputItemDone(
            type="response.output_item.done",
            event_id="event_{}".format(uuid.uuid4()),
            output_index=0,
            response_id=current_response_id,
            item={
                "id": current_output_item_id,
                "object": "realtime.item",
                "type": "message",
                "status": "completed",
                "role": "assistant",
                "content": [
                    {"type": "text", "text": delta_done_event_text}
                    if delta_done_event_text and delta_type == "text"
                    else {
                        "type": "audio",
                        "transcript": "",
                    }
                ],
            },
        )
        returned_items.append(response_output_item_done)
        return returned_items

    @staticmethod
    def get_nested_value(obj: dict, path: str) -> Any:
        keys = path.split(".")
        current = obj
        for key in keys:
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return None
        return current

    def update_current_delta_chunks(
        self,
        transformed_message: Union[OpenAIRealtimeEvents, List[OpenAIRealtimeEvents]],
        current_delta_chunks: Optional[List[OpenAIRealtimeResponseDelta]],
    ) -> Optional[List[OpenAIRealtimeResponseDelta]]:
        try:
            if isinstance(transformed_message, list):
                current_delta_chunks = []
                any_delta_chunk = False
                for event in transformed_message:
                    if event["type"] == "response.text.delta":
                        current_delta_chunks.append(
                            cast(OpenAIRealtimeResponseDelta, event)
                        )
                        any_delta_chunk = True
                if not any_delta_chunk:
                    current_delta_chunks = (
                        None  # reset current_delta_chunks if no delta chunks
                    )
            else:
                if (
                    transformed_message["type"] == "response.text.delta"
                ):  # ONLY ACCUMULATE TEXT DELTA CHUNKS - AUDIO WILL CAUSE SERVER MEMORY ISSUES
                    if current_delta_chunks is None:
                        current_delta_chunks = []
                    current_delta_chunks.append(
                        cast(OpenAIRealtimeResponseDelta, transformed_message)
                    )
                else:
                    current_delta_chunks = None
            return current_delta_chunks
        except Exception as e:
            raise ValueError(
                f"Error updating current delta chunks: {e}, got transformed_message: {transformed_message}"
            )

    def update_current_item_chunks(
        self,
        transformed_message: Union[OpenAIRealtimeEvents, List[OpenAIRealtimeEvents]],
        current_item_chunks: Optional[List[OpenAIRealtimeOutputItemDone]],
    ) -> Optional[List[OpenAIRealtimeOutputItemDone]]:
        try:
            if isinstance(transformed_message, list):
                current_item_chunks = []
                any_item_chunk = False
                for event in transformed_message:
                    if event["type"] == "response.output_item.done":
                        current_item_chunks.append(
                            cast(OpenAIRealtimeOutputItemDone, event)
                        )
                        any_item_chunk = True
                if not any_item_chunk:
                    current_item_chunks = (
                        None  # reset current_item_chunks if no item chunks
                    )
            else:
                if transformed_message["type"] == "response.output_item.done":
                    if current_item_chunks is None:
                        current_item_chunks = []
                    current_item_chunks.append(
                        cast(OpenAIRealtimeOutputItemDone, transformed_message)
                    )
                else:
                    current_item_chunks = None
            return current_item_chunks
        except Exception as e:
            raise ValueError(
                f"Error updating current item chunks: {e}, got transformed_message: {transformed_message}"
            )

    def transform_response_done_event(
        self,
        message: BidiGenerateContentServerMessage,
        current_response_id: Optional[str],
        current_conversation_id: Optional[str],
        output_items: Optional[List[OpenAIRealtimeOutputItemDone]],
        session_configuration_request: Optional[str] = None,
    ) -> OpenAIRealtimeDoneEvent:
        if current_conversation_id is None or current_response_id is None:
            raise ValueError(
                f"current_conversation_id and current_response_id must all be set for a 'done' event. Got=current_conversation_id: {current_conversation_id}, current_response_id: {current_response_id}"
            )

        if session_configuration_request:
            session_configuration_request_dict: BidiGenerateContentSetup = json.loads(
                session_configuration_request
            ).get("setup", {})
        else:
            session_configuration_request_dict = {}

        generation_config = session_configuration_request_dict.get(
            "generationConfig", {}
        )
        temperature = generation_config.get("temperature")
        max_output_tokens = generation_config.get("max_output_tokens")
        gemini_modalities = generation_config.get("responseModalities", ["TEXT"])
        _modalities = [
            modality.lower() for modality in cast(List[str], gemini_modalities)
        ]
        if "usageMetadata" in message:
            _chat_completion_usage = VertexGeminiConfig._calculate_usage(
                completion_response=message,
            )
        else:
            _chat_completion_usage = get_empty_usage()

        responses_api_usage = LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(
            _chat_completion_usage,
        )
        response_done_event = OpenAIRealtimeDoneEvent(
            type="response.done",
            event_id="event_{}".format(uuid.uuid4()),
            response=OpenAIRealtimeResponseDoneObject(
                object="realtime.response",
                id=current_response_id,
                status="completed",
                output=[output_item["item"] for output_item in output_items]
                if output_items
                else [],
                conversation_id=current_conversation_id,
                modalities=_modalities,
                usage=responses_api_usage.model_dump(),
            ),
        )
        if temperature is not None:
            response_done_event["response"]["temperature"] = temperature
        if max_output_tokens is not None:
            response_done_event["response"]["max_output_tokens"] = max_output_tokens

        return response_done_event

    def handle_openai_modality_event(
        self,
        openai_event: OpenAIRealtimeEventTypes,
        json_message: dict,
        realtime_response_transform_input: RealtimeResponseTransformInput,
        delta_type: ALL_DELTA_TYPES,
    ) -> RealtimeModalityResponseTransformOutput:
        current_output_item_id = realtime_response_transform_input[
            "current_output_item_id"
        ]
        current_response_id = realtime_response_transform_input["current_response_id"]
        current_conversation_id = realtime_response_transform_input[
            "current_conversation_id"
        ]
        current_delta_chunks = realtime_response_transform_input["current_delta_chunks"]
        session_configuration_request = realtime_response_transform_input[
            "session_configuration_request"
        ]

        returned_message: List[OpenAIRealtimeEvents] = []
        if (
            openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DELTA
            or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DELTA
        ):
            current_response_id = current_response_id or "resp_{}".format(uuid.uuid4())
            if not current_output_item_id:
                # send the list of standard 'new' content.delta events
                current_output_item_id = "item_{}".format(uuid.uuid4())
                current_conversation_id = current_conversation_id or "conv_{}".format(
                    uuid.uuid4()
                )
                returned_message = self.return_new_content_delta_events(
                    session_configuration_request=session_configuration_request,
                    response_id=current_response_id,
                    output_item_id=current_output_item_id,
                    conversation_id=current_conversation_id,
                    delta_type=delta_type,
                )

            # send the list of standard 'new' content.delta events
            transformed_message = self.transform_content_delta_events(
                BidiGenerateContentServerContent(**json_message["serverContent"]),
                current_output_item_id,
                current_response_id,
                delta_type=delta_type,
            )
            returned_message.append(transformed_message)
        elif (
            openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE
            or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DONE
        ):
            transformed_content_done_event = self.transform_content_done_event(
                current_output_item_id=current_output_item_id,
                current_response_id=current_response_id,
                delta_chunks=current_delta_chunks,
                delta_type=delta_type,
            )
            returned_message = [transformed_content_done_event]

            additional_items = self.return_additional_content_done_events(
                current_output_item_id=current_output_item_id,
                current_response_id=current_response_id,
                delta_done_event=transformed_content_done_event,
                delta_type=delta_type,
            )
            returned_message.extend(additional_items)

        return {
            "returned_message": returned_message,
            "current_output_item_id": current_output_item_id,
            "current_response_id": current_response_id,
            "current_conversation_id": current_conversation_id,
            "current_delta_chunks": current_delta_chunks,
            "current_delta_type": delta_type,
        }

    def map_openai_event(
        self,
        key: str,
        value: dict,
        current_delta_type: Optional[ALL_DELTA_TYPES],
        json_message: dict,
    ) -> OpenAIRealtimeEventTypes:
        model_turn_event = value.get("modelTurn")
        generation_complete_event = value.get("generationComplete")
        openai_event: Optional[OpenAIRealtimeEventTypes] = None
        if model_turn_event:  # check if model turn event
            openai_event = self.map_model_turn_event(model_turn_event)
        elif generation_complete_event:
            openai_event = self.map_generation_complete_event(
                delta_type=current_delta_type
            )
        else:
            # Check if this key or any nested key matches our mapping
            for map_key, openai_event in MAP_GEMINI_FIELD_TO_OPENAI_EVENT.items():
                if map_key == key or (
                    "." in map_key
                    and GeminiRealtimeConfig.get_nested_value(json_message, map_key)
                    is not None
                ):
                    openai_event = openai_event
                    break
        if openai_event is None:
            raise ValueError(f"Unknown openai event: {key}, value: {value}")
        return openai_event

    def transform_realtime_response(
        self,
        message: Union[str, bytes],
        model: str,
        logging_obj: LiteLLMLoggingObj,
        realtime_response_transform_input: RealtimeResponseTransformInput,
    ) -> RealtimeResponseTypedDict:
        """
        Keep this state less - leave the state management (e.g. tracking current_output_item_id, current_response_id, current_conversation_id, current_delta_chunks) to the caller.
        """
        try:
            json_message = json.loads(message)
        except json.JSONDecodeError:
            if isinstance(message, bytes):
                message_str = message.decode("utf-8", errors="replace")
            else:
                message_str = str(message)
            raise ValueError(f"Invalid JSON message: {message_str}")

        logging_session_id = logging_obj.litellm_trace_id

        current_output_item_id = realtime_response_transform_input[
            "current_output_item_id"
        ]
        current_response_id = realtime_response_transform_input["current_response_id"]
        current_conversation_id = realtime_response_transform_input[
            "current_conversation_id"
        ]
        current_delta_chunks = realtime_response_transform_input["current_delta_chunks"]
        session_configuration_request = realtime_response_transform_input[
            "session_configuration_request"
        ]
        current_item_chunks = realtime_response_transform_input["current_item_chunks"]
        current_delta_type: Optional[
            ALL_DELTA_TYPES
        ] = realtime_response_transform_input["current_delta_type"]
        returned_message: List[OpenAIRealtimeEvents] = []

        for key, value in json_message.items():
            # Check if this key or any nested key matches our mapping
            openai_event = self.map_openai_event(
                key=key,
                value=value,
                current_delta_type=current_delta_type,
                json_message=json_message,
            )

            if openai_event == OpenAIRealtimeEventTypes.SESSION_CREATED:
                transformed_message = self.transform_session_created_event(
                    model,
                    logging_session_id,
                    realtime_response_transform_input["session_configuration_request"],
                )
                session_configuration_request = json.dumps(transformed_message)
                returned_message.append(transformed_message)
            elif openai_event == OpenAIRealtimeEventTypes.RESPONSE_DONE:
                transformed_response_done_event = self.transform_response_done_event(
                    message=BidiGenerateContentServerMessage(**json_message),  # type: ignore
                    current_response_id=current_response_id,
                    current_conversation_id=current_conversation_id,
                    session_configuration_request=session_configuration_request,
                    output_items=None,
                )
                returned_message.append(transformed_response_done_event)
            elif (
                openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DELTA
                or openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE
                or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DELTA
                or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DONE
            ):
                _returned_message = self.handle_openai_modality_event(
                    openai_event,
                    json_message,
                    realtime_response_transform_input,
                    delta_type="text" if "text" in openai_event.value else "audio",
                )
                returned_message.extend(_returned_message["returned_message"])
                current_output_item_id = _returned_message["current_output_item_id"]
                current_response_id = _returned_message["current_response_id"]
                current_conversation_id = _returned_message["current_conversation_id"]
                current_delta_chunks = _returned_message["current_delta_chunks"]
                current_delta_type = _returned_message["current_delta_type"]
            else:
                raise ValueError(f"Unknown openai event: {openai_event}")
        if len(returned_message) == 0:
            if isinstance(message, bytes):
                message_str = message.decode("utf-8", errors="replace")
            else:
                message_str = str(message)
            raise ValueError(f"Unknown message type: {message_str}")

        current_delta_chunks = self.update_current_delta_chunks(
            transformed_message=returned_message,
            current_delta_chunks=current_delta_chunks,
        )
        current_item_chunks = self.update_current_item_chunks(
            transformed_message=returned_message,
            current_item_chunks=current_item_chunks,
        )
        return {
            "response": returned_message,
            "current_output_item_id": current_output_item_id,
            "current_response_id": current_response_id,
            "current_delta_chunks": current_delta_chunks,
            "current_conversation_id": current_conversation_id,
            "current_item_chunks": current_item_chunks,
            "current_delta_type": current_delta_type,
            "session_configuration_request": session_configuration_request,
        }

    def requires_session_configuration(self) -> bool:
        return True

    def session_configuration_request(self, model: str) -> str:
        """

        ```
        {
            "model": string,
            "generationConfig": {
                "candidateCount": integer,
                "maxOutputTokens": integer,
                "temperature": number,
                "topP": number,
                "topK": integer,
                "presencePenalty": number,
                "frequencyPenalty": number,
                "responseModalities": [string],
                "speechConfig": object,
                "mediaResolution": object
            },
            "systemInstruction": string,
            "tools": [object]
        }
        ```
        """

        response_modalities: List[GeminiResponseModalities] = ["AUDIO"]
        output_audio_transcription = False
        # if "audio" in model: ## UNCOMMENT THIS WHEN AUDIO IS SUPPORTED
        #     output_audio_transcription = True

        setup_config: BidiGenerateContentSetup = {
            "model": f"models/{model}",
            "generationConfig": {"responseModalities": response_modalities},
        }
        if output_audio_transcription:
            setup_config["outputAudioTranscription"] = {}
        return json.dumps(
            {
                "setup": setup_config,
            }
        )
