from typing import Any, Dict, List, Tuple

from cachetools import TTLCache
from elasticsearch import ConnectionError, ConnectionTimeout, NotFoundError, helpers

from recs_delta_feed_processor.common.app_settings import AppSettings
from recs_delta_feed_processor.common.helpers import current_milli_time
from recs_delta_feed_processor.common.logging_config import Logger
from recs_delta_feed_processor.common.metrics import (
    elasticsearch_bulk,
    elasticsearch_bulk_res,
    elasticsearch_create_action,
    index_mapping,
)
from recs_delta_feed_processor.serDe.delta_update_builder import DatafeedsDeltaApiAction

logger = Logger(settings=AppSettings(), name=__name__).logger

# Cache up to 1500 index mappings for 180s to avoid a get_mapping call per message.
mapping_cache: TTLCache[str, Any] = TTLCache(maxsize=1500, ttl=180)


class ElasticsearchManager:
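    """Writes delta-feed batches to Elasticsearch and reports per-message results."""
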
    def __init__(self, connections, settings):
        self.es_client = connections.es_client
        self.settings = settings

    def update_elasticsearch(
        self, batch: List[Dict[str, Any]]
    ) -> List[Tuple[Dict[str, Any], Dict[str, Any]]]:
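        """Index a batch of delta messages into Elasticsearch.

        Step 1 builds one bulk action per message, Step 2 streams the bulk
        request, and Step 3 pairs each original message with either its bulk
        item response or the error recorded when its action could not be built.
        """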
        response_array = []
        actions = []

        # Step 1: Prepare actions and handle exceptions during creation
        for message in batch:
            item_id = message.get("itemId")
            section_id = message.get("sectionId")
            try:
                action = self.create_action(message)
                actions.append(action)
                # Reserve a row for this message; its bulk response is filled
                # in during Step 2.
                response_array.append({"message": message})
                elasticsearch_create_action.labels("success").inc()
            except (ConnectionTimeout, ConnectionError):
                # Connection problems abort the whole batch and propagate to the caller.
                raise
            except NotFoundError as e:
                elasticsearch_create_action.labels("index_not_found").inc()
                # Keep the message alongside the error so Step 3 can pair them.
                response_array.append({
                    "message": message,
                    **self.build_create_action_error(
                        item_id, section_id, getattr(e, "status_code", 404), e.message
                    ),
                })
                logger.exception(
                    "index not found", extra={"section_id": section_id, "sku": item_id}
                )
            except Exception as e:
                elasticsearch_create_action.labels("create_action_failed").inc()
                response_array.append({
                    "message": message,
                    **self.build_create_action_error(item_id, section_id, 0, str(e)),
                })
                logger.exception(
                    "Error creating action",
                    extra={"section_id": section_id, "sku": item_id},
                )

        # Step 2: Execute bulk request
        bulk_start = current_milli_time()
        try:
            logger.info("Executing ES bulk request", extra={"size": len(actions)})
            logger.debug(f"Bulk request prepared: {actions}")
            i = 0
            for success, result in helpers.streaming_bulk(
                client=self.es_client,
                actions=actions,
                initial_backoff=self.settings.es_initial_backoff,
                max_backoff=self.settings.es_max_backoff,  # maximum number of seconds a retry will wait
                retry_on_status=[408, 429, 503, 504],
                max_retries=self.settings.es_max_retries,
                raise_on_error=False,
            ):
                # Each result is a single-entry dict keyed by op type
                # (e.g. {"update": {...}}); unwrap it to get the item response.
                update_response: Dict[str, Any] = next(iter(result.values()), {})
                item_id = update_response.get("_id")
                action_res = update_response.get("result")
                index = update_response.get("_index")
                # Skip rows already filled with a create-action error.
                while i < len(response_array) and response_array[i].get("response") is not None:
                    i += 1

                if success or action_res in ("noop", "not_found"):
                    response_array[i]["response"] = update_response
                else:
                    response_array[i]["response"] = {
                        "error": f"Failed transaction for document {item_id}: "
                        f"{update_response.get('error')}"
                    }
                elasticsearch_bulk_res.labels(index, action_res).inc()
            elasticsearch_bulk.labels("success").observe(
                current_milli_time() - bulk_start
            )
            logger.info("finished indexing ES", extra={"size": len(actions)})

        except ConnectionError:
            elasticsearch_bulk.labels("connection_error").observe(
                current_milli_time() - bulk_start
            )
            logger.exception("Connection error with Elasticsearch during bulk request")
            raise

        # Step 3: Pair each original message with its response
        return [(row["message"], row["response"]) for row in response_array]

    def create_action(self, message: Dict[str, Any]) -> DatafeedsDeltaApiAction:
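        """Convert a message into a bulk action built against its index mapping."""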
        action = DatafeedsDeltaApiAction.convert_message(message)
        mapping = self.get_index_mapping(action.build_index_name(action.section_id))
        return action.build_request(mapping)

    @staticmethod
    def build_create_action_error(item_id, section_id, status, error_message):
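        """Build a synthetic error row shaped like an ES bulk item response."""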
        return {
            "response": {
                "id_": item_id,
                "_index": f"products_{section_id}_sync",
                "status": status,
                "result": error_message or "",
            }
        }

    def get_index_mapping(self, index_alias: str) -> Dict[str, Any]:
        """
        Get the index mapping for a given index alias, using the cache to store mappings.

        :param index_alias: The alias of the index to get the mapping for.
        :return: The index mapping as a dictionary.
        """
        logger.info(
            "Getting index mapping for alias", extra={"index_alias": index_alias}
        )
        if index_alias in mapping_cache:
            logger.debug(f"Returning cached mapping for alias: {index_alias}")
            index_mapping.labels("cache_hit").inc()
            return mapping_cache[index_alias]

        logger.debug(f"Fetching mapping for alias: {index_alias} from Elasticsearch")
        try:
            response = self.es_client.indices.get_mapping(index=index_alias)
            logger.debug(f"ES mapping response: {response}")
            mapping = self.parse_es_mapping_response(response)
            mapping_cache[index_alias] = mapping
            index_mapping.labels("cache_miss").inc()
            return mapping
        except ConnectionError:
            logger.exception(
                "Connection error with Elasticsearch",
                extra={"index_alias": index_alias},
            )
            index_mapping.labels("connection_error").inc()
            raise
        except Exception:
            logger.exception(
                "Error fetching mapping for alias", extra={"index_alias": index_alias}
            )
            index_mapping.labels("error").inc()
            raise

    @staticmethod
    def parse_es_mapping_response(response: dict) -> dict:
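        """Extract the mappings section from a get_mapping response, or {} on failure."""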
        try:
            index_name = next(iter(response))

            mappings = response[index_name].get("mappings")
            if not mappings:
                logger.error(
                    "No mappings found for index", extra={"index_name": index_name}
                )
                return {}

            return mappings
        except StopIteration:
            logger.exception("The mapping response is empty")
            return {}
        except KeyError:
            logger.exception("Key error")
            return {}
        except Exception:
            logger.exception("An unexpected error occurred")
            return {}
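

# Minimal usage sketch (illustrative, not part of this module): assumes a
# `connections` object exposing `es_client` and settings providing
# es_initial_backoff, es_max_backoff, and es_max_retries, per the attributes
# used above.
#
#     manager = ElasticsearchManager(connections=connections, settings=AppSettings())
#     results = manager.update_elasticsearch(
#         [{"itemId": "sku-1", "sectionId": "electronics", "timestamp": 1700000000000}]
#     )
#     for message, response in results:
#         ...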