Generating the Airbyte DBT normalization project without running a sync
If you’re using Airbyte to transfer data between points A and B, there’s a high chance you’re also using the default normalization feature. Sometimes the default normalization logic doesn’t fit your needs and you’d like to extend it in some way. The Airbyte documentation already features a tutorial on implementing custom transformations with SQL.
That tutorial walks you through the steps required to generate a DBT project with the default normalization logic (in SQL) so that you don’t have to write it from scratch. However, it requires you to trigger an actual sync (and then check the workspace and copy its data from Docker to your computer). What if you don’t want to run a sync just to get the DBT template (e.g. your source is a big one and a sync would take too much time)?
This article describes an alternative way of generating the DBT template (without running a sync).
Prerequisites:
- You have an Airbyte Open Source deployment (I have no idea whether something like this is possible in Airbyte Cloud). I used Airbyte `0.42.0`.
- You have access to the internal Airbyte Postgres database - it’s required for downloading catalog data. Alternatively, you could use the API to retrieve it (but you need to know the Airbyte username & password, as the API is protected with basic auth).
- You have a Python 3 installation on your local computer (I tested this with Python `3.8.16`).
- You have a local clone of the Airbyte git repository.
Quick recap on normalization
If you’d like to get a deeper understanding of the normalization process, then I recommend checking Airbyte Base Normalization docs.
When Airbyte extracts a record from the source, it keeps its data in the form of a JSON blob. A record in a source table `people` looking like this:
| id | first_name | last_name |
|----|------------|-----------|
| 1  | John       | Doe       |
| 2  | Ian        | Smith     |
would be loaded into the target table `_airbyte_raw_people` in the form below (if the Raw data (JSON) option is chosen in the normalization tab):
| _airbyte_ab_id | _airbyte_data | _airbyte_emitted_at |
|---|---|---|
| c7756059-0a6a-428a-92f4-f8a17e99ce48 | {"id": 1, "first_name": "John", "last_name": "Doe"} | 2023-04-06 14:56:42.713 +0200 |
| 53945dcc-ea4f-4b79-aa38-a01b5609b65d | {"id": 2, "first_name": "Ian", "last_name": "Smith"} | 2023-04-06 14:56:42.713 +0200 |
That’s probably not how you want your target data to look, which is why Airbyte has an alternative form of normalization available (Normalized tabular data). This mode normalizes JSON attributes into separate columns in the final table. It is also the normalization step that takes care of creating slowly changing dimension tables (if you use the `Deduped + History` sync mode).
Airbyte uses DBT for executing the normalization SQL scripts (your target warehouse has to be supported by DBT if you want to use normalization).
If the default normalization SQL generated by Airbyte does not fulfill your requirements, you can override it with a custom DBT project that will be executed after every sync.
To avoid writing a lot of boilerplate SQL (parsing the data from JSON and casting it to the appropriate data types), it makes sense to use the default Airbyte DBT project as a starting point.
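To give a feel for that boilerplate, here is a rough sketch of what the generated SQL does for the `people` example above (hand-written for illustration in Postgres flavour; the real generated project splits this across several CTE models and adds hashid/metadata columns):

```sql
-- Illustrative sketch: parse JSON attributes out of the raw table
-- and cast them to typed columns (not the exact generator output)
select
    cast(_airbyte_data ->> 'id' as bigint)       as id,
    cast(_airbyte_data ->> 'first_name' as text) as first_name,
    cast(_airbyte_data ->> 'last_name' as text)  as last_name,
    _airbyte_ab_id,
    _airbyte_emitted_at
from _airbyte_raw_people
```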
Recipe for generating the project
Let’s go through all the steps required for generating the default DBT normalization project (without running a sync).
- Create a copy of Airbyte’s normalization Python package.

  The package is located in `<path_to_airbyte_repo_clone>/airbyte-integrations/bases/base-normalization`.
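  A sketch of the copy step (using the placeholder paths from this article):

  ```bash
  cp -r <path_to_airbyte_repo_clone>/airbyte-integrations/bases/base-normalization <copied_normalization_package_path>
  ```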
- Create a virtualenv in the copied directory and activate it.

  ```bash
  cd <copied_normalization_package_path>
  python3 -m venv venv
  source venv/bin/activate
  ```
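  The `transform-config` and `transform-catalog` commands used later in this recipe are entry points of this package, so if your shell can’t find them inside the virtualenv, installing the package locally should make them available (my own addition, not part of the original steps; details may differ between Airbyte versions):

  ```bash
  pip install -e .
  ```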
- Create a directory for the DBT project to be generated (use one of the already available templates).

  This directory will contain the output from the Airbyte normalization scripts which we’ll run. I’ll create it in the current directory (`<copied_normalization_package_path>`). There are several template directories within the normalization package (`dbt-project-template` and `dbt-project-template-<db_flavor>` directories). Choose the right one based on your target warehouse (in my case I’m doing Postgres to Postgres replication, so I’ll use the generic `dbt-project-template` as there’s no dedicated one for Postgres). I called mine `dbt_generated_project`.
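  In shell terms, this boils down to something like (using the names from this article):

  ```bash
  cd <copied_normalization_package_path>
  cp -r dbt-project-template dbt_generated_project
  ```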
- Retrieve destination config & catalog data from the Airbyte database.

  Note down the destination and connection ids (you can do it from the browser URL when visiting the destination/connection details page - `/destination/<destination_id>` or `/connections/<connection_id>`). Log in to the internal Airbyte database (Postgres) and run the following queries.
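  With a default Docker Compose deployment of Airbyte, getting a `psql` shell usually looks like this (an assumption based on the default container name and credentials; adjust to your deployment):

  ```bash
  docker exec -it airbyte-db psql -U docker -d airbyte
  ```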
  Config data:

  ```sql
  select configuration from actor where id = '<destination_id>';
  ```

  Example result (`config.json`):

  ```json
  {"ssl": false, "host": "localhost", "port": 5432, "schemas": ["public"], "database": "events_db", "password": {"_secret": "airbyte_workspace_0ed266f7-c0ab-4bcf-a594-c5e45a17fcd3_secret_ba81e05a-cf31-49b4-9b37-ce024d133ce3_v1"}, "ssl_mode": {"mode": "disable"}, "username": "sandbox", "tunnel_method": {"tunnel_method": "NO_TUNNEL"}, "replication_method": {"method": "Standard"}}
  ```
  Catalog data:

  ```sql
  select catalog from connection where id = '<connection_id>';
  ```

  Example result (`catalog.json`):

  ```json
  {"streams": [{"stream": {"name": "recommendation_events", "namespace": "public", "json_schema": {"type": "object", "properties": {"id": {"type": "number", "airbyte_type": "integer"}, "time": {"type": "string", "format": "date-time", "airbyte_type": "timestamp_without_timezone"}, "type": {"type": "string"}, "currency": {"type": "string"}}}, "default_cursor_field": [], "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_primary_key": [["id"]]}, "sync_mode": "full_refresh", "primary_key": [["id"]], "cursor_field": [], "destination_sync_mode": "overwrite"}, {"stream": {"name": "employees", "namespace": "public", "json_schema": {"type": "object", "properties": {"id": {"type": "number", "airbyte_type": "integer"}, "last_name": {"type": "string"}, "first_name": {"type": "string"}}}, "default_cursor_field": [], "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_primary_key": [["id"]]}, "sync_mode": "full_refresh", "primary_key": [["id"]], "cursor_field": [], "destination_sync_mode": "overwrite"}, {"stream": {"name": "employees_emails", "namespace": "public", "json_schema": {"type": "object", "properties": {"id": {"type": "number", "airbyte_type": "integer"}, "email": {"type": "string"}, "last_name": {"type": "string"}, "first_name": {"type": "string"}}}, "default_cursor_field": [], "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_primary_key": [["id"]]}, "sync_mode": "full_refresh", "primary_key": [["id"]], "cursor_field": [], "destination_sync_mode": "overwrite"}]}
  ```
  Copy the query results into JSON files within the directory we created in the previous step (in my case these are `config.json` and `catalog.json`). After this step, the structure of my `dbt_generated_project` directory looks as follows:

  ```
  ├── README.md
  ├── catalog.json
  ├── config.json
  ├── dbt_project.yml
  ├── macros
  │   ├── clean_tmp_tables.sql
  │   ├── configuration.sql
  │   ├── cross_db_utils
  │   │   ├── array.sql
  │   │   ├── columns.sql
  │   │   ├── concat.sql
  │   │   ├── current_timestamp.sql
  │   │   ├── datatypes.sql
  │   │   ├── except.sql
  │   │   ├── hash.sql
  │   │   ├── json_operations.sql
  │   │   ├── quote.sql
  │   │   ├── surrogate_key.sql
  │   │   └── type_conversions.sql
  │   ├── get_custom_schema.sql
  │   ├── incremental.sql
  │   ├── schema_tests
  │   │   ├── equal_rowcount.sql
  │   │   └── equality.sql
  │   ├── should_full_refresh.sql
  │   └── star_intersect.sql
  └── packages.yml
  ```
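  Before running the transforms, it may be worth sanity-checking that the copied files are valid JSON (an optional check of my own, not part of the original recipe):

  ```bash
  python3 -m json.tool dbt_generated_project/config.json > /dev/null && echo "config.json OK"
  python3 -m json.tool dbt_generated_project/catalog.json > /dev/null && echo "catalog.json OK"
  ```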
- Transform config.

  Go to the `<copied_normalization_package_path>` directory and run the following command:

  ```bash
  (venv) > $ transform-config \
      --config dbt_generated_project/config.json \
      --integration-type postgres \
      --out dbt_generated_project
  ```

  This generates the following output:

  ```
  Namespace(config='dbt_generated_project/config.json', integration_type=<DestinationType.POSTGRES: 'postgres'>, out='dbt_generated_project')
  transform_postgres
  ```

  and creates a `profiles.yml` file in the output directory:

  ```yaml
  config:
    partial_parse: true
    printer_width: 120
    send_anonymous_usage_stats: false
    use_colors: true
  normalize:
    outputs:
      prod:
        dbname: cdc_target
        host: localhost
        pass:
          _secret: airbyte_workspace_0ed266f7-c0ab-4bcf-a594-c5e45a17fcd3_secret_b5f75e6f-90eb-48cb-8919-d89155f32b1d_v1
        port: 5434
        schema: new_target
        threads: 8
        type: postgres
        user: elt
    target: prod
  ```
- Transform catalog.

  ```bash
  (venv) > $ transform-catalog \
      --integration-type postgres \
      --profile-config-dir dbt_generated_project \
      --catalog dbt_generated_project/catalog.json \
      --json-column _airbyte_data \
      --out dbt_generated_project/models/generated
  ```

  Output:

  ```
  Processing dbt_generated_project/catalog.json...
  Generating airbyte_ctes/public/ecommerce_message_ab1.sql from ecommerce_message
  Generating airbyte_ctes/public/ecommerce_message_ab2.sql from ecommerce_message
  Generating airbyte_incremental/public/ecommerce_message_stg.sql from ecommerce_message
  Generating airbyte_incremental/scd/public/ecommerce_message_scd.sql from ecommerce_message
  Generating airbyte_incremental/public/ecommerce_message.sql from ecommerce_message
  Generating airbyte_ctes/public/dedup_test_ab1.sql from dedup_test
  Generating airbyte_ctes/public/dedup_test_ab2.sql from dedup_test
  Generating airbyte_ctes/public/dedup_test_ab3.sql from dedup_test
  Generating airbyte_incremental/public/dedup_test.sql from dedup_test
  Generating airbyte_ctes/public/superheroes_cdc_ab1.sql from superheroes_cdc
  Generating airbyte_ctes/public/superheroes_cdc_ab2.sql from superheroes_cdc
  Generating airbyte_ctes/public/superheroes_cdc_ab3.sql from superheroes_cdc
  Generating airbyte_incremental/public/superheroes_cdc.sql from superheroes_cdc
  Generating airbyte_ctes/public/nonsensitive_models_ab1.sql from nonsensitive_models
  Generating airbyte_ctes/public/nonsensitive_models_ab2.sql from nonsensitive_models
  Generating airbyte_incremental/public/nonsensitive_models_stg.sql from nonsensitive_models
  Generating airbyte_incremental/scd/public/nonsensitive_models_scd.sql from nonsensitive_models
  Generating airbyte_incremental/public/nonsensitive_models.sql from nonsensitive_models
  ```
The SQL for the default normalization has been generated and you can start building on top of it (inside the `dbt_generated_project/models/generated/` directory).
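If you want to check that the generated project actually compiles and runs against your warehouse, something along these lines should work (a sketch; note that the generated `profiles.yml` still contains an Airbyte `_secret` reference in the `pass` field, which you’d have to replace with the real password first):

```bash
dbt deps --profiles-dir dbt_generated_project --project-dir dbt_generated_project
dbt run --profiles-dir dbt_generated_project --project-dir dbt_generated_project
```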
Final thoughts
If you need to extend the default Airbyte normalization code, don’t want to start from scratch, and don’t mind running a sync to generate the code, following this Airbyte tutorial is the way to go.
Otherwise, you can use the `transform-catalog` & `transform-config` scripts to generate the code.
Best Regards,
Kuba