Load CSV file to Postgres database


Thu Jun 26 2025 10:34:15 GMT+0000 (Coordinated Universal Time)

Saved by @Saravana_Kumar #python

import io

import boto3
from psycopg2 import sql

# cre_data_conn() is assumed to return an open psycopg2 connection to the
# target Postgres database; it is defined elsewhere in this project.

def load_csv(target_table, file_path, column):

    temp_table = f"{target_table}_temp"

    with cre_data_conn() as conn:
        with conn.cursor() as cursor:
            # Step 1: Create a temp table matching the target table's structure.
            # Note: table and column names are interpolated directly into the
            # SQL, so callers must pass trusted identifiers.
            cursor.execute(sql.SQL(f"""
                CREATE TEMP TABLE {temp_table} (LIKE {target_table} INCLUDING ALL);
            """))

            # Step 2: Copy CSV data into the temp table. COPY ... CSV HEADER
            # already skips the header row, so the file must not be advanced
            # manually as well (doing both silently drops the first data row).
            with open(file_path, 'r') as data:
                cursor.copy_expert(sql=f"""
                    COPY {temp_table}(daydate, is_weekend, daypart, source, geohash, {column}, count)
                    FROM STDIN WITH CSV HEADER
                """, file=data)

            # Step 3: Upsert with count summing. ON CONFLICT requires a unique
            # constraint or index on the listed columns in the target table.
            cursor.execute(sql.SQL(f"""
                INSERT INTO {target_table} (daydate, is_weekend, daypart, source, geohash, {column}, count)
                SELECT daydate, is_weekend, daypart, source, geohash, {column}, count
                FROM {temp_table}
                ON CONFLICT (daydate, is_weekend, daypart, source, geohash, {column})
                DO UPDATE SET count = {target_table}.count + EXCLUDED.count
            """))

            conn.commit()
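
A minimal usage sketch for the loader above; the table name, file path, and dimension column are hypothetical, and the target table is assumed to already exist with a unique constraint matching the ON CONFLICT column list.

# Hypothetical call: upsert a local daily-counts extract into Postgres.
load_csv(
    target_table="visits_by_geohash",        # illustrative table name
    file_path="/tmp/visits_2025-06-26.csv",  # illustrative path
    column="category",                       # dimension column in CSV and table
)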


def save_csv(final_dataframe, bucket, file_prefix):
    """Saves the final DataFrame to a CSV file."""

    output_path = f's3a://{bucket}/{file_prefix}'
    (
        final_dataframe
            .coalesce(1)  # single partition, so Spark emits one CSV part file
            .write
            .mode("overwrite")
            .option("header", "true")
            .format("csv")
            .save(output_path)
    )
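
Hedged usage, assuming an active SparkSession and a DataFrame named final_df (both hypothetical). Even with coalesce(1), Spark writes a directory under the prefix (a part-*.csv file plus a _SUCCESS marker), which is why the S3 loader below lists the prefix and picks out the .csv part file:

# Hypothetical call: write a single-part CSV under the given S3 prefix.
save_csv(final_df, bucket="my-data-bucket", file_prefix="exports/visits/")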

def load_csv_from_s3(bucket, file_prefix, target_table, column):
    # S3 variant of the loader above (renamed from load_csv so the two
    # definitions no longer shadow each other).

    # Step 1: Find the CSV part file that save_csv wrote under the prefix.
    s3 = boto3.client('s3')
    response = s3.list_objects_v2(Bucket=bucket, Prefix=file_prefix)
    csv_filepath = None
    for obj in response.get('Contents', []):
        if obj['Key'].endswith(".csv"):
            csv_filepath = obj['Key']
            break
    if csv_filepath is None:
        raise Exception("No CSV part file found in S3 under prefix: " + file_prefix)
    print(f"CSV file path: {csv_filepath}")
    csv_obj = s3.get_object(Bucket=bucket, Key=csv_filepath)
    csv_file = csv_obj['Body'].read().decode('utf-8')
    # Step 2: Create a StringIO buffer from the CSV content
    csv_buffer = io.StringIO(csv_file)

    temp_table = f"{target_table}_temp"
    with cre_data_conn() as conn:
        with conn.cursor() as cursor:
            # Step 3: Create temp table matching the target table's structure
            cursor.execute(sql.SQL(f"""
                CREATE TEMP TABLE {temp_table} (LIKE {target_table} INCLUDING ALL);
            """))
            
            # Step 4: Copy the S3 CSV content into the temp table.
            csv_buffer.seek(0)
            # The header row doubles as the COPY column list; strip the trailing
            # newline so it splices cleanly into the statement.
            column_list = csv_buffer.readline().strip()
            print(f"CSV header: {column_list}")
            csv_buffer.seek(0)
            cursor.copy_expert(sql=f"""
                COPY {temp_table}({column_list})
                FROM STDIN WITH CSV HEADER
            """, file=csv_buffer)

            # Step 5: Upsert with count summing
            cursor.execute(sql.SQL(f"""
                INSERT INTO {target_table} ({column_list})
                SELECT {column_list}
                FROM {temp_table}
                ON CONFLICT (daydate, is_weekend, hour, geohash, source, {column})
                DO UPDATE SET count = {target_table}.count + EXCLUDED.count
            """))

            conn.commit()
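
An end-to-end sketch combining the two S3 helpers; the bucket, prefix, table, and column names are illustrative only, and the target table is assumed to carry the unique constraint that the ON CONFLICT clause relies on:

# Hypothetical flow: Spark writes the CSV to S3, then Postgres upserts it.
save_csv(final_df, bucket="my-data-bucket", file_prefix="exports/visits/")
load_csv_from_s3(
    bucket="my-data-bucket",
    file_prefix="exports/visits/",
    target_table="hourly_visits",  # illustrative; needs the matching unique constraint
    column="category",
)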