Triggering Dataflow Pipelines With Cloud Functions

Do you have an unreasonable fear of cronjobs? Find spinning up VMs to be a colossal waste of your towering intellect? Does the thought of checking a folder regularly for updates fill you with an apoplectic rage? If so, you should probably get some help. Maybe find another line of work.

In the meantime, here’s one way to ease your regular file processing anxieties. With just one application of Google Cloud Functions, eased gently up your Dataflow Pipeline, you can find lasting relief from troublesome cronjobs.

But first, some assumptions.

Assumptions?

You’ve got gcloud installed, you’ve created a project, and you’ve enabled the Dataflow and Cloud Functions APIs. If you haven’t, a few minutes of reading the Google docs should get you started.
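
If you just want the API part out of the way, something like the following should do it (a sketch assuming a reasonably recent gcloud; double-check the service names against your own project):

gcloud services enable dataflow.googleapis.com
gcloud services enable cloudfunctions.googleapis.com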

I’m ready now.

We are going to set up a Google Cloud Function that will get called every time a cloud storage bucket gets updated. That function will kick off a dataflow pipeline (using the handy new templates) to process the file with whatever complicated data transformations somebody further up the food chain has specified.

Hang on, you’ve lost me.

Ok, maybe I should have been a bit more explicit in my assumptions. Here’s a quick cheat sheet:

  • Cloud Functions – small serverless bits of code (like Amazon’s Lambdas)
  • Dataflow – a managed service for processing large amounts of data
  • Cloud Storage – Google’s equivalent of S3

Ok, carry on, we’re good.

The first step is to create a few buckets (you can call them whatever you want, but these are the names I’ve used in the rest of this article):

  • one for keeping the code for your cloud function – cloud-fn-staging
  • one for keeping the code for your pipeline – pipeline-staging
  • the bucket you want to monitor for updates – incoming-files

If you’re not sure how to create buckets, Google’s docs can help. Or just start clicking around randomly in cloud console. You’ll work it out, I believe in you, champ.
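
If you’d rather stay in the terminal, here’s a rough gsutil sketch (bucket names have to be globally unique, so you’ll almost certainly need your own variations, and pick whatever location suits you):

gsutil mb -l us-central1 gs://cloud-fn-staging
gsutil mb -l us-central1 gs://pipeline-staging
gsutil mb -l us-central1 gs://incoming-files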

Now what?

Now we’re going to make a Dataflow template. If you’ve already got a pipeline you can use that one, but I mangled one of the example pipelines that Google gives you. There’s a fiddly detail here: templates were introduced in version 1.9.0 of the Dataflow Java libraries, so you’ll need at least that version. However, if you go for the latest 2.x version (which is also the default version that the Maven archetypes generate), the way you create your template changes, and at the time of writing the Google docs haven’t been updated for that version.

To create the example pipeline I ran the following maven command:

mvn archetype:generate \
 -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
 -DarchetypeGroupId=com.google.cloud.dataflow \
 -DarchetypeVersion=1.9.0 \
 -DgroupId=com.shinesolutions \
 -DartifactId=dataflow-template-poc \
 -Dversion="0.1" \
 -DinteractiveMode=false \
 -Dpackage=com.shinesolutions

Notice the -DarchetypeVersion=1.9.0 option, which ensures that I’m using that version of the libs. Without that option you’ll end up with the 2.x versions (which I will explain how to use as well, don’t worry).

This generates some sample code, including the standard Dataflow WordCount example. I edited this to make it templatable. I also invented the word templatable, just now. You can use it, though, it’s cool. Here are the important changes, first in the WordCountOptions interface (in the WordCount class):

public interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Validation.Required
  ValueProvider<String> getInputFile();
  void setInputFile(ValueProvider<String> value);

  @Description("Path of the file to write to")
  @Validation.Required
  ValueProvider<String> getOutputFile();
  void setOutputFile(ValueProvider<String> value);
}

Instead of the options just being String values, they are now ValueProvider types. This lets the runner know that these values won’t be supplied until later, when the template is actually executed.

The main method looks like this:

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()).withoutValidation())
      .apply(new CountWords())
      .apply(ParDo.of(new FormatAsTextFn()))
      .apply(TextIO.Write.named("WriteCounts").to(options.getOutputFile()).withoutValidation());

  p.run();
}

The important thing to note here is the .withoutValidation() modifier on the TextIO.Read. If you don’t do that, your template won’t get created, because TextIO will try to validate the option values before they’ve been supplied. We’re using TextIO in this example, but the Pub/Sub and BigQuery input/output transforms in Dataflow also support taking their options from templates.

To create our template, run this command:

 mvn compile exec:java \
 -Dexec.mainClass=com.shinesolutions.WordCount \
 -Dexec.args="--project=<your project> \
 --stagingLocation=gs://pipeline-staging \
 --dataflowJobFile=gs://pipeline-staging/templates/WordCount \
 --runner=TemplatingDataflowPipelineRunner"

If that succeeded, your template has been created. Remember the value you supplied for the dataflowJobFile option; you’ll need it in your cloud function.

If you chose to use the 2.x version of the libraries, you probably got an error. I did tell you not to, but you knew better, didn’t you? The command for 2.x looks like this:

 mvn compile exec:java \
 -Dexec.mainClass=com.shinesolutions.WordCount \
 -Dexec.args="--project=<your project> \
 --stagingLocation=gs://pipeline-staging \
 --templateLocation=gs://pipeline-staging/templates/WordCount \
 --runner=DataflowRunner"

I haven’t tested this with version 2.x, so don’t blame me if that command deletes the internet or something.
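
Whichever version you used, it’s worth a quick sanity check that the template file actually landed in GCS before you wire up the cloud function:

gsutil ls gs://pipeline-staging/templates/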

Template is done. What next?

Now you need your cloud function. They’re written in JavaScript running on Node.js, so you should probably install that. Then, in a suitable directory, run npm init to set up a package.json file, which tells Google what your dependencies are. It will ask you a lot of questions, but don’t stress about the answers; they’re not a big deal.
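
As a rough sketch (the directory name here is just something I made up, and -y accepts all the prompts with their defaults):

# any directory name will do
mkdir dataflow-trigger-fn && cd dataflow-trigger-fn
# accept all the npm init prompts with defaults
npm init -y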

Our cloud function is going to talk to the Dataflow API, so you’ll need to install that dependency. Run npm install --save googleapis to get that done. (Confusingly, Google has two flavours of Node.js support; the @google-cloud packages don’t support Dataflow yet, which is why we’re using googleapis.) The cloud function looks like this:

const google = require('googleapis');

exports.goWithTheDataFlow = function(event, callback) {
  const file = event.data;

  // Only react to objects that exist; we also get notified about deletes.
  if (file.resourceState === 'exists' && file.name) {
    // Application default credentials are already available inside a cloud function.
    google.auth.getApplicationDefault(function(err, authClient, projectId) {
      if (err) {
        throw err;
      }

      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }

      const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

      // Launch a job from the template we staged earlier, using the uploaded
      // file as input and writing the word counts alongside it.
      dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {
            inputFile: `gs://${file.bucket}/${file.name}`,
            outputFile: `gs://${file.bucket}/${file.name}-wordcount`
          },
          jobName: 'cloud-fn-dataflow-test',
          gcsPath: 'gs://pipeline-staging/templates/WordCount'
        }
      }, function(err, response) {
        if (err) {
          console.error("problem running dataflow template, error was: ", err);
        }
        console.log("Dataflow template response: ", response);
        callback();
      });
    });
  }
};

We’re going to trigger our cloud function from files being uploaded to a GCS bucket, so goWithTheDataFlow gets called with an event that has a few useful properties. The main one is event.data, which contains the information about the updated resource. We check that the resource exists (because we also get notifications of deletes from the bucket). Then we authenticate (inside a cloud function, application default credentials are already set up for us) and create a Dataflow API client. Make the call to our Dataflow template and we’re done. Easy.

Now we upload our function to Google’s cloud with a command that looks like this:

gcloud beta functions deploy goWithTheDataFlow \
  --stage-bucket cloud-fn-staging \
  --trigger-bucket incoming-files

If that all went ok, we should be good to test with a file.
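
For example (any old text file will do; the filename here is made up):

gsutil cp ./some-test-file.txt gs://incoming-files/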

Awesome. How do I know it’s working?

The cloud function logs go to Stackdriver Logging in your Google Cloud console. Upload a file to the bucket, and in a few seconds you should be able to see some output there. Any errors will appear there too, and by default will also trigger an email to you. You can see your Dataflow pipeline in the usual Dataflow area of the cloud console, and you can see the output files (if you used the WordCount code from above) in the same bucket as your source file.
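
If you prefer the command line to clicking around, recent versions of gcloud can pull the function logs too, with something like:

gcloud functions logs read goWithTheDataFlow --limit 50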

Great. What should I do now?

You’ll probably want to get rid of that test cloud function at some point. Do that with a command like this:

gcloud beta functions delete goWithTheDataFlow

Then make a cup of tea. Put your feet up, close your eyes, let your mind relax and just think about all those cronjobs you can delete. Imagine grinding them under your heel, and setting fire to the VMs they run on. Better now? Good.

gareth.jones@shinesolutions.com

I'm a Senior Developer at Shine, and my milkshake brings all the boys to the yard. I could teach you, but I'd have to charge. Contact Shine for prices.

Comments

  • KISHAN KUMAR
    Posted at 17:41h, 10 October Reply

How do you set up a job dependency using Cloud Functions in Google Dataflow? Meaning, after running one template, another template must be triggered?

    • Gareth Jones
      Posted at 08:36h, 11 October Reply

      Hi, I would put a message on a queue to trigger the next pipeline – cloud pubsub would be great for this, and it can trigger cloud functions.

      • KISHAN KUMAR
        Posted at 14:55h, 11 October

Thanks, Gareth. But could you please publish an article with an example explaining all of this, which may help us…

      • KISHAN KUMAR
        Posted at 15:24h, 11 October

I will wait for your response. Please send me some resources so I can learn from them and implement this, or an example from which I can understand the concept, because I’m new to Google Cloud and still studying, but this is required work for my college project.

      • Gareth Jones
        Posted at 15:32h, 11 October

        I’m not going to write your college project for you, but this tutorial shows you how to write a cloud function that is triggered by a pub sub message. You’ll have to work out how to send the message for yourself (google “pubsub publish message”).

      • KISHAN KUMAR
        Posted at 16:08h, 11 October

Thanks for the help.

  • Matt Byrne
    Posted at 13:37h, 31 January Reply

    Great example, thanks! Note to others to beware that the version of googleapis that was used when this article was published was likely “22.2.0”. I was pulling my hair out wondering why I kept getting errors like “A Forbidden error was returned while attempting to retrieve an access token for the Compute Engine built-in service account”. Force an earlier version to get this working and then work your way up to version 25+.

    You can read migration guides to get back up to the latest version (I’m not there yet): https://github.com/google/google-api-nodejs-client/blob/master/MIGRATING.md
