#!/usr/bin/env bash
# Idempotent submit-and-poll wrapper for a Dataflow job. A marker object
# ("jobid") in the output bucket records whether the job was submitted and
# how it finished; a non-zero exit tells the caller to retry this script.

# Take the most recent object under the input prefix and verify one exists.
INPUT_FILE=$(gsutil ls "gs://atarg-data-dev-smp-extract/atarg_customer_exports/vici_dnbipr_testing/20240601/" | tail -1)
echo "$INPUT_FILE"
if [[ $INPUT_FILE == gs://* ]]; then
    echo "Compressed file exists"
else
    echo "Compressed file does not exist"
    exit 1
fi

# The presence of the marker object means the job was already submitted.
gsutil -q stat gs://atarg-data-dev-smp-extract/atarg_customer_exports/vici_dnbipr_testing_output20/jobid
status=$?
echo "$status"
if [[ $status -ne 0 ]]; then
    # Clean up anything already in the output prefix:
    # echo $(gsutil -m rm -r -f {output} 2>&1)
    # Submit the Dataflow job in the background:
    # python nparts.py \
    #     --input_prefix gs://atarg-data-dev-smp-extract/atarg_customer_exports2222/vici_dnbipr_testing/20240601/data.000000000000.csv.gz \
    #     --output_prefix gs://atarg-data-dev-smp-extract/atarg_customer_exports/vici_dnbipr_testing_output20/20240601/ \
    #     --region us-central1 --project atarg-data-dev --runner DataflowRunner \
    #     --job_name testpasrtwswe --temp_location gs://atarg-data-dev-smp-extract/dataflow/tmp31/ &

    # Write a "Running" marker so later runs know the job was submitted.
    tmp_file=$(mktemp)
    echo "Running" > "$tmp_file"
    gsutil cp "$tmp_file" gs://atarg-data-dev-smp-extract/atarg_customer_exports/vici_dnbipr_testing_output20/jobid
    rm -f "$tmp_file"
    echo "submitted job"
    # exit 1 means we'll retry and check the status of the job next time.
    exit 1
fi

# Already submitted: look up the job's state by name. In the default
# `gcloud dataflow jobs list` output, STATE is the second-to-last column.
state=$(gcloud dataflow jobs list | grep testpasrtwswe | head -1 | awk '{print $(NF-1)}')
echo "job state: $state"

if [[ $state == "Failed" ]]; then
    # Remove the marker so the next run resubmits the job.
    gsutil -m rm -r -f gs://atarg-data-dev-smp-extract/atarg_customer_exports/vici_dnbipr_testing_output20/jobid
    exit 1
fi

if [[ $state == "Done" ]]; then
    # Record completion in the marker object and report success.
    tmp_file=$(mktemp)
    echo "Done" > "$tmp_file"
    gsutil cp "$tmp_file" gs://atarg-data-dev-smp-extract/atarg_customer_exports/vici_dnbipr_testing_output20/jobid
    rm -f "$tmp_file"
    exit 0
fi

# Any other state (e.g. still Running): exit non-zero so the caller retries.
exit 1
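
# Usage sketch (hypothetical; assumes the script above is saved as
# check_dataflow_job.sh -- the filename and the loop below are illustrative,
# not part of the original). The script is designed to be re-run until it
# exits 0, e.g. by a scheduler with retries or a minimal shell loop:
#
#   until bash check_dataflow_job.sh; do
#       sleep 300   # wait before re-checking submission / job state
#   done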