from typing import Any, Dict, List, Tuple, Union

from cachetools import TTLCache
from elasticsearch import ConnectionError, ConnectionTimeout, NotFoundError, helpers

from recs_delta_feed_processor.common.app_settings import AppSettings
from recs_delta_feed_processor.common.helpers import current_milli_time
from recs_delta_feed_processor.common.logging_config import Logger
from recs_delta_feed_processor.common.metrics import (
    elasticsearch_bulk,
    elasticsearch_bulk_res,
    elasticsearch_create_action,
    index_mapping,
)
from recs_delta_feed_processor.serDe.delta_update_builder import DatafeedsDeltaApiAction

logger = Logger(settings=AppSettings(), name=__name__).logger
mapping_cache: TTLCache[str, Any] = TTLCache(maxsize=1500, ttl=180)


class ElasticsearchManager:
    def __init__(self, connections, settings):
        self.es_client = connections.es_client
        self.settings = settings

    def update_elasticsearch(
        self, batch: List[Dict[str, Any]]
    ) -> List[Tuple[Dict[str, Any], Dict[str, Any]]]:
        response_array = []
        actions = []

        # Step 1: Prepare actions and handle exceptions during creation
        for message in batch:
            item_id = message.get("itemId")
            section_id = message.get("sectionId")
            timestamp = message.get("timestamp")
            response_key = f"{item_id}-{section_id}-{timestamp}"
            try:
                action = self.create_action(message)
                actions.append(action)
                # Track the original message so it can be paired positionally
                # with its bulk response in Step 2
                response_array.append({"message": message})
                elasticsearch_create_action.labels("success").inc()
            except ConnectionTimeout:
                raise
            except ConnectionError:
                raise
            except NotFoundError as e:
                elasticsearch_create_action.labels("index_not_found").inc()
                response_array.append(
                    self.build_create_action_error(
                        item_id, section_id, getattr(e, "status_code"), e.message
                    )
                )
                logger.exception(
                    "index not found", extra={"section_id": section_id, "sku": item_id}
                )
            except Exception as e:
                elasticsearch_create_action.labels("create_action_failed").inc()
                response_array.append(
                    self.build_create_action_error(item_id, section_id, 0, str(e))
                )
                logger.exception(
                    "Error creating action",
                    extra={"section_id": section_id, "sku": item_id},
                )

        # Step 2: Execute bulk request
        bulk_start = current_milli_time()
        try:
            logger.info("Executing ES bulk request", extra={"size": len(actions)})
            logger.debug(f"Bulk request prepared: {actions}")
            i = 0
            for success, result in helpers.streaming_bulk(
                client=self.es_client,
                actions=actions,
                initial_backoff=self.settings.es_initial_backoff,
                # maximum number of seconds a retry will wait
                max_backoff=self.settings.es_max_backoff,
                retry_on_status=[408, 429, 503, 504],
                max_retries=self.settings.es_max_retries,
                raise_on_error=False,
            ):
                # Unwrap the single-operation response, e.g. {"update": {...}}
                update_response: Union[Dict[str, Any], Any] = next(
                    iter(result.values()), {}
                )
                item_id = update_response.get("_id")
                action_res = update_response.get("result", None)
                index = update_response.get("_index", None)
                # Skip entries that already hold an error response from Step 1
                while (
                    i < len(response_array)
                    and response_array[i].get("response") is not None
                ):
                    i += 1
                if success or action_res == "noop" or action_res == "not_found":
                    response_array[i]["response"] = update_response
                else:
                    response_array[i]["response"] = {
                        "error": (
                            "Failed transaction occurred for event "
                            f"{update_response.get('itemAction')}"
                        )
                    }
                elasticsearch_bulk_res.labels(index, action_res).inc()
            elasticsearch_bulk.labels("success").observe(
                current_milli_time() - bulk_start
            )
            logger.info("finished indexing ES", extra={"size": len(actions)})
        except ConnectionError as _ce:
            elasticsearch_bulk.labels("connection_error").observe(
                current_milli_time() - bulk_start
            )
            logger.exception("Connection error with Elasticsearch during bulk request")
            raise

        # Step 3: Pair original messages with responses
        result = []
        for row in response_array:
            result.append((row["message"], row["response"]))
        return result

    def create_action(self, message: Dict[str, Any]) -> DatafeedsDeltaApiAction:
        action = DatafeedsDeltaApiAction.convert_message(message)
        mapping = self.get_index_mapping(action.build_index_name(action.section_id))
        return action.build_request(mapping)

    @staticmethod
    def build_create_action_error(item_id, section_id, status, error_message):
        return {
            "response": {
                "_id": item_id,
                "_index": f"products_{section_id}_sync",
                "status": status,
                "result": error_message or "",
            }
        }

    def get_index_mapping(self, index_alias: str) -> Dict[str, Any]:
        """
        Get the index mapping for a given index alias, using the cache to store mappings.

        :param index_alias: The alias of the index to get the mapping for.
        :return: The index mapping as a dictionary.
        """
        logger.info(
            "Getting index mapping for alias", extra={"index_alias": index_alias}
        )
        if index_alias in mapping_cache:
            logger.debug(f"Returning cached mapping for alias: {index_alias}")
            index_mapping.labels("cache_hit").inc()
            return mapping_cache[index_alias]
        logger.debug(f"Fetching mapping for alias: {index_alias} from Elasticsearch")
        try:
            response = self.es_client.indices.get_mapping(index=index_alias)
            logger.debug(f"ES mapping response: {response}")
            mapping = self.parse_es_mapping_response(response)
            mapping_cache[index_alias] = mapping
            index_mapping.labels("cache_miss").inc()
            return mapping
        except ConnectionError as _ce:
            logger.exception(
                "Connection error with Elasticsearch",
                extra={"index_alias": index_alias},
            )
            index_mapping.labels("connection_error").inc()
            raise
        except Exception:
            logger.exception(
                "Error fetching mapping for alias", extra={"index_alias": index_alias}
            )
            index_mapping.labels("error").inc()
            raise

    @staticmethod
    def parse_es_mapping_response(response: dict) -> dict:
        try:
            index_name = next(iter(response))
            mappings = response[index_name].get("mappings")
            if not mappings:
                logger.error(
                    "No mappings found for index", extra={"index_name": index_name}
                )
                return {}
            return mappings
        except StopIteration:
            logger.exception("The mapping response is empty")
            return {}
        except KeyError:
            logger.exception("Key error")
            return {}
        except Exception:
            logger.exception("An unexpected error occurred")
            return {}