05 Dec 2017 Fun with Serializable Functions and Dynamic Destinations in Cloud Dataflow

[Image: waterslide analogy. One input, multiple outputs. Each slide represents a date partition in one table.]
Do you have some data that needs to be fed into BigQuery but the output must be split between multiple destination tables? Using a Cloud Dataflow pipeline, you could define some side outputs for each destination table you need, but what happens when you want to write to date partitions in a table and you’re not sure what partitions you need to write to in advance? It gets a little messy. That was the problem I encountered, but we have a solution.
Some background
We had an existing pipeline that was triggered once a day when a file landed in our GCS bucket. The CSV file we receive contains data with timestamps spanning two days, but the output of the pipeline writes to a single table. The client wants to create reports from this table and is interested in the current month's data, but that can change and they will likely want to report on days, weeks and previous months too. At the time, the table was relatively small, but over time it will grow, and each time it is queried, even if it is filtered by a date range, the entire table is scanned. If the table were date partitioned, however, a query could be run over a range of partitions and you would only be charged for the partitions it touches. Once we decided to change the table to be date partitioned, we were left with around three months of data from the old table that would have to be reprocessed. That's roughly 3*30 partitions that would have to be defined somewhere in your code as side outputs ahead of time, e.g. table$20170606, table$20170607… and so on. That's a bit clunky and annoying to implement.
Introduced in Dataflow SDK 2.x for Java, this is where SerializableFunction and DynamicDestinations can come in handy:
"Added new API in BigQueryIO for writing into multiple tables, possibly with different schemas, based on data. See BigQueryIO.Write.to(SerializableFunction) and BigQueryIO.Write.to(DynamicDestinations)."
Long story short, you would use a SerializableFunction when you have multiple table names to write to but they all share the same schema. If you have differing schemas per table, however, that is the reason to use DynamicDestinations. Previously we defined the table to output to with BigQueryIO.writeTableRows().to(String). We can replace that string with either of the two options.
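Schematically, the change at the write step looks something like this. This is just a rough sketch, not the exact pipeline code: rows stands for a PCollection<TableRow>, and the table spec and variable names are placeholders.

// Before: one fixed destination
rows.apply(BigQueryIO.writeTableRows()
    .to("my-project:my_dataset.news_logs")   // placeholder table spec
    .withSchema(schema));

// After: the destination is chosen per element
rows.apply(BigQueryIO.writeTableRows()
    .to(partitionFn)       // a SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>
    .withSchema(schema));  // one schema shared by every destination

// Or, when each table needs its own schema
rows.apply(BigQueryIO.writeTableRows()
    .to(dynamicDestinations));  // a DynamicDestinations<TableRow, ?> that also supplies the schemas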
When to use one over the other is best demonstrated with some quick examples that I will cover below. I'll only focus on the relevant lines of code for brevity's sake.
A pipeline using a SerializableFunction to write to multiple partitions in a single table
This CSV contains logs from a “news website” and we want each row to find its way into the right table partition in BigQuery based on the value of time_ts.
story_id,uid,time_ts,section
9405470,9432687,2017-11-20 00:21:29.000000 UTC,Feature
10027153,8557328,2017-11-21 13:19:12.000000 UTC,Entertainment
6094985,3991512,2017-11-20 08:02:06.000000 UTC,Feature
153402,5810364,2017-11-22 22:26:54.000000 UTC,Sport
9269837,7790738,2017-11-23 14:29:59.000000 UTC,Sport
2841651,8963493,2017-11-22 16:03:30.000000 UTC,Politics
39100,3932088,2017-11-20 19:17:40.000000 UTC,Sport
38440,7080288,2017-11-20 19:06:30.000000 UTC,Feature
2603589,5736045,2017-11-21 17:43:22.000000 UTC,Politics
6487870,2373067,2017-11-22 09:15:36.000000 UTC,Sport
55419,2600640,2017-11-20 22:59:39.000000 UTC,Sport
55479,2635760,2017-11-20 02:05:44.000000 UTC,Entertainment
35746,2592953,2017-11-20 00:24:39.000000 UTC,Sport
4542899,6027789,2017-11-21 10:02:57.000000 UTC,Sport
7116689,6055180,2017-11-21 18:57:18.000000 UTC,Politics
2457001,142047,2017-11-21 20:59:41.000000 UTC,Entertainment
6487819,89025,2017-11-21 08:50:38.000000 UTC,Feature
5991530,9504172,2017-11-22 17:11:58.000000 UTC,Feature
5996342,9704627,2017-11-22 19:23:11.000000 UTC,Entertainment
9624149,9457074,2017-11-22 12:37:32.000000 UTC,Entertainment
Using the SerializableFunction, from a TableRow element we get the time_ts value, e.g. 2017-11-21 17:43:22.000000, clean it up to 20171121, and append it to the table name with the $ table decorator, giving us the destination project:dataset.table$20171121.
Copy here.
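To give a feel for it, here is a minimal sketch of such a function. It is not the exact code from the snippet; the project, dataset and table names are placeholders.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// Placeholder destination table; replace with your own project:dataset.table
static final String BASE_TABLE = "my-project:my_dataset.news_logs";

static final SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> partitionFn =
    (ValueInSingleWindow<TableRow> element) -> {
      TableRow row = element.getValue();
      // e.g. "2017-11-21 17:43:22.000000 UTC" -> "20171121"
      String timeTs = (String) row.get("time_ts");
      String partition = timeTs.substring(0, 10).replace("-", "");
      // Append the $ table decorator so the row lands in that day's partition.
      // Keep the table description (second argument) null when using decorators.
      return new TableDestination(BASE_TABLE + "$" + partition, null);
    };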
SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> is implemented above as a lambda expression, implementing a single method, apply(ValueInSingleWindow<TableRow> element), and returning a TableDestination.
Fun fact
When dealing with table names that use $, I had to keep the TableDestination's table description (the second argument) null, otherwise it tried to create a TableReference object, which cannot have a $ in the table ID, as seen in the documentation: “[Required] The ID of the table. The ID must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_). The maximum length is 1,024 characters.”
When the description was given a value and the pipeline was run, the rows would still find their way to the correct partitions, but you get many of these responses.
I followed the trail of method calls in an attempt to understand why this happens, but I came to the conclusion that I was going down a rabbit hole – I just wanted it to work!
So when dealing with partitions using table decorators, for now I keep the table description of a TableDestination null.
As you can see in the results below, time_ts matches _PARTITIONTIME. The rest of the data can be found in the other partitions.
A pipeline using a SerializableFunction to write to multiple tables
This example is quite similar to the one above and uses the same CSV file. However, instead of writing to multiple partitions of the same table, the output tables are derived from the different sections: Sport, Feature, Politics and so on. Nothing more to it really.
Copy here.
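Again as a rough sketch (the dataset name is a placeholder, and this is not the exact snippet), the function simply keys the destination off the section value:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;

static final SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> sectionFn =
    (ValueInSingleWindow<TableRow> element) -> {
      // Route each row to a table named after its section, e.g. my_dataset.Sport
      String section = (String) element.getValue().get("section");
      return new TableDestination("my-project:my_dataset." + section, null);
    };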
You can see the multiple tables in the dataset and the Entertainment table below.
A pipeline using DynamicDestinations to write to multiple tables and schemas
This CSV file has an extra field called tags. You can see this field only has data for Feature and Sport sections, so we can extend their schemas from the default.
story_id,uid,time_ts,section,tags
9405470,9432687,2017-11-20 00:21:29.000000 UTC,Feature,Dan Hawkin|If Seinfeld never went off air
10027153,8557328,2017-11-21 13:19:12.000000 UTC,Entertainment,
6094985,3991512,2017-11-20 08:02:06.000000 UTC,Feature,Taylor Finn|Channel 10 through the ages
153402,5810364,2017-11-22 22:26:54.000000 UTC,Sport,AFL
9269837,7790738,2017-11-23 14:29:59.000000 UTC,Sport,AFL
2841651,8963493,2017-11-22 16:03:30.000000 UTC,Politics,
39100,3932088,2017-11-20 19:17:40.000000 UTC,Sport,Cricket
38440,7080288,2017-11-20 19:06:30.000000 UTC,Feature,Guy Black|Cats are the new dogs
2603589,5736045,2017-11-21 17:43:22.000000 UTC,Politics,
6487870,2373067,2017-11-22 09:15:36.000000 UTC,Sport,Swimming
55419,2600640,2017-11-20 22:59:39.000000 UTC,Sport,Rugby
55479,2635760,2017-11-20 02:05:44.000000 UTC,Entertainment,
35746,2592953,2017-11-20 00:24:39.000000 UTC,Sport,NRL
4542899,6027789,2017-11-21 10:02:57.000000 UTC,Sport,NRL
7116689,6055180,2017-11-21 18:57:18.000000 UTC,Politics,
2457001,142047,2017-11-21 20:59:41.000000 UTC,Entertainment,
6487819,89025,2017-11-21 08:50:38.000000 UTC,Feature,Gary Ratson|The problem with fleas
5991530,9504172,2017-11-22 17:11:58.000000 UTC,Feature,Mike Goldman|Are we getting dumber?
5996342,9704627,2017-11-22 19:23:11.000000 UTC,Entertainment,
9624149,9457074,2017-11-22 12:37:32.000000 UTC,Entertainment,
When using DynamicDestinations you have to implement getDestination(), getTable() and getSchema(). I defined a function called getSchemaForTable() to return a predefined schema per table, which makes this code snippet easier on the eyes. However, if you were unable to anticipate the schema needed, you could dynamically construct one using the data in the TableRow element.
Copy here.
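The shape of that code is roughly as follows. This is only a sketch: the dataset name and the column types in getSchemaForTable() are my own assumptions, while the method signatures and field names follow the description above.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

rows.apply(BigQueryIO.writeTableRows()
    .to(new DynamicDestinations<TableRow, String>() {
      @Override
      public String getDestination(ValueInSingleWindow<TableRow> element) {
        // The destination key is simply the section value, e.g. "Sport"
        return (String) element.getValue().get("section");
      }

      @Override
      public TableDestination getTable(String section) {
        // Placeholder dataset; one table per section
        return new TableDestination("my-project:my_dataset." + section, null);
      }

      @Override
      public TableSchema getSchema(String section) {
        return getSchemaForTable(section);
      }
    }));

// Returns the default schema, extended for the Feature and Sport tables
static TableSchema getSchemaForTable(String section) {
  List<TableFieldSchema> fields = new ArrayList<>(Arrays.asList(
      new TableFieldSchema().setName("story_id").setType("INTEGER"),
      new TableFieldSchema().setName("uid").setType("INTEGER"),
      new TableFieldSchema().setName("time_ts").setType("TIMESTAMP"),
      new TableFieldSchema().setName("section").setType("STRING")));
  if ("Feature".equals(section)) {
    fields.add(new TableFieldSchema().setName("author").setType("STRING"));
    fields.add(new TableFieldSchema().setName("feature_title").setType("STRING"));
  } else if ("Sport".equals(section)) {
    fields.add(new TableFieldSchema().setName("sport").setType("STRING"));
  }
  return new TableSchema().setFields(fields);
}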
In the ParDo (which isn't featured in the code snippet) I do more processing than in the previous examples. If the section is Feature, I take the tags value and separate it on |, giving me two values to use for new columns in the TableRow: author and feature_title. The tags column is then removed since its data has been used. Then, when writing that TableRow to BigQuery, the Feature table schema is used.
When the section is Sport, the ParDo takes the tags value to get the sport and assigns it to a sport column in the TableRow. The tags column is removed and the Sport table schema is used.
The other tables use the default schema, i.e. story_id, uid, time_ts and section.
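For completeness, a DoFn doing that tag handling might look something like the sketch below. The class name and the field handling are my own simplification of the description above, not the actual code.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;

static class ExpandTagsFn extends DoFn<TableRow, TableRow> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Beam elements must not be mutated in place, so work on a copy
    TableRow row = c.element().clone();
    String section = (String) row.get("section");
    String tags = (String) row.get("tags");
    if ("Feature".equals(section) && tags != null && !tags.isEmpty()) {
      // e.g. "Dan Hawkin|If Seinfeld never went off air" -> author, feature_title
      String[] parts = tags.split("\\|", 2);
      row.set("author", parts[0]);
      row.set("feature_title", parts.length > 1 ? parts[1] : "");
    } else if ("Sport".equals(section) && tags != null && !tags.isEmpty()) {
      // e.g. "AFL" -> sport
      row.set("sport", tags);
    }
    // tags has served its purpose and isn't in any of the output schemas
    row.remove("tags");
    c.output(row);
  }
}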
Final thoughts
These two classes offer a clean way to dynamically write to multiple tables, and DynamicDestinations adds the ability to handle multiple schemas as well. The alternative would have been to create many side outputs to do the same thing, introducing many more lines of code.
One caveat: when writing to partitions in a table, the date-partitioned table must be created ahead of time. It looks like this is addressed in 2.2.0 according to these Apache Beam release notes (issue BEAM-2390), so that's cool.
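I haven't run it myself, but on 2.2.0+ a write that lets the sink create a day-partitioned table for you would presumably look something like this. A single fixed table is shown for simplicity, and the table spec and schema are placeholders.

import com.google.api.services.bigquery.model.TimePartitioning;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

rows.apply(BigQueryIO.writeTableRows()
    .to("my-project:my_dataset.news_logs")   // placeholder table spec
    .withSchema(schema)
    // Added in 2.2.0: the table can be created as a day-partitioned table
    .withTimePartitioning(new TimePartitioning().setType("DAY"))
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));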
And that’s all I have to say about that.
Eugene Kirpichov
Posted at 02:30h, 06 December
Thanks, glad you like the feature 🙂 There's a similar feature for writing to multiple destinations in TextIO and AvroIO.write().to(DynamicDestinations), and there's a generalization and simplification of that coming to Beam 2.3 in http://s.apache.org/fileio-write.
Pingback: TEL Newsletter – December 2017 – Shine Solutions Group
Posted at 10:06h, 07 December
[…] park, and teed up his second blog post in as many months. This time, he explained to readers that dynamically partitioning data in BigQuery doesn’t have to be hard when you use Cloud Dataflow. The post even caught the attention of the Cloud Dataflow engineers from Google, commenting that it […]
Parin
Posted at 21:04h, 23 February
How can I use SerializableFunction to create the partitioned table if it doesn't exist? By default it creates a non-partitioned table.
stewartburnell
Posted at 16:40h, 01 March
Hey Parin – just looking at the recent release notes https://cloud.google.com/dataflow/release-notes/release-notes-java-2 it looks like they have addressed this.
There’s a method called withTimePartitioning() they added in 2.2.0 to be used with BigQueryIO.writeTableRows().
https://beam.apache.org/documentation/sdks/javadoc/2.3.0/
Haven’t tested it myself but here is the original issue https://issues.apache.org/jira/browse/BEAM-2390