Welcome to the 3rd post on Cascading. I hope you liked the previous tutorials; here are Part 1 and Part 2 in case you haven’t gone through them yet. In this post, we will cover the basics of Filter.

Filter, as the name suggests, is used for data filtering: it removes unwanted records from the data stream. Filter is one of the four Operations provided by Cascading. We will learn about the other Operations in future posts.

Before starting with the code, let’s go over some basic Cascading terminology. Anyone working with Cascading should be clear about these terms. We will cover more terminology in related posts from time to time.

Tuple

A Tuple is one set of values flowing through a Pipe. It is similar to a single record in a database. Its values are addressed by position, and positions are zero-based, i.e. the first value sits at position 0.
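
For instance, here is a minimal sketch of positional access (with made-up values, using cascading.tuple.Tuple):

Tuple tuple = new Tuple(1, "Jane", "Doe", 72000.0);

//positions are zero-based
String firstName = tuple.getString(1); //"Jane"
double salary = tuple.getDouble(3);    //72000.0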

TupleEntry

A TupleEntry is like a Tuple, but it is indexed by names, i.e. we can use Field names to address the data it holds. It is similar to a single record plus its header in a database.
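
A minimal sketch of name-based access, again with made-up values:

Fields fields = new Fields("id", "first_name", "last_name", "salary");
TupleEntry entry = new TupleEntry(fields, new Tuple(1, "Jane", "Doe", 72000.0));

//data is addressed by Field name instead of position
String firstName = entry.getString("first_name"); //"Jane"
double salary = entry.getDouble("salary");        //72000.0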

Each Pipe

Each is one of the Pipes in Cascading. Any Operation applied in an Each Pipe runs on every tuple individually: if we have 1000 rows of data and apply an Operation inside Each, it runs once for each of the 1000 rows. Operations like Filter and Function are applied via the Each Pipe, as the snippet below shows.
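
Both constructor forms below are taken from the full example that follows (SalaryFilter and GenderFilter are defined there):

//apply an Operation to every tuple, passing all Fields as arguments
pipe = new Each(pipe, new SalaryFilter());

//or pass only selected Fields as the Operation's arguments
pipe = new Each(pipe, new Fields("gender", "first_name"), new GenderFilter("female"));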

Now comes the coding part. We will use the same text file as in the previous examples. You can find the text file here.

Implementation Part: CascadingFilter.java

package tutorials.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Debug;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class CascadingFilter {
    //defining delimiters and file locations
    private static final String DELIMITER_SOURCE = ",";
    private static final String DELIMITER_SINK = "|";
    private static final String FILE_SOURCE = "cascading_intro_file.csv";
    private static final String FILE_SINK = "cascading_intro_file_sink.csv";
    private static final FlowDef flowDef = new FlowDef();

    public static void main(String[] args) {
        //fields that match our data layout
        Fields sourceFields = new Fields("id", "first_name", "last_name", "email", "gender", "salary");

        //source and sink taps definition
        Tap sourceTap = new Hfs(new TextDelimited(sourceFields, true, DELIMITER_SOURCE), FILE_SOURCE);
        Tap sinkTap = new Hfs(new TextDelimited(sourceFields, false, DELIMITER_SINK), FILE_SINK, SinkMode.REPLACE);

        Pipe pipe = new Pipe("personData");

        //pipe debug before filter
        pipe = new Each(pipe, new Debug("beforeFilter", true));

        //filter calls
        pipe = new Each(pipe, new SalaryFilter());
        //pipe = new Each(pipe,new Fields("gender", "first_name"), new GenderFilter("female"));

        //pipe debug after filter
        pipe = new Each(pipe, new Debug("afterFilter", true));

        //completing the flow
        flowDef.addSource(pipe, sourceTap);
        flowDef.addTailSink(pipe, sinkTap);

        Flow flow = new HadoopFlowConnector().connect(flowDef);
        flow.complete();
    }

    /*
    This filter will filter records with less than 50000 salary
     */
    private static class SalaryFilter extends BaseOperation implements Filter {
        SalaryFilter() {
            super(Fields.ALL);
        }

        @Override
        public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
            TupleEntry incomingEntry = filterCall.getArguments();
            return (incomingEntry.getDouble("salary") < 50000);
        }
    }

    /*
    This filter will filter records based on gender
     */
    private static class GenderFilter extends BaseOperation implements Filter {
        String genderToKeep;

        GenderFilter(String genderToKeep) {
            super(2);
            this.genderToKeep = genderToKeep;
        }

        @Override
        public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
            TupleEntry incomingEntry = filterCall.getArguments();

            System.out.println("incomingEntry = " + incomingEntry);

            return !(incomingEntry.getString("gender").equalsIgnoreCase(this.genderToKeep));
        }
    }

}

In the above code, two Filters are defined. SalaryFilter is applied in this example and GenderFilter is commented out. You can run the code with only GenderFilter enabled, or with both Filters. You can verify the result by comparing the Debug output before and after the Filter is applied. Every Filter should extend the BaseOperation class and implement the Filter interface; isRemove is the method that must be overridden. Fields.ALL denotes that all Fields in the tuple stream are expected as arguments.

Every tuple for which isRemove returns true is removed, and every tuple for which it returns false is retained.
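
To make that contract concrete, here is a hypothetical do-nothing Filter (not part of the example above) that keeps every tuple:

private static class KeepEverythingFilter extends BaseOperation implements Filter {
    @Override
    public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
        //false means keep this tuple; returning true would remove it
        return false;
    }
}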

Explanation: SalaryFilter

In SalaryFilter, we filter out all rows with a salary below 50000. The code is quite simple: we get the incoming TupleEntry from filterCall.getArguments(), extract the salary Field from it, compare the value, and return true or false accordingly. All rows with a salary below 50000 are removed by this Filter. You can inspect the TupleEntry by printing it with System.out.println.
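
If you wanted the cutoff to be configurable instead of hard-coded, a small variation (hypothetical, not part of the example above) could take the threshold as a constructor argument:

private static class MinSalaryFilter extends BaseOperation implements Filter {
    private final double minSalary;

    MinSalaryFilter(double minSalary) {
        super(Fields.ALL);
        this.minSalary = minSalary;
    }

    @Override
    public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
        //remove every tuple whose salary is below the configured minimum
        return filterCall.getArguments().getDouble("salary") < minSalary;
    }
}

It would be applied the same way: pipe = new Each(pipe, new MinSalaryFilter(50000));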

Explanation: GenderFilter

GenderFilter works a little differently: we pass in the value on whose basis we want to filter the data. Look at the following line:

pipe = new Each(pipe, new Fields("gender", "first_name"), new GenderFilter("female"));

Here the gender and first_name Fields are passed as arguments to the Filter, and female is the value to be checked. We call super(2) because two argument Fields are expected, i.e. gender and first_name in this case. We cannot extract the value of any other Field inside the Filter, because only gender and first_name were passed when calling it. Inside the Filter, the value of gender is checked (whether or not it equals the passed value) and a boolean is returned on that basis, which filters the data.
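
The same pattern generalizes. Here is a hypothetical FieldValueFilter (not part of the original example) where both the Field to test and the value to keep are chosen at the call site:

private static class FieldValueFilter extends BaseOperation implements Filter {
    private final String fieldName;
    private final String valueToKeep;

    FieldValueFilter(String fieldName, String valueToKeep) {
        super(1); //exactly one argument Field is expected
        this.fieldName = fieldName;
        this.valueToKeep = valueToKeep;
    }

    @Override
    public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
        TupleEntry incomingEntry = filterCall.getArguments();
        //remove the tuple unless the named Field matches the value to keep
        return !incomingEntry.getString(fieldName).equalsIgnoreCase(valueToKeep);
    }
}

Usage: pipe = new Each(pipe, new Fields("gender"), new FieldValueFilter("gender", "female"));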

Hope this tutorial was useful. Please share and like if you think it could help others. Drop a comment with suggestions and questions. See you in the next one. Cheers 🙂
