BigQuery is a powerful data warehouse for handling large volumes of event data. However, the larger your datasets are and the more frequently they are queried, the more it all adds up in your monthly costs.
Suppose you’re using the data in a data visualization tool, such as Looker Studio, that queries the tables directly. In that case, you’ll probably want to pre-process the data instead of serving it raw, as connecting directly to the raw data could result in a slow user experience and costly queries.
This periodic pre-processing of data is where Dataform’s incremental tables come in handy.
Incremental tables let you maintain tables built from large source data by running only small periodic queries. For example, it doesn’t make sense to rebuild a table from scratch every day if the only missing data is yesterday’s. An incremental refresh runs the same query for yesterday’s data only and appends the results to the previously built table.
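Conceptually, the first run (or a forced full refresh) builds the table from scratch, while each incremental run only appends the missing period. Here is a simplified sketch of what the generated SQL boils down to, with results used as a placeholder table name:
-- first run or full refresh: build the whole table
create or replace table `<project>.<dataset>.results` as
select event_date, count(*) as events
from `<project>.<dataset>.events_*`
group by event_date;

-- incremental run: append only the days missing from the results table
insert into `<project>.<dataset>.results` (event_date, events)
select event_date, count(*) as events
from `<project>.<dataset>.events_*`
where event_date > (select max(event_date) from `<project>.<dataset>.results`)
group by event_date;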
In this post, I’ll share three different methods for performing an incremental refresh. The code examples are all based on the GA4 BigQuery export data.
The GA4 BigQuery export is an excellent example of how performing an incremental refresh is not always straightforward. The export includes both daily processed exports and a continuous real-time data export. While the real-time data is beneficial, it lacks some of the processed fields available in the daily export. In addition, the daily exported data is not guaranteed to be final either, as updates can come in with an up to 72-hour delay.
Combining the data from the daily and real-time exports into an accurate and up-to-date results table requires some robust incremental refresh strategies.
1. Simple date checkpoint
This method relies on a date checkpoint, defined by querying the current status of the results table. The incremental refresh will scan the days greater than the checkpoint from the source table and append the data to the results table.
config {
type: "incremental",
bigquery: {
partitionBy: "date",
clusterBy: ['event_name']
}
}
js {
const test = false;
const startDate = test ? `current_date()-5` : `date_checkpoint`;
const endDate = 'current_date()';
// Daily export tables only
const dateFilter = `(_table_suffix >= cast(${startDate} as string format "YYYYMMDD") and _table_suffix <= cast(${endDate} as string format "YYYYMMDD"))`;
}
select
cast(event_date as date format 'YYYYMMDD') as date,
event_name,
count(*) as events
from
`<project>.<dataset>.events_*`
where
${dateFilter}
group by
1,
2
pre_operations {
declare date_checkpoint default (
${
when(incremental(),
`select max(date)+1 from ${self()}`,
`select date_trunc(current_date(), year)`)
}
)
}
Note that the date checkpoint must be evaluated in a separate query using the pre_operations block. If you add the SQL directly in the where clause, it won’t work as expected. The query results will be the same, but BigQuery will perform a full table scan instead of just scanning the tables required in this incremental update.
I’ve also included a test variable in the js block of the query file. The reason is that Dataform doesn’t evaluate the pre_operations block when you run a query as a test through the Dataform UI. With the test variable set to true, the query doesn’t rely on the date_checkpoint variable, which works around the problem. The test variable also keeps test runs from triggering large table scans.
The below where clause won’t work as expected:
where
${
when(incremental(),
`_table_suffix >= cast((select max(date)+1 from ${self()}) as string format 'YYYYMMDD') and _table_suffix <= cast(current_date() as string format "YYYYMMDD")`,
`_table_suffix >= cast((select date_trunc(current_date(), year)) as string format 'YYYYMMDD') and _table_suffix <= cast(current_date() as string format "YYYYMMDD")`
)
}
Summary
The simple date checkpoint works great when:
- Already added rows don’t need to be modified,
- It’s clear what data already exists in the results table and what is missing.
The example query assumes that the daily data is always complete and new data doesn’t come in late. With GA4 data, that is, of course, not always the case, as events can be sent with up to a 3-day delay.
2. Merge using unique key
What if your incremental refresh also needs to modify rows instead of just appending them?
The benefit of the merge method is that it can alter rows created earlier. You don’t need to know which rows already exist in the table. If a row already exists, it would simply get overwritten, not duplicated.
config {
type: 'incremental',
uniqueKey: ['date', 'event_name'],
bigquery: {
partitionBy: 'date',
clusterBy: ['event_name'],
updatePartitionFilter: 'date >= current_date()-5'
}
}
js {
const startDate = when(incremental(),
`current_date()-3`,
`date_trunc(current_date(), year)`
);
const endDate = 'current_date()';
// Daily and intraday export tables
const dateFilter = `((_table_suffix >= cast(${startDate} as string format "YYYYMMDD") and _table_suffix <= cast(${endDate} as string format "YYYYMMDD"))
or (_table_suffix >= 'intraday_'||cast(${startDate} as string format "YYYYMMDD") and _table_suffix <= 'intraday_'||cast(${endDate} as string format "YYYYMMDD")))`;
}
select
cast(event_date as date format 'YYYYMMDD') as date,
event_name,
count(*) as events
from
`<project>.<dataset>.events_*`
where
${dateFilter}
group by
1,
2
The unique key is used to check which new rows already exist in the table. The query will overwrite those rows.
For the merge to work correctly, it’s essential that the unique key is truly unique and stays the same across subsequent query runs. In the example, I’m using the combination of date and event_name as the unique key.
If not appropriately configured, the merge operation could also be costly! If your results table contains a lot of data, for example event-level data, the merge will need to scan many rows to find all the matches. You can limit this scan by using the updatePartitionFilter setting.
In the example, the updatePartitionFilter is configured so that the merge only scans rows found in the date partitions starting from the current date minus five. The query itself, on the other hand, includes data starting from the current date minus three. If the updatePartitionFilter doesn’t cover all the rows coming in, you’ll start seeing duplicates in your results table.
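For reference, the configuration above makes Dataform generate a BigQuery merge statement roughly along these lines (a simplified sketch rather than the exact generated SQL, with results as a placeholder table name and only the daily export tables included for brevity):
merge `<project>.<dataset>.results` t
using (
  -- the incremental query, covering current_date()-3 onwards
  select
    cast(event_date as date format 'YYYYMMDD') as date,
    event_name,
    count(*) as events
  from `<project>.<dataset>.events_*`
  where _table_suffix >= cast(current_date()-3 as string format 'YYYYMMDD')
  group by 1, 2
) s
on
  t.date = s.date
  and t.event_name = s.event_name
  -- updatePartitionFilter limits how much of the target table is scanned
  and t.date >= current_date()-5
when matched then
  update set events = s.events
when not matched then
  insert (date, event_name, events)
  values (s.date, s.event_name, s.events)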
When not to use merge
The query below is a contrived example of a query that would produce incorrect results if it were updated using the merge method.
config {
type: 'incremental',
uniqueKey: ['date', 'page_views'],
bigquery: {
partitionBy: 'date'
}
}
with user_page_views as (
select
cast(event_date as date format 'YYYYMMDD') as date,
user_pseudo_id,
count(*) as page_views
from
`<project>.<dataset>.events_*`
where
event_name = 'page_view'
group by
1,
2
)
select
date,
page_views,
count(distinct user_pseudo_id) as daily_visitors
from
user_page_views
group by
1,
2
Here’s why this query doesn’t work: at some point, a visitor might fall into the bucket of users who viewed ten pages during the day, and they might even be the only visitor in that bucket. On the next update, that same visitor could have moved on to twelve pages per day. If no other visitor is at ten pages, the old row is never matched by the unique key (date, page_views), so it persists, and the visitor ends up counted in two buckets.
Summary
The merge method works great when:
- The query needs to modify already added rows,
- A proper unique key is available,
- It’s inconvenient or impossible to use a time checkpoint.
3. Delete + checkpoint
This method is an evolved version of the simple date checkpoint method. The idea is that all rows that are not considered “final” are flagged during insertion. The flagged rows will then be deleted during the next incremental refresh before new data is brought in again.
One of the benefits of this method over the merge method is that it doesn’t require a unique key. It also supports deletions in the source data if they happen during the configured date window.
config {
type: "incremental",
bigquery: {
partitionBy: "date",
clusterBy: ['event_name']
}
}
js {
const test = false;
const startDate = test ? `current_date()-5` : `date_checkpoint`;
const endDate = 'current_date()';
// Daily and intraday export tables
const dateFilter = `((_table_suffix >= cast(${startDate} as string format "YYYYMMDD") and _table_suffix <= cast(${endDate} as string format "YYYYMMDD"))
or (_table_suffix >= 'intraday_'||cast(${startDate} as string format "YYYYMMDD") and _table_suffix <= 'intraday_'||cast(${endDate} as string format "YYYYMMDD")))`;
}
select
cast(event_date as date format 'YYYYMMDD') as date,
event_name,
if(
date_diff(current_date(), cast(event_date as date format 'YYYYMMDD'), day) > 3,
true,
false
) as data_is_final,
count(*) as events
from
`<project>.<dataset>.events_*`
where
${dateFilter}
group by
1,
2,
3
pre_operations {
-- date checkpoint is based on the latest date that had "final" data
declare date_checkpoint default (
${
when(incremental(),
`select max(date)+1 from ${self()} where data_is_final = true`,
`select date_trunc(current_date(), year)`
)
}
)
---
-- delete rows that are about to be added again through the incremental refresh
${
when(incremental(),
`delete from ${self()} where date >= date_checkpoint`
)
}
}
The example query includes a data_is_final column. This column is evaluated during the incremental refreshes. Any rows from over three days ago are considered safe and won’t be touched during future refreshes.
The pre_operations block also includes a delete operation. Like the date_checkpoint variable, the delete operation is performed before Dataform runs the main SQL query.
Summary
The delete + checkpoint option works great when:
- The query needs to delete or modify already added rows,
- There is no proper unique key available,
- You have clear logic for when the data has reached its final form.
Final thoughts
There is no one-size-fits-all solution for building incremental refresh queries. There can also be many variations to the solutions presented here.
For example, defining when the data is “final” in the last method might not always be that simple. What if the same date partition contains both final and not final rows? In that situation, the query should probably clear the whole partition before bringing in new data.
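One possible variation, sketched below with the column and variable names of the earlier example (so treat it as a starting point rather than production-ready code), is to base the checkpoint on the earliest date that still has non-final rows and clear everything from that date onwards, so the affected partitions get rebuilt in full:
pre_operations {
  -- checkpoint: the earliest date that still contains non-final rows
  -- (falls back to the current date if everything is already final)
  declare date_checkpoint default (
    ${
      when(incremental(),
        `select coalesce(min(date), current_date()) from ${self()} where data_is_final = false`,
        `select date_trunc(current_date(), year)`
      )
    }
  )
  ---
  -- clear the affected partitions in full before the refresh re-inserts them
  ${
    when(incremental(),
      `delete from ${self()} where date >= date_checkpoint`
    )
  }
}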
When working with GA4 data, I often use the delete + checkpoint method, as it handles the differences and delays between the daily and real-time exports very well. That said, all three methods come up regularly in practice.
I hope you find these incremental refresh examples helpful when fine-tuning your Dataform runs. Let me know in the comments!
Comments
I tried the above code for the checkpoint + delete, but it always returns: Unrecognized name: date_checkpoint at [17:26]. Any idea why I’m getting that error?
Sounds like you’re running the query using the “Run” button instead of executing the table refresh using “Start execution”. The “local” query runs don’t include the pre_operations and post_operations. For that reason, I have the “test” variable in the query files. With the test mode enabled, the queries use a date range that doesn’t depend on the date_checkpoint variable from the pre_operations.
Hi Taneli,
Very interesting and useful articles about GA4, BigQuery, and Dataform.
I have a question: is the process of deleting and inserting the last 3 days wrapped in a transaction?
We have a real-time export from GA4 to BigQuery, and I’d like to run your logic every hour.
Is there a chance that, between the deletion and the insertion of the data, the dashboards could show wrong results? In that case, maybe cloning the results table would be an appropriate addition.
Thanks again for your amazing work.
Hi,
Yes, for a short time between the deletion and the completion of the incremental query, the table will be missing some data. So if the dashboard retrieves data at that exact moment, it would show incorrect results.
If you’re looking to show, for example, the current date’s results close to real time, it might be best to have a dedicated table that only contains the current date. Then you could simply overwrite the whole table every hour.
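As a rough sketch (assuming the GA4 intraday export tables and the placeholder project and dataset names used earlier), such a dedicated table could be a plain non-incremental table that reads only the current date’s intraday export and is rebuilt on every run:
config {
  type: "table"
}
select
  cast(event_date as date format 'YYYYMMDD') as date,
  event_name,
  count(*) as events
from
  `<project>.<dataset>.events_intraday_*`
where
  _table_suffix = cast(current_date() as string format 'YYYYMMDD')
group by
  1,
  2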
Hi Taneli,
I have a question about the merge method: If I ingest into a struct where the row is defined by a unique key, will the entire struct be overwritten, deleted or just duplicated?
Best, bjarke
Hi Bjarke,
The merge would simply overwrite the whole row, regardless of the data types of the columns (structs, arrays…). So yes, the value of the struct column would be overwritten.