Load a CSV file into a Postgres database
Thu Jun 26 2025 10:34:15 GMT+0000 (Coordinated Universal Time)
Saved by @Saravana_Kumar #python
import io

import boto3
import psycopg2
from psycopg2 import sql


# Local-file variant of the loader; the S3 variant load_csv() appears below.
def load_csv_from_file(target_table, file_path, column):
    """Load a local CSV file into target_table, summing counts on key conflicts.

    Note: table and column names are interpolated directly into the SQL,
    so they must come from trusted input.
    """
    temp_table = f"{target_table}_temp"
    with cre_data_conn() as conn:
        with conn.cursor() as cursor:
            # Step 1: Create a temp table with the same structure as the target
            cursor.execute(sql.SQL(f"""
                CREATE TEMP TABLE {temp_table} (LIKE {target_table} INCLUDING ALL);
            """))
            # Step 2: Copy the CSV data into the temp table.
            # CSV HEADER makes COPY skip the header row itself, so the file
            # handle must not be advanced past the header beforehand.
            with open(file_path, 'r') as data:
                cursor.copy_expert(sql=f"""
                    COPY {temp_table}(daydate, is_weekend, daypart, source, geohash, {column}, count)
                    FROM STDIN WITH CSV HEADER
                """, file=data)
            # Step 3: Upsert into the target, summing counts on conflict
            cursor.execute(sql.SQL(f"""
                INSERT INTO {target_table} (daydate, is_weekend, daypart, source, geohash, {column}, count)
                SELECT daydate, is_weekend, daypart, source, geohash, {column}, count
                FROM {temp_table}
                ON CONFLICT (daydate, is_weekend, daypart, source, geohash, {column})
                DO UPDATE SET count = {target_table}.count + EXCLUDED.count
            """))
        conn.commit()
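# cre_data_conn() is referenced above but not defined in this snippet. A minimal
# sketch of what it might look like, assuming psycopg2 with connection details
# read from environment variables (the variable names are placeholders, not the
# author's actual configuration):
import os


def cre_data_conn():
    """Hypothetical connection factory returning a psycopg2 connection."""
    return psycopg2.connect(
        host=os.environ["PGHOST"],
        port=os.environ.get("PGPORT", "5432"),
        dbname=os.environ["PGDATABASE"],
        user=os.environ["PGUSER"],
        password=os.environ["PGPASSWORD"],
    )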
def save_csv(final_dataframe, bucket, file_prefix):
    """Saves the final Spark DataFrame to a single CSV file on S3."""
    output_path = f's3a://{bucket}/{file_prefix}'
    (
        final_dataframe
        .coalesce(1)  # Collapse to one partition so a single CSV part file is written
        .write
        .mode("overwrite")
        .option("header", "true")
        .format("csv")
        .save(output_path)
    )
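# Example usage of save_csv, assuming an active SparkSession configured with the
# s3a connector; the DataFrame, bucket, and prefix here are illustrative only:
#
#   df = spark.read.parquet("s3a://my-bucket/aggregates/")
#   save_csv(df, bucket="my-bucket", file_prefix="exports/daily_counts")
#
# coalesce(1) yields a single part file (e.g. part-00000-...csv) under the
# prefix, alongside a _SUCCESS marker, which is why load_csv below scans the
# prefix for a ".csv" key instead of assuming a fixed file name.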
def load_csv(bucket, file_prefix, target_table, column):
    """Load the CSV part file written under an S3 prefix into target_table."""
    s3 = boto3.client('s3')
    response = s3.list_objects_v2(Bucket=bucket, Prefix=file_prefix)
    # Find the CSV part file under the prefix (Spark also writes _SUCCESS markers);
    # fail only if no object with a ".csv" suffix exists
    csv_filepath = None
    for obj in response.get('Contents', []):
        if obj['Key'].endswith(".csv"):
            csv_filepath = obj['Key']
            break
    if csv_filepath is None:
        raise Exception("No CSV part file found in S3 under prefix: " + file_prefix)
    print(f"CSV file path: {csv_filepath}")
    csv_obj = s3.get_object(Bucket=bucket, Key=csv_filepath)
    csv_file = csv_obj['Body'].read().decode('utf-8')
    # Step 1: Create a StringIO buffer from the CSV content
    csv_buffer = io.StringIO(csv_file)
    temp_table = f"{target_table}_temp"
    with cre_data_conn() as conn:
        with conn.cursor() as cursor:
            # Step 2: Create a temp table with the same structure as the target
            cursor.execute(sql.SQL(f"""
                CREATE TEMP TABLE {temp_table} (LIKE {target_table} INCLUDING ALL);
            """))
            # Step 3: Read the column list from the CSV header, stripping the
            # trailing newline so it interpolates cleanly into the SQL below
            csv_buffer.seek(0)
            column_list = csv_buffer.readline().strip()
            print(f"CSV header: {column_list}")
            # Step 4: Copy the S3 CSV content into the temp table
            csv_buffer.seek(0)
            cursor.copy_expert(sql=f"""
                COPY {temp_table}({column_list})
                FROM STDIN WITH CSV HEADER
            """, file=csv_buffer)
            # Step 5: Upsert into the target, summing counts on conflict
            cursor.execute(sql.SQL(f"""
                INSERT INTO {target_table} ({column_list})
                SELECT {column_list}
                FROM {temp_table}
                ON CONFLICT (daydate, is_weekend, hour, geohash, source, {column})
                DO UPDATE SET count = {target_table}.count + EXCLUDED.count
            """))
        conn.commit()
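# End-to-end sketch tying the two steps together; the bucket, prefix, table,
# and column names below are hypothetical examples, not values from the
# original snippet:
if __name__ == "__main__":
    # Step 1 (run inside the Spark job): write the aggregated DataFrame to S3
    # save_csv(final_dataframe, bucket="analytics-bucket", file_prefix="exports/visits")
    # Step 2: upsert the resulting part file into Postgres
    load_csv(
        bucket="analytics-bucket",
        file_prefix="exports/visits",
        target_table="visit_counts",
        column="category",
    )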