DBT Incremental export to S3
29th Oct 2023
For data sharing, S3 is cheap, efficient, and universally accepted.
While you could use macros to do this, I find it much easier to write a separate helper Python script that runs after your dbt models are built. We'll build that script step by step.
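The snippets below rely on two clients: boto3 for reading and writing cursor files on S3, and clickhouse-connect for running the export queries (the export uses ClickHouse's s3 table function). A minimal setup sketch, where the bucket URL, host and credential environment variables are placeholders to adapt:

import datetime
import os

import boto3
import clickhouse_connect
from botocore.exceptions import NoCredentialsError

# Placeholders: substitute your own bucket, host and credentials
s3_bucket_url = "https://l3-dbt-test-bucket.s3.amazonaws.com"
s3_access_key = os.environ["S3_ACCESS_KEY"]
s3_secret_key = os.environ["S3_SECRET_KEY"]

# boto3 picks up AWS credentials from the environment / default chain
s3 = boto3.client("s3")

client = clickhouse_connect.get_client(
    host=os.environ.get("CLICKHOUSE_HOST", "localhost"),
    username=os.environ.get("CLICKHOUSE_USER", "default"),
    password=os.environ.get("CLICKHOUSE_PASSWORD", ""),
)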
Iterate over the models:
path = "models/"
files = os.listdir(path)
sql_files = [file for file in files if file.endswith('.sql')]
for sql_file in sql_files:
modelname = sql_file[:-4]
print(f"Exporting {modelname} to S3...")
...
Inside the loop body, fetch the model's cursor from S3, or create a new blank one:
cursor_key = f"{modelname}/_last_inserted_id.csv"

def update_cursor(new_cursor):
    # Write the new cursor value to the model's cursor file on S3
    csv_bytes_value = bytes(f"{new_cursor}\n", encoding='UTF-8')
    try:
        s3.put_object(Body=csv_bytes_value, Bucket="l3-dbt-test-bucket", Key=cursor_key)
    except Exception as e:
        print("There was an error updating the cursor.")
        print(e)

try:
    # head_object raises if the cursor file doesn't exist yet
    s3.head_object(Bucket="l3-dbt-test-bucket", Key=cursor_key)
    csv_obj = s3.get_object(Bucket="l3-dbt-test-bucket", Key=cursor_key)
    byte_data = csv_obj['Body'].read()
    str_data = byte_data.decode("utf-8")
    _last_inserted_id = int(str_data)
    print(f"Found cursor file ✅ (cursor: {_last_inserted_id})")
except NoCredentialsError:
    print("No credentials provided.")
    raise  # no point continuing without credentials
except Exception:
    # First run for this model: start the cursor at 0
    print("Could not find cursor file... Generating one...")
    initial_cursor = 0
    update_cursor(initial_cursor)
    print(f"Generated cursor file ✅ (cursor: {initial_cursor})")
    _last_inserted_id = initial_cursor

print(f"Exporting from {_last_inserted_id}")
Then export the records to S3 based on the cursor, using ClickHouse's s3 table function. Remember to order by ID and add the ID filter!
sql = f"""
INSERT INTO
FUNCTION s3(
'{s3_bucket_url}/{modelname}/{modelname}_{datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}.csv',
'{s3_access_key}',
'{s3_secret_key}',
'TabSeparatedWithNames'
)
SELECT
*
FROM
{modelname}
WHERE id > '{ _last_inserted_id}'
ORDER BY id ASC
SETTINGS format_tsv_null_representation = '';
"""
client.command(sql)
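Assuming s3_bucket_url points at the same bucket as the cursor files, each run lands a timestamped file per model next to its cursor, e.g. for a hypothetical orders model:

orders/_last_inserted_id.csv
orders/orders_2023-10-29T12:00:00.csv
orders/orders_2023-10-30T12:00:00.csv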
And update the cursor. (One caveat: rows inserted between the export and this max(id) query are skipped for good, since the cursor jumps past them; for busy tables you may prefer deriving the new cursor from the exported range itself.)
sql = f"""
SELECT
max(id)
FROM
{modelname}
"""
cursor = client.query(sql)
new_cursor = cursor.result_rows[0][0]
update_cursor(new_cursor)
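To recap, here is the overall shape of the script once the pieces above are in place. The helpers get_or_create_cursor, export_model and fetch_max_id are hypothetical names wrapping the snippets shown earlier, not real APIs:

def export_all_models():
    sql_files = [f for f in os.listdir("models/") if f.endswith(".sql")]
    for sql_file in sql_files:
        modelname = sql_file[:-4]
        print(f"Exporting {modelname} to S3...")
        _last_inserted_id = get_or_create_cursor(modelname)  # cursor fetch/create step
        export_model(modelname, _last_inserted_id)           # INSERT INTO FUNCTION s3(...)
        update_cursor(fetch_max_id(modelname))                # SELECT max(id) step

if __name__ == "__main__":
    export_all_models()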
That’s all!
CI
You'll want to run this in your continuous integration. Here's an example with GitHub Actions and Pipenv.
Pipfile
[scripts]
+ export_s3 = "python scripts/incremental-export-s3.py"
GitHub Actions workflow file:
  - name: Run dbt models
    run: pipenv run dbt run
+ - name: Incrementally export models to S3
+   run: pipenv run export_s3
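The export step also needs credentials at runtime. Assuming you keep them as repository secrets (the secret names below are placeholders matching the environment variables from the setup snippet), expose them to the step via env:

- name: Incrementally export models to S3
  run: pipenv run export_s3
  env:
    S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }}
    S3_SECRET_KEY: ${{ secrets.S3_SECRET_KEY }}
    CLICKHOUSE_PASSWORD: ${{ secrets.CLICKHOUSE_PASSWORD }}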