from typing import Any, Dict, List, Tuple
from cachetools import TTLCache
from elasticsearch import ConnectionError, ConnectionTimeout, NotFoundError, helpers
from recs_delta_feed_processor.common.app_settings import AppSettings
from recs_delta_feed_processor.common.helpers import current_milli_time
from recs_delta_feed_processor.common.logging_config import Logger
from recs_delta_feed_processor.common.metrics import (
elasticsearch_bulk,
elasticsearch_bulk_res,
elasticsearch_create_action,
index_mapping,
)
from recs_delta_feed_processor.serDe.delta_update_builder import DatafeedsDeltaApiAction
logger = Logger(settings=AppSettings(), name=__name__).logger
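# Process-local, short-lived cache of index mappings keyed by index alias
# (up to 1500 entries, refreshed every 180 seconds).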
mapping_cache: TTLCache[str, Any] = TTLCache(maxsize=1500, ttl=180)
class ElasticsearchManager:
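    """Applies delta-feed batches to Elasticsearch via streaming bulk updates, resolving index mappings through a TTL cache."""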
def __init__(self, connections, settings):
self.es_client = connections.es_client
self.settings = settings
def update_elasticsearch(
self, batch: List[Dict[str, Any]]
) -> List[Tuple[Dict[str, Any], Dict[str, Any]]]:
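        """
        Build a bulk action for each message in the batch, execute them with
        ``helpers.streaming_bulk``, and return one (message, response) tuple per
        input message.
        """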
response_array = []
actions = []
# Step 1: Prepare actions and handle exceptions during creation
for message in batch:
item_id = message.get("itemId")
section_id = message.get("sectionId")
timestamp = message.get("timestamp")
response_key = f"{item_id}-{section_id}-{timestamp}"
try:
action = self.create_action(message)
actions.append(action)
                # Record the original message; its bulk response is attached after the bulk call
                response_array.append({"message": message})
elasticsearch_create_action.labels("success").inc()
except ConnectionTimeout:
raise
except ConnectionError:
raise
except NotFoundError as e:
elasticsearch_create_action.labels("index_not_found").inc()
                response_array.append({
                    "message": message,
                    **self.build_create_action_error(
                        item_id, section_id, getattr(e, "status_code", 0), e.message
                    ),
                })
logger.exception(
"index not found", extra={"section_id": section_id, "sku": item_id}
)
except Exception as e:
elasticsearch_create_action.labels("create_action_failed").inc()
                response_array.append({
                    "message": message,
                    **self.build_create_action_error(item_id, section_id, 0, str(e)),
                })
logger.exception(
"Error creating action",
extra={"section_id": section_id, "sku": item_id},
)
# Step 2: Execute bulk request
bulk_start = current_milli_time()
try:
logger.info("Executing ES bulk request", extra={"size": len(actions)})
logger.debug(f"Bulk request prepared: {actions}")
i = 0
for success, result in helpers.streaming_bulk(
client=self.es_client,
actions=actions,
initial_backoff=self.settings.es_initial_backoff,
max_backoff=self.settings.es_max_backoff, # maximum number of seconds a retry will wait
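                # retry only on transient statuses: request timeout, throttling, unavailable, gateway timeout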
retry_on_status=[408, 429, 503, 504],
max_retries=self.settings.es_max_retries,
raise_on_error=False,
):
                # Each streaming_bulk result maps the op type to its item response; unwrap it
                update_response: Dict[str, Any] = next(iter(result.values()), {})
                item_id = update_response.get("_id")
                action_res = update_response.get("result")
                index = update_response.get("_index")
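                # Advance past entries that already carry an error from the action-build
                # step; streaming_bulk yields results only for actions that were submitted.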
while i < len(response_array) and response_array[i].get("response") is not None:
i += 1
if success or action_res == "noop" or action_res == "not_found":
response_array[i]["response"] = update_response
else:
response_array[i]["response"] = {
"error": f"Failed transaction occurred for event {update_response.get("itemAction")}"
}
elasticsearch_bulk_res.labels(index, action_res).inc()
elasticsearch_bulk.labels("success").observe(
current_milli_time() - bulk_start
)
logger.info("finished indexing ES", extra={"size": len(actions)})
        except ConnectionError:
elasticsearch_bulk.labels("connection_error").observe(
current_milli_time() - bulk_start
)
logger.exception("Connection error with Elasticsearch during bulk request")
raise
        # Step 3: Pair original messages with responses
result = []
for row in response_array:
result.append((row["message"], row["response"]))
return result
def create_action(self, message: Dict[str, Any]) -> DatafeedsDeltaApiAction:
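        """Convert a raw message into a bulk action, enriched with the target index mapping."""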
action = DatafeedsDeltaApiAction.convert_message(message)
mapping = self.get_index_mapping(action.build_index_name(action.section_id))
return action.build_request(mapping)
@staticmethod
def build_create_action_error(item_id, section_id, status, error_message):
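        """Build a response-shaped error entry for a message whose action could not be created."""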
return {
"response": {
"id_": item_id,
"_index": f"products_{section_id}_sync",
"status": status,
"result": error_message or "",
}
}
def get_index_mapping(self, index_alias: str) -> Dict[str, Any]:
"""
Get the index mapping for a given index alias, using the cache to store mappings.
:param index_alias: The alias of the index to get the mapping for.
:return: The index mapping as a dictionary.
"""
logger.info(
"Getting index mapping for alias", extra={"index_alias": index_alias}
)
if index_alias in mapping_cache:
logger.debug(f"Returning cached mapping for alias: {index_alias}")
index_mapping.labels("cache_hit").inc()
return mapping_cache[index_alias]
logger.debug(f"Fetching mapping for alias: {index_alias} from Elasticsearch")
try:
response = self.es_client.indices.get_mapping(index=index_alias)
logger.debug(f"ES mapping response: {response}")
mapping = self.parse_es_mapping_response(response)
mapping_cache[index_alias] = mapping
index_mapping.labels("cache_miss").inc()
return mapping
        except ConnectionError:
logger.exception(
"Connection error with Elasticsearch",
extra={"index_alias": index_alias},
)
index_mapping.labels("connection_error").inc()
raise
except Exception:
logger.exception(
"Error fetching mapping for alias", extra={"index_alias": index_alias}
)
index_mapping.labels("error").inc()
raise
@staticmethod
def parse_es_mapping_response(response: dict) -> dict:
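        """Extract the ``mappings`` section from a get-mapping response; return an empty dict if it is missing or malformed."""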
try:
index_name = next(iter(response))
mappings = response[index_name].get("mappings")
if not mappings:
logger.error(
"No mappings found for index", extra={"index_name": index_name}
)
return {}
return mappings
except StopIteration:
logger.exception("The mapping response is empty")
return {}
except KeyError:
logger.exception("Key error")
return {}
except Exception:
logger.exception("An unexpected error occurred")
return {}