def _check_identifier(name, allow_qualified=False):
    """Return *name* if it is a plain SQL identifier, else raise ValueError.

    Table and column names cannot be bound as query parameters, so they are
    interpolated into the SQL text. Restricting them to unquoted identifiers
    (letters, digits, ``_`` and ``$``, not starting with a digit) keeps
    arbitrary SQL out of those slots while leaving PostgreSQL's usual
    lower-case folding of unquoted names unchanged.

    Args:
        name: Candidate identifier.
        allow_qualified: Also accept a single ``schema.table`` qualification.

    Returns:
        *name*, unchanged.

    Raises:
        ValueError: If *name* is not a string matching the pattern.
    """
    import re

    part = r"[A-Za-z_][A-Za-z0-9_$]*"
    pattern = rf"{part}(?:\.{part})?" if allow_qualified else part
    if not isinstance(name, str) or re.fullmatch(pattern, name) is None:
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name


def _temp_table_name(target_table):
    """Return the staging-table name used while loading into *target_table*.

    A bare name ``t`` maps to ``t_temp``. PostgreSQL cannot create a temporary
    table inside a regular schema, so any ``schema.`` prefix is dropped first.
    """
    return f"{target_table.rsplit('.', 1)[-1]}_temp"


def _pick_csv_key(objects, file_prefix):
    """Return the key of the CSV file among S3 listing entries.

    Args:
        objects: The ``Contents`` entries of a ``list_objects_v2`` response
            (dicts with at least a ``Key`` item).
        file_prefix: The prefix that was listed; used only in the error text.

    Returns:
        The last key ending in ``.csv``, in listing order.

    Raises:
        FileNotFoundError: If no listed key ends in ``.csv``.
    """
    # Non-CSV objects (e.g. a ``_SUCCESS`` marker, which Spark's output
    # committer typically writes beside the part file) are skipped rather
    # than treated as fatal.
    csv_keys = [obj["Key"] for obj in objects if obj["Key"].endswith(".csv")]
    if not csv_keys:
        raise FileNotFoundError(
            "No CSV part file found in S3 under prefix: " + file_prefix
        )
    return csv_keys[-1]


def _read_csv_header(csv_text):
    """Return the stripped column names from the first row of *csv_text*.

    Raises:
        ValueError: If the text is empty or its first row is blank.
    """
    import csv

    header = next(csv.reader(io.StringIO(csv_text)), None)
    if not header:
        raise ValueError("CSV file has no header row")
    return [name.strip() for name in header]


def _upsert_csv(target_table, columns, conflict_columns, data):
    """Stage CSV rows in a temp table, then upsert them into *target_table*.

    Rows whose *conflict_columns* already exist in the target get the incoming
    ``count`` added to the stored ``count``; all other rows are inserted.

    Args:
        target_table: Destination table (optionally ``schema.table``).
        columns: Column names, in the order the CSV fields appear.
        conflict_columns: Columns used as the ``ON CONFLICT`` target; the
            target table needs a unique index/constraint on exactly these.
        data: Readable text file-like object whose first line is a header.

    Raises:
        ValueError: If any table or column name is not a plain identifier.
    """
    _check_identifier(target_table, allow_qualified=True)
    for name in [*columns, *conflict_columns]:
        _check_identifier(name)

    temp_table = _temp_table_name(target_table)
    column_sql = ", ".join(columns)
    conflict_sql = ", ".join(conflict_columns)

    # NOTE(review): cre_data_conn presumably yields a psycopg2 connection
    # (copy_expert is psycopg2 API) -- confirm.
    with cre_data_conn() as conn:
        with conn.cursor() as cursor:
            # INCLUDING ALL also copies the target's indexes, including the
            # unique index ON CONFLICT relies on, so duplicate keys within a
            # single CSV make the COPY fail.
            cursor.execute(
                f"CREATE TEMP TABLE {temp_table} "
                f"(LIKE {target_table} INCLUDING ALL)"
            )
            # CSV HEADER makes COPY discard the first line itself; callers
            # must not skip it as well or the first data row is lost.
            cursor.copy_expert(
                sql=f"COPY {temp_table}({column_sql}) FROM STDIN WITH CSV HEADER",
                file=data,
            )
            cursor.execute(
                f"INSERT INTO {target_table} ({column_sql}) "
                f"SELECT {column_sql} FROM {temp_table} "
                f"ON CONFLICT ({conflict_sql}) "
                f"DO UPDATE SET count = {target_table}.count + EXCLUDED.count"
            )
            # Drop the staging table explicitly so a reused connection can
            # load the same table again; pg_temp-qualified so it can never
            # resolve to a permanent table of the same name.
            cursor.execute(f"DROP TABLE pg_temp.{temp_table}")
        conn.commit()


def load_csv_from_file(target_table, file_path, column):
    """Upsert a local CSV file into *target_table*, summing ``count`` on conflict.

    Named distinctly from :func:`load_csv` (the S3 loader); sharing that name
    made the later definition silently replace this one.

    The file must start with a header row, and its fields must be, in order:
    ``daydate, is_weekend, daypart, source, geohash, <column>, count``.
    Rows matching an existing ``(daydate, is_weekend, daypart, source,
    geohash, <column>)`` key add their ``count`` to the stored value.

    Args:
        target_table: Destination table name.
        file_path: Path of the UTF-8 CSV file to load.
        column: Name of the extra key column between ``geohash`` and ``count``.

    Raises:
        ValueError: If a table or column name is not a plain identifier.
    """
    conflict_columns = ["daydate", "is_weekend", "daypart", "source", "geohash", column]
    with open(file_path, "r", encoding="utf-8") as data:
        _upsert_csv(target_table, [*conflict_columns, "count"], conflict_columns, data)


def save_csv(final_dataframe, bucket, file_prefix):
    """Write *final_dataframe* as CSV under ``s3a://<bucket>/<file_prefix>``.

    The output path is a Spark output directory and any existing output there
    is overwritten. ``coalesce(1)`` collapses the data to one partition so it
    lands in a single part file, written with a header row.
    """
    output_path = f's3a://{bucket}/{file_prefix}'
    (
        final_dataframe
        .coalesce(1)
        .write
        .mode("overwrite")
        .option("header", "true")
        .format("csv")
        .save(output_path)
    )


def load_csv(bucket, file_prefix, target_table, column):
    """Upsert the CSV part file found under an S3 prefix into *target_table*.

    Column names come from the file's header row. Rows matching an existing
    ``(daydate, is_weekend, hour, geohash, source, <column>)`` key add their
    ``count`` to the stored value; other rows are inserted.

    Args:
        bucket: S3 bucket name.
        file_prefix: Key prefix to search for a ``.csv`` object.
        target_table: Destination table name.
        column: Name of the extra key column used in the conflict target.

    Raises:
        FileNotFoundError: If no ``.csv`` object exists under the prefix.
        ValueError: If the header is missing or holds a non-identifier name.
    """
    s3 = boto3.client("s3")
    # NOTE(review): list_objects_v2 returns at most 1000 keys per call and is
    # not paginated here; fine for a single Spark output directory.
    response = s3.list_objects_v2(Bucket=bucket, Prefix=file_prefix)
    csv_filepath = _pick_csv_key(response.get("Contents", []), file_prefix)
    print(f"Csv file path: {csv_filepath}")

    csv_obj = s3.get_object(Bucket=bucket, Key=csv_filepath)
    csv_text = csv_obj["Body"].read().decode("utf-8")
    columns = _read_csv_header(csv_text)
    print(f"CSV header:{','.join(columns)}")

    conflict_columns = ["daydate", "is_weekend", "hour", "geohash", "source", column]
    _upsert_csv(target_table, columns, conflict_columns, io.StringIO(csv_text))