Do you have an unreasonable fear of cronjobs? Find spinning up VMs to be a colossal waste of your towering intellect? Does the thought of checking a folder regularly for updates fill you with an apoplectic rage? If so, you should probably get some help. Maybe find another line of work.

In the meantime, here’s one way to ease your regular file processing anxieties. With just one application of Google Cloud Functions, eased gently up your Dataflow Pipeline, you can find lasting relief from troublesome cronjobs.

But first, some assumptions.

Assumptions?

You’ve got gcloud installed, you’ve created a project, and you’ve enabled the Dataflow and Cloud Functions APIs. If you haven’t, a few minutes reading the Google docs should get you started.
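
If reading docs isn’t your thing either, a reasonably recent gcloud can usually do the API-enabling part for you. Something like this should work (the exact service names here are from memory, so treat it as a sketch):

gcloud services enable dataflow.googleapis.com
gcloud services enable cloudfunctions.googleapis.com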

I’m ready now.

We are going to set up a Google Cloud Function that will get called every time a cloud storage bucket gets updated. That function will kick off a dataflow pipeline (using the handy new templates) to process the file with whatever complicated data transformations somebody further up the food chain has specified.

Hang on, you’ve lost me.

Ok, maybe I should have been a bit more explicit in my assumptions. Here’s a quick cheat sheet:

  • Cloud functions – serverless little bits of code (like Amazon’s Lambdas)
  • Dataflow – a managed service for processing large amounts of data
  • Cloud storage – Google’s equivalent of S3

Ok, carry on, we’re good.

First step is to create a few buckets (you can call them whatever you want, but these are the names I’ve used in the rest of this article):

  • one for keeping the code for your cloud function – cloud-fn-staging
  • one for keeping the code for your pipeline – pipeline-staging
  • the bucket you want to monitor for updates – incoming-files

If you’re not sure how to create buckets, Google’s docs can help. Or just start clicking around randomly in the cloud console. You’ll work it out, I believe in you, champ. Or, if clicking isn’t your style, gsutil will do it from the command line.
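
Something like the following should do it (bucket names are globally unique, so you’ll almost certainly need to pick your own variations on these):

gsutil mb gs://cloud-fn-staging
gsutil mb gs://pipeline-staging
gsutil mb gs://incoming-files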

Now what?

Now we’re going to make a Dataflow template. If you’ve already got a pipeline you can use that one, but I mangled one of the example pipelines that Google gives you. There’s one fiddly detail here: templates were introduced in version 1.9.0 of the Dataflow Java libraries, so you’ll need at least that version. However, if you go for the latest 2.x version (which is also the default version that the Maven archetypes generate), the way you create your template changes, and the Google docs haven’t been updated for that version yet.

To create the example pipeline I ran the following maven command:

mvn archetype:generate \
 -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
 -DarchetypeGroupId=com.google.cloud.dataflow \
 -DarchetypeVersion=1.9.0 \
 -DgroupId=com.shinesolutions \
 -DartifactId=dataflow-template-poc \
 -Dversion="0.1" \
 -DinteractiveMode=false \
 -Dpackage=com.shinesolutions

Notice the -DarchetypeVersion=1.9.0 option, which ensures that I’m using that version of the libs. Without that option you’ll end up with the 2.x versions (which I will explain how to use as well, don’t worry).

This generates some sample code, including the standard Dataflow WordCount example. I edited this to make it templatable. (I also invented the word “templatable” just now. You can use it, though, it’s cool.) Here are the important changes, first in the WordCountOptions interface (in the WordCount class):

public interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Validation.Required
  ValueProvider<String> getInputFile();
  void setInputFile(ValueProvider<String> value);

  @Description("Path of the file to write to")
  @Validation.Required
  ValueProvider<String> getOutputFile();
  void setOutputFile(ValueProvider<String> value);
}

Instead of the options just being String values, they are now ValueProvider types. This lets the runner know that these values will be provided later, when the template is actually executed.

The main method looks like this:

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()).withoutValidation())
      .apply(new CountWords())
      .apply(ParDo.of(new FormatAsTextFn()))
      .apply(TextIO.Write.named("WriteCounts").to(options.getOutputFile()).withoutValidation());

  p.run();
}

The important thing to note here is the .withoutValidation() modifier on the TextIO.Read. If you don’t do that, your template won’t get created, because TextIO will try to validate the option values before they’ve been supplied. We’re using TextIO in this example, but Pub/Sub and BigQuery input/output in Dataflow also support passing in options from templates.

To create our template, run this command:

 mvn compile exec:java \
 -Dexec.mainClass=com.shinesolutions.WordCount \
 -Dexec.args="--project=<your project> \
 --stagingLocation=gs://pipeline-staging \
 --dataflowJobFile=gs://pipeline-staging/templates/WordCount \
 --runner=TemplatingDataflowPipelineRunner"

If that succeeded, your template has been created. Remember the value you supplied for the dataflowJobFile option; you’ll need it in your cloud function.
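
If you want to double-check, the template is just a file in GCS, so listing the bucket should show it. Newer versions of gcloud can also kick a templated job off by hand, which is a handy sanity check before wiring up the cloud function (the jobs run command and its flags are from memory, and the file names below are made up, so adjust to taste):

gsutil ls gs://pipeline-staging/templates/

gcloud dataflow jobs run manual-wordcount-test \
  --gcs-location gs://pipeline-staging/templates/WordCount \
  --parameters inputFile=gs://incoming-files/some-file.txt,outputFile=gs://incoming-files/some-file.txt-wordcount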

If you chose to use the 2.x version of the libraries, you probably got an error. I did tell you not to, but you knew better, didn’t you? The command for 2.x looks like this:

 mvn compile exec:java \
 -Dexec.mainClass=com.shinesolutions.WordCount \
 -Dexec.args="--project=<your project> \
 --stagingLocation=gs://pipeline-staging \
 --templateLocation=gs://pipeline-staging/templates/WordCount \
 --runner=DataflowRunner"

I haven’t tested this with version 2.x, so don’t blame me if that command deletes the internet or something.

 

Template is done. What next?

Now you need your cloud function. Cloud functions are written in JavaScript running on Node.js, so you should probably install that. Then, in a suitable directory, run npm init to set up a package.json file, which will tell Google what your dependencies are. It will ask you a lot of questions, but don’t stress about the answers; they’re not a big deal.
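
If you can’t even be bothered answering the questions, npm will happily take the defaults for you:

npm init -y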

Our cloud function is going to talk to the Dataflow API, so you’ll need to install that dependency. Run npm install --save googleapis to get that done. (Confusingly, there are two flavours of Node.js support from Google; the @google-cloud packages don’t support Dataflow yet, which is why we’re using googleapis.) The cloud function looks like this:

const google = require('googleapis');

exports.goWithTheDataFlow = function(event, callback) {
  const file = event.data;

  // We get notified about deletes from the bucket too, so only act when the file actually exists
  if (file.resourceState === 'exists' && file.name) {
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
      if (err) {
        throw err;
      }

      // Some environments hand back a client that still needs scopes attached
      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }

      const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

      // Launch a job from the template we staged earlier, using the uploaded file
      // as the input and writing the word counts alongside it
      dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {
            inputFile: `gs://${file.bucket}/${file.name}`,
            outputFile: `gs://${file.bucket}/${file.name}-wordcount`
          },
          jobName: 'cloud-fn-dataflow-test',
          gcsPath: 'gs://pipeline-staging/templates/WordCount'
        }
      }, function(err, response) {
        if (err) {
          console.error("problem running dataflow template, error was: ", err);
          callback(err);
          return;
        }
        console.log("Dataflow template response: ", response);
        callback();
      });

    });
  } else {
    // Nothing to do for a delete, but call back so the function finishes cleanly
    callback();
  }
};

We’re going to trigger our cloud function from files being uploaded to a GCS bucket, so goWithTheDataFlow gets called with an event that has a few useful properties. The main one is event.data, which contains the information about the updated resource. We check that the resource exists (because we also get notified of deletes from the bucket). Then we authenticate (because we’re running as a cloud function, application default auth is already set up for us) and create a Dataflow API client. Make the call to our Dataflow template and we’re done. Easy.

Now we upload our function to Google’s cloud with a command that looks like this:

gcloud beta functions deploy goWithTheDataFlow \
  --stage-bucket cloud-fn-staging \
  --trigger-bucket incoming-files

If that all went ok, we should be good to test with a file.
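
Any old text file will do. Something like this (where war-and-peace.txt is whatever file you happen to have lying around) will set things in motion:

gsutil cp war-and-peace.txt gs://incoming-files/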

Awesome. How do I know it’s working?

The cloud function logs go to Stackdriver Logging in your Google Cloud console, and a few seconds after you upload a file you should see some output there. Any errors will appear there too, and by default will also trigger an email to you. You can see your Dataflow pipeline in the usual Dataflow area of the console, and the output files (if you used the WordCount code from above) will land in the same bucket as your source file.
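
If you’d rather stay in the terminal, recent versions of gcloud can pull the function logs for you too (the command has moved around between releases, so check gcloud functions --help if this one doesn’t fly):

gcloud beta functions logs read goWithTheDataFlow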

Great. What should I do now?

You’ll probably want to get rid of that test cloud function at some point. Do that with a command like this:

gcloud beta functions delete goWithTheDataFlow

Then make a cup of tea. Put your feet up, close your eyes, let your mind relax and just think about all those cronjobs you can delete. Imagine grinding them under your heel, and setting fire to the VMs they run on. Better now? Good.

Written by Gareth Jones

I'm a Senior Developer at Shine, and my milkshake brings all the boys to the yard. I could teach you, but I'd have to charge. Contact Shine for prices.
