Jakub Szafran

Airbyte DBT normalization project without running a sync


If you’re using Airbyte to transfer data between point A and point B, there’s a high chance you’re also using the default normalization feature. Sometimes the default normalization logic doesn’t fit your needs and you’d like to extend it in some way. The Airbyte Documentation already features a tutorial on implementing custom transformations with SQL.

That tutorial walks you through the steps required for generating a DBT project with the default normalization logic (in SQL) so that you don’t have to write it from scratch. However, it requires you to trigger an actual sync (and then check the workspace and copy its data from Docker to your computer). What if you don’t want to run a sync just to get the DBT template (e.g. your source is a big one and a sync would take too much time)?

This article describes an alternative way of generating the DBT template (without running a sync).

Prerequisites:

  1. You have an Airbyte Open Source deployment (I don’t know if something like this is possible in Airbyte Cloud). I used Airbyte 0.42.0.
  2. You have access to Airbyte’s internal Postgres database - it’s required for downloading the catalog data. Alternatively, you could use the API to retrieve it (but you need to know the Airbyte username & password, as the API is protected with basic auth).
  3. You have a Python 3 installation on your local computer (tested with Python 3.8.16).
  4. You have a local clone of the Airbyte git repository.

Quick recap on normalization

If you’d like to get a deeper understanding of the normalization process, then I recommend checking Airbyte Base Normalization docs.

When Airbyte extracts a record from the source, it keeps its data in the form of a JSON blob. Records in a source table people that look like this:

| id | first_name | last_name |
|----|------------|-----------|
| 1  | John       | Doe       |
| 2  | Ian        | Smith     |

would be loaded into the target table _airbyte_raw_people in the form below (if the Raw data (JSON) option is chosen in the normalization tab):

| _airbyte_ab_id                       | _airbyte_data                                        | _airbyte_emitted_at           |
|--------------------------------------|------------------------------------------------------|-------------------------------|
| c7756059-0a6a-428a-92f4-f8a17e99ce48 | {"id": 1, "first_name": "John", "last_name": "Doe"}  | 2023-04-06 14:56:42.713 +0200 |
| 53945dcc-ea4f-4b79-aa38-a01b5609b65d | {"id": 2, "first_name": "Ian", "last_name": "Smith"} | 2023-04-06 14:56:42.713 +0200 |

It’s probably not how you want your target data to look, which is why Airbyte offers an alternative form of normalization (Normalized tabular data). This mode normalizes the JSON attributes into separate columns in the final table, as shown below. It is also the normalization step that takes care of creating slowly changing dimension tables (if you use the Deduped + History sync mode).
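
With Normalized tabular data, the same two records would instead land in a people table along these lines (Airbyte also adds metadata columns such as _airbyte_ab_id and _airbyte_emitted_at, omitted here for brevity):

| id | first_name | last_name |
|----|------------|-----------|
| 1  | John       | Doe       |
| 2  | Ian        | Smith     |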

Airbyte uses DBT for executing the normalization SQL scripts (your target warehouse has to be supported by DBT if you want to use normalization).

If the default normalization SQL generated by Airbyte does not fulfill your requirements, you can override it with a custom DBT project that will be executed after every sync.

To avoid writing a lot of boilerplate SQL (parsing the data from JSON and casting it to the appropriate data types), it makes sense to use the default Airbyte DBT project as a starting point.
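
Conceptually, that boilerplate for the people example boils down to something like the following (a simplified sketch in Postgres SQL; the real generated project splits this across several CTE models and also computes hash ids):

    -- Simplified sketch, not the actual generated code:
    -- expand the raw JSON blob into typed columns.
    select
        cast(_airbyte_data ->> 'id' as bigint)       as id,
        cast(_airbyte_data ->> 'first_name' as text) as first_name,
        cast(_airbyte_data ->> 'last_name' as text)  as last_name,
        _airbyte_ab_id,
        _airbyte_emitted_at
    from _airbyte_raw_people;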

Recipe for generating the project

Let’s go through all the steps required for generating the default DBT normalization project (without running a sync).

  1. Create a copy of Airbyte’s normalization Python package.

    The package is located in <path_to_airbyte_repo_clone>/airbyte-integrations/bases/base-normalization

  2. Create a virtualenv in the copied directory, activate it and install the package (the transform-config and transform-catalog commands used in later steps are its entry points).

    cd <copied_normalization_package_path>
    python3 -m venv venv
    source venv/bin/activate
    pip install -e .
  3. Create a directory for the DBT project to be generated (by copying one of the already available templates).

    This directory will contain the output from the Airbyte normalization scripts which we’ll run. I’ll create it in the current directory (<copied_normalization_package_path>).

    There are several template directories within the normalization package (dbt-project-template and dbt-project-template-<db_flavor> directories). Choose the right one based on your target warehouse (in my case I’m doing Postgres-to-Postgres replication, so I’ll use the generic dbt-project-template as there’s no dedicated one for Postgres). I called mine dbt_generated_project.

  4. Retrieve the destination config & catalog data from the Airbyte database.

    Note down the destination and connection ids (you can read them from the browser URL when visiting the destination/connection details page - /destination/<destination_id> or /connections/<connection_id>).

    Log in to the internal Airbyte database (Postgres) and run the following queries.

    Config data:

    select configuration
    from actor
    where id = '<destination_id>';

    Example result (config.json):

     {"ssl": false, "host": "localhost", "port": 5432, "schemas": ["public"], "database": "events_db", "password": {"_secret": "airbyte_workspace_0ed266f7-c0ab-4bcf-a594-c5e45a17fcd3_secret_ba81e05a-cf31-49b4-9b37-ce024d133ce3_v1"}, "ssl_mode": {"mode": "disable"}, "username": "sandbox", "tunnel_method": {"tunnel_method": "NO_TUNNEL"}, "replication_method": {"method": "Standard"}}

    Catalog data:

    select catalog
    from connection
    where id = '<connection_id>';

    Example result (catalog.json):

    {"streams": [{"stream": {"name": "recommendation_events", "namespace": "public", "json_schema": {"type": "object", "properties": {"id": {"type": "number", "airbyte_type": "integer"}, "time": {"type": "string", "format": "date-time", "airbyte_type": "timestamp_without_timezone"}, "type": {"type": "string"}, "currency": {"type": "string"}}}, "default_cursor_field": [], "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_primary_key": [["id"]]}, "sync_mode": "full_refresh", "primary_key": [["id"]], "cursor_field": [], "destination_sync_mode": "overwrite"}, {"stream": {"name": "employees", "namespace": "public", "json_schema": {"type": "object", "properties": {"id": {"type": "number", "airbyte_type": "integer"}, "last_name": {"type": "string"}, "first_name": {"type": "string"}}}, "default_cursor_field": [], "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_primary_key": [["id"]]}, "sync_mode": "full_refresh", "primary_key": [["id"]], "cursor_field": [], "destination_sync_mode": "overwrite"}, {"stream": {"name": "employees_emails", "namespace": "public", "json_schema": {"type": "object", "properties": {"id": {"type": "number", "airbyte_type": "integer"}, "email": {"type": "string"}, "last_name": {"type": "string"}, "first_name": {"type": "string"}}}, "default_cursor_field": [], "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_primary_key": [["id"]]}, "sync_mode": "full_refresh", "primary_key": [["id"]], "cursor_field": [], "destination_sync_mode": "overwrite"}]}

    Copy the query results into JSON files within the directory we created in the previous step (in my case these are config.json and catalog.json).
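
    If you’d rather not copy the results by hand, psql can also write a query result straight to a file in the current directory (a sketch using psql meta-commands, shown here for the config query):

    \t on
    \a
    \o config.json
    select configuration from actor where id = '<destination_id>';
    \o

    \t on and \a strip the header and alignment padding so the file contains just the raw JSON.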

    After this step, the structure of my dbt_generated_project directory looks as follows:

    ├── README.md
    ├── catalog.json
    ├── config.json
    ├── dbt_project.yml
    ├── macros
    │   ├── clean_tmp_tables.sql
    │   ├── configuration.sql
    │   ├── cross_db_utils
    │   │   ├── array.sql
    │   │   ├── columns.sql
    │   │   ├── concat.sql
    │   │   ├── current_timestamp.sql
    │   │   ├── datatypes.sql
    │   │   ├── except.sql
    │   │   ├── hash.sql
    │   │   ├── json_operations.sql
    │   │   ├── quote.sql
    │   │   ├── surrogate_key.sql
    │   │   └── type_conversions.sql
    │   ├── get_custom_schema.sql
    │   ├── incremental.sql
    │   ├── schema_tests
    │   │   ├── equal_rowcount.sql
    │   │   └── equality.sql
    │   ├── should_full_refresh.sql
    │   └── star_intersect.sql
    └── packages.yml
  5. Transform the config.

    Go to the <copied_normalization_package_path> directory and run the following command:

    (venv) > $ transform-config \
    --config dbt_generated_project/config.json \
    --integration-type postgres \
    --out dbt_generated_project

    This generates the following output:

    Namespace(config='dbt_generated_project/config.json', integration_type=<DestinationType.POSTGRES: 'postgres'>, out='dbt_generated_project')
    transform_postgres

    and creates a profiles.yml file in the output directory (note that pass still holds an Airbyte secret reference, so to run DBT against this profile outside of Airbyte you’d have to replace it with the actual password):

    config:
      partial_parse: true
      printer_width: 120
      send_anonymous_usage_stats: false
      use_colors: true
    normalize:
      outputs:
        prod:
          dbname: cdc_target
          host: localhost
          pass:
            _secret: airbyte_workspace_0ed266f7-c0ab-4bcf-a594-c5e45a17fcd3_secret_b5f75e6f-90eb-48cb-8919-d89155f32b1d_v1
          port: 5434
          schema: new_target
          threads: 8
          type: postgres
          user: elt
        target: prod
  6. Transform the catalog.

     (venv) > $ transform-catalog \
     --integration-type postgres \
     --profile-config-dir dbt_generated_project \
     --catalog dbt_generated_project/catalog.json \
     --json-column _airbyte_data \
     --out dbt_generated_project/models/generated

    Output:

     Processing dbt_generated_project/catalog.json...
       Generating airbyte_ctes/public/ecommerce_message_ab1.sql from ecommerce_message
       Generating airbyte_ctes/public/ecommerce_message_ab2.sql from ecommerce_message
       Generating airbyte_incremental/public/ecommerce_message_stg.sql from ecommerce_message
       Generating airbyte_incremental/scd/public/ecommerce_message_scd.sql from ecommerce_message
       Generating airbyte_incremental/public/ecommerce_message.sql from ecommerce_message
       Generating airbyte_ctes/public/dedup_test_ab1.sql from dedup_test
       Generating airbyte_ctes/public/dedup_test_ab2.sql from dedup_test
       Generating airbyte_ctes/public/dedup_test_ab3.sql from dedup_test
       Generating airbyte_incremental/public/dedup_test.sql from dedup_test
       Generating airbyte_ctes/public/superheroes_cdc_ab1.sql from superheroes_cdc
       Generating airbyte_ctes/public/superheroes_cdc_ab2.sql from superheroes_cdc
       Generating airbyte_ctes/public/superheroes_cdc_ab3.sql from superheroes_cdc
       Generating airbyte_incremental/public/superheroes_cdc.sql from superheroes_cdc
       Generating airbyte_ctes/public/nonsensitive_models_ab1.sql from nonsensitive_models
       Generating airbyte_ctes/public/nonsensitive_models_ab2.sql from nonsensitive_models
       Generating airbyte_incremental/public/nonsensitive_models_stg.sql from nonsensitive_models
       Generating airbyte_incremental/scd/public/nonsensitive_models_scd.sql from nonsensitive_models
       Generating airbyte_incremental/public/nonsensitive_models.sql from nonsensitive_models

    The SQL for the default normalization has been generated and you can start building on top of it (inside the dbt_generated_project/models/generated/ directory).
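
    For instance, a hypothetical custom model (the file name and transformation below are made up for illustration) placed next to the generated ones can build on a generated table via ref:

     -- Hypothetical custom model, e.g. models/generated/people_cleaned.sql,
     -- building on the normalized people table from the earlier example.
     select
         id,
         initcap(first_name) as first_name,
         upper(last_name)    as last_name
     from {{ ref('people') }}
     where last_name is not null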

Final thoughts

If you need to extend the default Airbyte normalization code, don’t want to start from scratch and don’t mind running a sync to generate it, following this Airbyte tutorial is the way to go.

Otherwise, you can use the transform-config & transform-catalog scripts to generate the code.

Best Regards,

Kuba