Hey guys, welcome to the 4th post on Cascading. It’s been a long time since my last post on filters. In this post, I am going to write about Functions. Functions are among the most powerful tools for working with data in Cascading. A Function applies an operation to one tuple at a time, much like operating on a single record in a database.

For this tutorial, you will need to understand terms like Tuple, TupleEntry and Each pipe. You can visit my earlier post on filters to learn about these terms, or you may visit the official Cascading site. The official Cascading documentation also gives a very good explanation of Function. I will talk about functions at a fundamental level and explain the things that I found hard to understand.

Introduction

A Function is a Cascading operation that reads one Tuple at a time and can emit zero or more Tuples for it. It is used with an Each pipe, which takes an output selector that decides which fields are passed downstream.

Creating a Function

To create a Function, write a Java class that extends BaseOperation and implements Function. You need to override the operate method from the Function interface, and you can override the prepare and cleanup methods from BaseOperation as needed.

operate()

operate() is the method that runs for every tuple fed into the function. If your data has 1000 tuples, operate() will run 1000 times. The data transformation is done in this method.

prepare()

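This is a method from BaseOperation. It is called before the function starts processing tuples, not once per tuple (on a cluster, expect one call in each task that runs the function). So we can use it for initialization or for work that is common to every tuple.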

cleanup()

This method is the counterpart of prepare(): it is called after the function has finished processing its tuples and can be used for finalization and cleanup tasks.
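To make the call order concrete, here is a minimal sketch in plain Java. It does not use Cascading at all: LifecycleSketch, UpperCaseOperation and run are hypothetical names I made up for illustration, and they only mimic the order in which prepare, operate and cleanup are called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Plain-Java stand-in, NOT Cascading: it only mimics the order of the calls.
class LifecycleSketch {

    // Hypothetical operation with the same three hooks as a Function.
    static class UpperCaseOperation {
        int prepareCalls, operateCalls, cleanupCalls;
        private Locale locale; // shared resource set up once in prepare()

        void prepare() {
            prepareCalls++;
            locale = Locale.ROOT;
        }

        String operate(String value) {
            operateCalls++;
            return value.toUpperCase(locale);
        }

        void cleanup() {
            cleanupCalls++;
            locale = null;
        }
    }

    // Drives the hooks the way a framework would: prepare, operate per record, cleanup.
    static List<String> run(UpperCaseOperation op, List<String> records) {
        List<String> out = new ArrayList<>();
        op.prepare();
        try {
            for (String record : records) {
                out.add(op.operate(record));
            }
        } finally {
            op.cleanup();
        }
        return out;
    }

    public static void main(String[] args) {
        UpperCaseOperation op = new UpperCaseOperation();
        List<String> out = run(op, Arrays.asList("a", "b", "c"));
        System.out.println(out + " prepare=" + op.prepareCalls
                + " operate=" + op.operateCalls + " cleanup=" + op.cleanupCalls);
    }
}
```

With three records, the counters show one prepare call, three operate calls and one cleanup call, which is the pattern to keep in mind when deciding what goes where in a real Function.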

Example

In this example, I will write some functions and provide an explanation for them. All code is available here.

// Calling part: apply the function to every tuple in the pipe
pipe = new Each(pipe, new EmailChangeFunction(EmailChangeFunction.Domains.GOOGLE), Fields.REPLACE);

// operate() method of EmailChangeFunction
@Override
public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    // the argument entry is read-only, so copy it into a new entry for output
    TupleEntry entry = functionCall.getArguments();
    TupleEntry outEntry = new TupleEntry(entry);

    // extract the old email and build the new one
    String initialEmail = entry.getString("email");
    String initialDomain = initialEmail.split("\\@")[1].split("\\.")[0];
    String finalEmail = initialEmail.replace(initialDomain, toDomain.getDomain());

    // set the new email
    outEntry.setString("email", finalEmail);

    // emit the modified entry
    functionCall.getOutputCollector().add(outEntry);
}

The first snippet is the call to the function and the second is the function's operate method. Here, the old email is read, its domain is replaced with the one passed by the user, and the new email is set in the output entry, which is then emitted from the function.

Here, we extract the argument TupleEntry from functionCall and use it to initialize the output TupleEntry. Never use the input TupleEntry itself as the output, because the arguments entry is unmodifiable. You can emit any number of tuples you want by calling

functionCall.getOutputCollector().add(outEntry);

any number of times.
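One thing to watch in the operate method above is the String.replace call: it replaces every occurrence of the old domain, so an address whose name part contains the same text is changed too, and an address without an @ makes the split fail. Below is a small plain-Java sketch of both approaches. EmailDomainSketch and its two methods are hypothetical helpers of mine, and I am assuming that toDomain.getDomain() returns a bare label such as "gmail".

```java
// Plain-Java sketch of the domain swap; no Cascading classes involved.
class EmailDomainSketch {

    // Same approach as operate() above: replace every occurrence of the old domain label.
    static String replaceEverywhere(String email, String newDomain) {
        String oldDomain = email.split("@")[1].split("\\.")[0];
        return email.replace(oldDomain, newDomain);
    }

    // Alternative: rebuild the address so only the part after '@' changes.
    static String replaceDomainOnly(String email, String newDomain) {
        int at = email.lastIndexOf('@');
        if (at < 0) {
            return email; // no '@': pass the value through unchanged
        }
        String host = email.substring(at + 1);
        int dot = host.indexOf('.');
        String rest = dot < 0 ? "" : host.substring(dot); // ".com", ".co.uk", ...
        return email.substring(0, at + 1) + newDomain + rest;
    }

    public static void main(String[] args) {
        String tricky = "yahoo.fan@yahoo.com";
        System.out.println(replaceEverywhere(tricky, "gmail"));
        System.out.println(replaceDomainOnly(tricky, "gmail"));
    }
}
```

For ordinary addresses both methods give the same result. They differ only for inputs like the one in main, so which one you want depends on how clean your email column is.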

Dissecting super()

super(Fields.ARGS) passes the declared fields to BaseOperation. Declared fields are the fields the function returns in each tuple, and Fields.ARGS means the function returns the same fields it receives as arguments. You can specify both the number of arguments the function expects and the fields it returns. For example, if you wish to pass only first name, last name and email to the function and return only first name and email, the code would look like this.

Calling part:

pipe = new Each(pipe, new Fields("first_name", "last_name", "email"), new EmailChangeFunction(EmailChangeFunction.Domains.GOOGLE, ""), Fields.RESULTS);

In the function, just change the constructor:

    EmailChangeFunction(Domains newDomain, String s) {
        super(3, new Fields("first_name", "email"));
        toDomain = newDomain;
    }

Here, 3 is the number of input arguments the function expects, and first_name and email are the fields it will output. I added the String parameter only to distinguish this constructor from the existing one. On a large dataset, it helps to pass only the required fields to the function, as that should be somewhat faster than feeding it every field.
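The two calls in this post use different output selectors, Fields.REPLACE and Fields.RESULTS, and the difference is easier to see on a single record. The sketch below models a tuple as an ordered map in plain Java. SelectorSketch and its methods are hypothetical and only imitate my understanding of the two selectors, they are not Cascading's implementation, and the id field is an assumed extra column.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java imitation of two output selectors; NOT Cascading's implementation.
class SelectorSketch {

    // Imitates Fields.RESULTS: only the fields emitted by the function go downstream.
    static Map<String, String> results(Map<String, String> incoming, Map<String, String> emitted) {
        return new LinkedHashMap<>(emitted);
    }

    // Imitates Fields.REPLACE: the incoming fields keep their order,
    // and values are overwritten by emitted fields with the same name.
    static Map<String, String> replace(Map<String, String> incoming, Map<String, String> emitted) {
        if (!incoming.keySet().containsAll(emitted.keySet())) {
            throw new IllegalArgumentException("emitted fields must already exist in the incoming tuple");
        }
        Map<String, String> out = new LinkedHashMap<>(incoming);
        out.putAll(emitted);
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> incoming = new LinkedHashMap<>();
        incoming.put("id", "1"); // assumed extra column, not passed to the function
        incoming.put("first_name", "Ann");
        incoming.put("last_name", "Lee");
        incoming.put("email", "ann@yahoo.com");

        Map<String, String> emitted = new LinkedHashMap<>();
        emitted.put("first_name", "Ann");
        emitted.put("email", "ann@gmail.com");

        System.out.println("RESULTS: " + results(incoming, emitted));
        System.out.println("REPLACE: " + replace(incoming, emitted));
    }
}
```

So with RESULTS the rest of the record is dropped after the function, while REPLACE keeps the whole record and only swaps the changed values. Pick the selector based on what the next pipe needs.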

Dissecting prepare()

Try printing something in the prepare method and run your code. You’ll see that it gets executed only once, not once per tuple. Use this whenever some task is common to all the tuples; that part can be written in the prepare method.

This much on functions for now. Please leave a comment if you want to discuss something. Hope to see you in the next post. Till then, cheers!
