
DBT Incremental export to S3

29th Oct 2023

For data sharing, S3 is hard to beat: it’s cheap, efficient, and universally accepted.

While you could do this with dbt macros, I find it much easier to create a separate helper Python script that runs after your dbt models are built. We’ll build that script step by step.
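First, the setup. Everything below assumes a boto3 S3 client and a clickhouse-connect client, since the export itself uses ClickHouse’s s3 table function; the host, bucket URL, and environment variable names here are placeholders, so adjust them to your setup:

import os
import datetime

import boto3
import clickhouse_connect
from botocore.exceptions import NoCredentialsError

# Placeholders: swap in your own bucket and connection details.
s3_bucket_url = "https://l3-dbt-test-bucket.s3.amazonaws.com"
s3_access_key = os.environ["AWS_ACCESS_KEY_ID"]
s3_secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]

s3 = boto3.client("s3")
client = clickhouse_connect.get_client(host="localhost")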

Then, iterate over the model files:

path = "models/"
files = os.listdir(path)
sql_files = [file for file in files if file.endswith('.sql')]

for sql_file in sql_files:
	modelname = sql_file[:-4]  # strip the '.sql' extension
	print(f"Exporting {modelname} to S3...")

	...
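This assumes a flat models/ directory. If your models live in nested subfolders, as dbt projects often do, a recursive glob (my tweak, not in the original script) picks them all up:

import glob

# Collect every .sql file under models/, however deeply nested.
sql_files = [os.path.basename(p) for p in glob.glob("models/**/*.sql", recursive=True)]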

Inside the loop body, fetch the cursor from S3, or create a new one if it doesn’t exist yet:

cursor_key = f"{modelname}/_last_inserted_id.csv"

def update_cursor(new_cursor):
	# Persist the cursor as a tiny one-line CSV alongside the model's exports.
	csv_bytes_value = bytes(f"{new_cursor}\n", encoding='UTF-8')

	try:
		s3.put_object(Body=csv_bytes_value, Bucket="l3-dbt-test-bucket", Key=cursor_key)
	except Exception as e:
		print("There was an error updating the cursor.")
		print(e)


try:
	# get_object raises if the key doesn't exist, dropping us into the except branch.
	csv_obj = s3.get_object(Bucket="l3-dbt-test-bucket", Key=cursor_key)
	byte_data = csv_obj['Body'].read()

	str_data = byte_data.decode("utf-8")

	_last_inserted_id = int(str_data)

	print(f"Found cursor file ✅ (cursor: {_last_inserted_id})")

except NoCredentialsError:
	print("No credentials provided.")
except Exception:
	print("Could not find cursor file... Generating one...")
	initial_cursor = 0
	update_cursor(initial_cursor)
	print(f"Generated cursor file ✅ (cursor: {initial_cursor})")
	_last_inserted_id = initial_cursor

print(f"Exporting from {_last_inserted_id}")

Then export the records to S3 based on the cursor, using ClickHouse’s s3 table function. Remember to order by id and add the id filter, so each run only picks up rows it hasn’t exported yet!

sql = f"""
	INSERT INTO
		FUNCTION s3(
			'{s3_bucket_url}/{modelname}/{modelname}_{datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}.csv',
			'{s3_access_key}',
			'{s3_secret_key}',
			'TabSeparatedWithNames'
		)
	SELECT
		*
	FROM
		{modelname}
	WHERE id > {_last_inserted_id}
	ORDER BY id ASC
	SETTINGS format_tsv_null_representation = '';
"""

client.command(sql)

And update the cursor to the table’s current max(id):

sql = f"""
	SELECT
		max(id)
	FROM
		{modelname}
"""

cursor = client.query(sql)
new_cursor = cursor.result_rows[0][0]

update_cursor(new_cursor)
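One caveat: rows inserted between the export and the max(id) query get skipped, because the cursor jumps past them without them ever being exported. A safer variant, sketched here with the same assumed clients as above, snapshots the upper bound first and uses it for both the filter and the new cursor:

# Snapshot the upper bound BEFORE exporting, so rows that land mid-export
# are picked up by the next run instead of being silently skipped.
upper_bound = client.query(f"SELECT max(id) FROM {modelname}").result_rows[0][0]

if upper_bound is not None and upper_bound > _last_inserted_id:
	client.command(f"""
		INSERT INTO
			FUNCTION s3(
				'{s3_bucket_url}/{modelname}/{modelname}_{datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}.csv',
				'{s3_access_key}',
				'{s3_secret_key}',
				'TabSeparatedWithNames'
			)
		SELECT * FROM {modelname}
		WHERE id > {_last_inserted_id} AND id <= {upper_bound}
		ORDER BY id ASC
		SETTINGS format_tsv_null_representation = '';
	""")
	update_cursor(upper_bound)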

That’s all!

CI

You’ll want to run this in your continuous integration, right after your dbt models are built. Here’s an example with GitHub Actions and Pipenv.

Pipfile

  [scripts]
+ export_s3 = "python scripts/incremental-export-s3.py"

GitHub Actions workflow file:

  - name: Run dbt models
    run: pipenv run dbt run

+ - name: Incrementally export models to s3
+   run: pipenv run export_s3
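If the script reads its credentials from the environment, as the setup sketch at the top does, pass them in from repository secrets. The secret names here are my assumption:

  - name: Incrementally export models to s3
    run: pipenv run export_s3
+   env:
+     AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
+     AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}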
 
