Waterslide analogy. One input, multiple outputs. Each slide represents a date partition in one table.

Do you have some data that needs to be fed into BigQuery but the output must be split between multiple destination tables? Using a Cloud Dataflow pipeline, you could define some side outputs for each destination table you need, but what happens when you want to write to date partitions in a table and you’re not sure what partitions you need to write to in advance? It gets a little messy. That was the problem I encountered, but we have a solution.

Some background

We had an existing pipeline that was triggered once a day when a file entered our GCS bucket. The CSV file we receive contains data with timestamps ranging across two days, but the output of the pipeline writes to a single table. The client wants to create reports from this table and are interested in the current month’s data but that can change and they will likely want to report on days, weeks and previous months. At the time, the table was relatively small in size but over time it will grow and each time it is queried, even if it is filtered by a date range, the entire table is queried. If the table was partitioned however, the query could be run over a range of partitions and you will only be charged for the partitions you are running the query over.  After wanting to change the table to be date partitioned, we were left with around three months of data from the old table that would have to be reprocessed. That’s roughly 3*30 partitions that have to be defined somewhere in your code as side outputs ahead of time eg. table$20170606, table$20170607… and so on. That’s a bit clunky and annoying to implement.

Introduced in Dataflow SDK 2.x for Java – this is where SerializableFunctions and DynamicDestinations can come in handy.

Added new API in BigQueryIO for writing into multiple tables, possibly with different schemas, based on data. See BigQueryIO.Write.to(SerializableFunction) and BigQueryIO.Write.to(DynamicDestinations).

Long story short, you would use SerializableFunction when you have multiple table names to write to but using the same schema. If you have differing schemas per tables however, that would be the reason to use DynamicDestinations. Previously we defined the table to output to as BigQueryIO.writeTableRows().to(String). We can replace that string with either of the two options.

When to use one over the other is best demonstrated with some quick examples that I will cover below. I’ll only focus on the relevant lines of code for brevity sake.

A pipeline using a SerializableFunction to write to multiple partitions in a single table

This CSV contains logs from a “news website” and we want each row to find it’s way into the right table partition in BigQuery based on value of time_ts.

9405470,9432687,2017-11-20 00:21:29.000000 UTC,Feature
10027153,8557328,2017-11-21 13:19:12.000000 UTC,Entertainment
6094985,3991512,2017-11-20 08:02:06.000000 UTC,Feature
153402,5810364,2017-11-22 22:26:54.000000 UTC,Sport
9269837,7790738,2017-11-23 14:29:59.000000 UTC,Sport
2841651,8963493,2017-11-22 16:03:30.000000 UTC,Politics
39100,3932088,2017-11-20 19:17:40.000000 UTC,Sport
38440,7080288,2017-11-20 19:06:30.000000 UTC,Feature
2603589,5736045,2017-11-21 17:43:22.000000 UTC,Politics
6487870,2373067,2017-11-22 09:15:36.000000 UTC,Sport
55419,2600640,2017-11-20 22:59:39.000000 UTC,Sport
55479,2635760,2017-11-20 02:05:44.000000 UTC,Entertainment
35746,2592953,2017-11-20 00:24:39.000000 UTC,Sport
4542899,6027789,2017-11-21 10:02:57.000000 UTC,Sport
7116689,6055180,2017-11-21 18:57:18.000000 UTC,Politics
2457001,142047,2017-11-21 20:59:41.000000 UTC,Entertainment
6487819,89025,2017-11-21 08:50:38.000000 UTC,Feature
5991530,9504172,2017-11-22 17:11:58.000000 UTC,Feature
5996342,9704627,2017-11-22 19:23:11.000000 UTC,Entertainment
9624149,9457074,2017-11-22 12:37:32.000000 UTC,Entertainment

Using the SerializableFunction, from a TableRow element, we get the time_ts value eg. 2017-11-21 17:43:22.000000, clean it up, ie. to 20171121 and append to the table name and the $ table decorator, giving us the destination project:dataset.table$20171121.

Screen Shot 2017-12-05 at 3.49.18 pm.png

Copy here.
SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> is implemented above as a lambda expression, implementing one method called apply(ValueInSingleWindow<TableRow> element) and returning a TableDestination.

Fun fact

When dealing with table names that use $, I had to keep the TableDestinations table description (second arg) null otherwise it tried to create a TableReference object which cannot have a $ in the table ID as seen in documentation. “[Required] The ID of the table. The ID must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_). The maximum length is 1,024 characters.” 

When the description was given a value and the pipeline was run, the rows would find their way to the correct partitions but you get many of these responses.

Screen Shot 2017-12-01 at 3.05.04 pm.png

I followed the trail of method calls in an attempt to understand why this happens, but I came to the conclusion that I was going down a rabbit hole – I just wanted it to work!

So when dealing with partitions using table decorators, for now I keep the table description of a TableDestination null.

As you can see in the results below, time_ts matches _PARTITIONTIME. The rest of the data can be found in the other partitions.


A pipeline using a SerializableFunction to write to multiple tables

This example is quite similar to the one above and uses the same CSV file. However, instead of writing to multiple partitions of the same table, the output tables are derived from the different sections eg. Sport, Feature, Politics etc. Nothing more to it really.

Screen Shot 2017-12-05 at 3.51.28 pm.png

Copy here.

You can see the multiple tables in the dataset and the Entertainment table below.

Screen Shot 2017-12-05 at 4.42.34 pm.png


A pipeline using DynamicDestinations to write to multiple tables and schemas

This CSV file has an extra field called tags. You can see this field only has data for Feature and Sport sections, so we can extend their schemas from the default.

9405470,9432687,2017-11-20 00:21:29.000000 UTC,Feature,Dan Hawkin|If Seinfeld never went off air
10027153,8557328,2017-11-21 13:19:12.000000 UTC,Entertainment,
6094985,3991512,2017-11-20 08:02:06.000000 UTC,Feature,Taylor Finn|Channel 10 through the ages
153402,5810364,2017-11-22 22:26:54.000000 UTC,Sport,AFL
9269837,7790738,2017-11-23 14:29:59.000000 UTC,Sport,AFL
2841651,8963493,2017-11-22 16:03:30.000000 UTC,Politics,
39100,3932088,2017-11-20 19:17:40.000000 UTC,Sport,Cricket
38440,7080288,2017-11-20 19:06:30.000000 UTC,Feature,Guy Black|Cats are the new dogs
2603589,5736045,2017-11-21 17:43:22.000000 UTC,Politics,
6487870,2373067,2017-11-22 09:15:36.000000 UTC,Sport,Swimming
55419,2600640,2017-11-20 22:59:39.000000 UTC,Sport,Rugby
55479,2635760,2017-11-20 02:05:44.000000 UTC,Entertainment,
35746,2592953,2017-11-20 00:24:39.000000 UTC,Sport,NRL
4542899,6027789,2017-11-21 10:02:57.000000 UTC,Sport,NRL
7116689,6055180,2017-11-21 18:57:18.000000 UTC,Politics,
2457001,142047,2017-11-21 20:59:41.000000 UTC,Entertainment,
6487819,89025,2017-11-21 08:50:38.000000 UTC,Feature,Gary Ratson|The problem with fleas
5991530,9504172,2017-11-22 17:11:58.000000 UTC,Feature,Mike Goldman|Are we getting dumber?
5996342,9704627,2017-11-22 19:23:11.000000 UTC,Entertainment,
9624149,9457074,2017-11-22 12:37:32.000000 UTC,Entertainment,

When using DynamicDestinations you have to implement getDestination(), getTable() and getSchema(). I defined a function called getSchemaForTable() to return a predefined schema per table which makes this code snippet easier on the eyes. However if you were unable to anticipate the schema needed, you could dynamically construct one using the data in the TableRow element.


Screen Shot 2017-12-05 at 3.58.30 pm.png

Copy here.

In the ParDo (which isn’t featured in the code snippet) I do more processing than in the previous examples. If the section is Feature, I take the tags value and seperate it with giving me two values to use for new columns, author and feature_title in the TableRow.  The tags column is then removed after this since the data has been used. Then, when writing that TableRow to BigQuery, the Feature table schema is used.

Screen Shot 2017-12-01 at 6.01.03 pm.png

Screen Shot 2017-12-01 at 6.01.19 pm.pngWhen the section is Sport, the ParDo takes the tags value to get the sport and assigns the value to a sport column in the TableRow. The tags column is removed and Sport table schema is used.

The other tables use the default schema ie. story_id, uid, time_ts and section.


Final thoughts

These two classes offer a clean way to dynamically write to multiple tables and DynamicDestinations further adds the ability for multiple schemas. Alternatively, I would have had to create many side outputs to do the same thing and introduce many lines of code.

One caveat – when writing to partitions in a table, the date partitioned table must be created ahead of time. It looks like this is addressed in 2.2.0 according to these Apache BEAM release notes (issue 2390) so that’s cool.

And that’s all I have to say about that.


Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s