Welcome to the second post on Cascading.

In this post, we will see how to use a database as a Cascading sink; this example writes to MySQL. If you want to know the basics of a Cascading flow, please see the previous post. In that example, we simply created a Cascading flow without any data operations. Here we use the same input file as last time, count the total number of males and females, and write the result both to a file and to the database. In short, we will use one source tap and two sink taps.

Before writing to a database, we need a few prerequisites: the table name, database URL, username and password. In addition, each Field's name and type must match the name and type of the corresponding database column. For example, a varchar column first_name in the database should be declared in code with the name first_name and the type String.class.

The database details which I have used are:

  • username: root
  • password: root
  • database name: tutorials
  • table name: emp_count

Code to create this table:

CREATE TABLE `emp_count` (
 `id` int(20) NOT NULL AUTO_INCREMENT,
 `gender` varchar(20) DEFAULT NULL,
 `count` int(20) DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
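If you are not sure which names and types your Fields should use, you can ask the database. The sketch below is a plain JDBC helper, not part of Cascading, that lists the columns of emp_count through DatabaseMetaData.getColumns and prints a suggested Java type for each one. The class name ColumnTypeCheck and the mapping in javaTypeFor are my own assumptions and cover only a few MySQL types; it also assumes the MySQL JDBC driver is on the classpath.

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Locale;

public class ColumnTypeCheck {
    // Hypothetical, deliberately partial mapping from MySQL type names to the
    // Java types you would declare in Cascading Fields; extend it as needed.
    static Class<?> javaTypeFor(String sqlTypeName) {
        switch (sqlTypeName.toUpperCase(Locale.ROOT)) {
            case "VARCHAR":
            case "CHAR":
            case "TEXT":
                return String.class;
            case "INT":
            case "INTEGER":
                return Integer.class;
            case "BIGINT":
                return Long.class;
            default:
                return Object.class;
        }
    }

    // Prints every column of the table with its SQL type and the suggested Java type.
    static void describe(String url, String user, String pass, String table) throws SQLException {
        try (Connection conn = DriverManager.getConnection(url, user, pass)) {
            DatabaseMetaData meta = conn.getMetaData();
            // restrict the lookup to the current database (catalog) of the connection
            try (ResultSet cols = meta.getColumns(conn.getCatalog(), null, table, null)) {
                while (cols.next()) {
                    String name = cols.getString("COLUMN_NAME");
                    String type = cols.getString("TYPE_NAME");
                    System.out.println(name + " " + type + " -> " + javaTypeFor(type).getSimpleName());
                }
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        // connection details used in this post
        describe("jdbc:mysql://localhost/tutorials", "root", "root", "emp_count");
    }
}
```

The gender and count columns are the ones sinkFields declares in the code below; the auto-incremented id column is left for MySQL to fill in.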

Implementation part: CascadingToDb.java

package tutorials.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.jdbc.JDBCFactory;
import cascading.jdbc.JDBCScheme;
import cascading.operation.Debug;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.CountBy;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

import java.lang.reflect.Type;
import java.util.Properties;

public class CascadingToDb {
    private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
    private static final String TABLE_NAME = "emp_count";
    private static final String DB_USER = "root";
    private static final String DB_PASS = "root";
    private static final String DB_URL = "jdbc:mysql://localhost/tutorials";
    private static Fields sinkFields;

    //defining delimiters and file locations
    private static String DELIMITER_SOURCE = ",";
    private static String DELIMITER_SINK = "|";
    private static String FILE_SOURCE = "cascading_intro_file.csv";
    private static String FILE_SINK = "cascading_intro_file_sink.csv";
    private static FlowDef flowDef = new FlowDef();

    public static void main(String[] args) {
        //fields that match our data layout
        Fields sourceFields = new Fields("id", "first_name", "last_name", "email", "gender", "salary");
        sinkFields = new Fields(new Comparable[]{"gender", "count"}, new Type[]{String.class, Integer.class});

        //source and sink taps definition
        Tap sourceTap = new Hfs(new TextDelimited(sourceFields, true, DELIMITER_SOURCE), FILE_SOURCE);
        Tap sinkTap = new Hfs(new TextDelimited(sinkFields, false, DELIMITER_SINK), FILE_SINK, SinkMode.REPLACE);

        Pipe pipe = new Pipe("personData");

        //counting each kind of gender
        pipe = new CountBy(pipe, new Fields("gender"), new Fields("count"));

        pipe = new Each(pipe, new Debug("pipeLayout", true));
        Pipe pipe_db = new Pipe("dbPipe", pipe);

        //completing the flow by adding multiple sinks
        flowDef.addSource(pipe, sourceTap);
        flowDef.addTailSink(pipe, sinkTap);
        flowDef.addTailSink(pipe_db, getJDBCSinkTap());

        Flow flow = new HadoopFlowConnector().connect(flowDef);
        flow.complete();
    }

    private static Tap getJDBCSinkTap() {
        Properties schemeProps = new Properties();
        Properties tapProps = new Properties();

        //setting properties required by JDBCFactory
        tapProps.put(JDBCFactory.PROTOCOL_JDBC_USER, DB_USER);
        tapProps.put(JDBCFactory.PROTOCOL_JDBC_PASSWORD, DB_PASS);
        tapProps.setProperty(JDBCFactory.PROTOCOL_TABLE_NAME, TABLE_NAME);
        tapProps.setProperty(JDBCFactory.PROTOCOL_JDBC_DRIVER, DRIVER_NAME);

        tapProps.put(JDBCFactory.PROTOCOL_PRIMARY_KEYS, "emp_count_id");
        JDBCFactory factory = new JDBCFactory();

        //creating JDBC scheme
        JDBCScheme updateScheme = (JDBCScheme) factory.createScheme("genderWriter", sinkFields, schemeProps);
        return factory.createTap("jdbc", updateScheme, DB_URL, SinkMode.UPDATE, tapProps);
    }

}

Most of the code is similar to the first example; please see the previous post if any part is unclear. The first major difference from the previous example is:

//counting each kind of gender
pipe = new CountBy(pipe, new Fields("gender"), new Fields("count"));

pipe = new Each(pipe, new Debug("pipeLayout", true));
Pipe pipe_db = new Pipe("dbPipe", pipe);

//completing the flow by adding multiple sinks
flowDef.addSource(pipe, sourceTap);
flowDef.addTailSink(pipe, sinkTap);
flowDef.addTailSink(pipe_db, getJDBCSinkTap());

In the code above, we use CountBy, a subclass of AggregateBy. We will cover these classes in more detail in later examples. For now, you just need to know that CountBy groups the tuples by the Fields you pass in and counts how many tuples fall into each group. Here, it groups by the gender Field and stores the number of rows for each gender in the count Field. Note that the count Field is added by CountBy itself; it does not need to exist in the source data.
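To make the grouping concrete, here is a plain Java sketch, with no Cascading involved, of the result CountBy produces: one entry per distinct gender together with the number of rows carrying it. The class name and sample rows are hypothetical. Cascading itself computes this as a distributed aggregation (with partial counts on the map side) rather than with one in-memory map, so treat this only as a model of the output.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CountBySketch {
    // Groups rows by the value in one column and counts the rows per distinct value,
    // modelling what CountBy(pipe, new Fields("gender"), new Fields("count")) emits.
    static Map<String, Long> countBy(List<String[]> rows, int keyIndex) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] row : rows) {
            counts.merge(row[keyIndex], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // hypothetical rows in the source layout: id, first_name, last_name, email, gender, salary
        List<String[]> rows = List.of(
                new String[]{"1", "Ann", "Lee", "ann@example.com", "Female", "5000"},
                new String[]{"2", "Bob", "Ray", "bob@example.com", "Male", "4000"},
                new String[]{"3", "Cid", "Fox", "cid@example.com", "Male", "4500"});
        // gender is the fifth column (index 4); prints "Female|1" then "Male|2"
        countBy(rows, 4).forEach((gender, count) -> System.out.println(gender + "|" + count));
    }
}
```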

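Once counting is done, we create another pipe to bind to the JDBC Tap. Here, pipe_db is the Pipe bound to the JDBC tap. We need it because a FlowDef binds each tail sink to a pipe by the pipe's name, so the same Pipe cannot be bound to two sinks. new Pipe("dbPipe", pipe) branches the counted stream under a new name, which lets it feed the second sink.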

The second major difference is the getJDBCSinkTap() method:

    private static Tap getJDBCSinkTap() {
        Properties schemeProps = new Properties();
        Properties tapProps = new Properties();

        //setting properties required by JDBCFactory
        tapProps.put(JDBCFactory.PROTOCOL_JDBC_USER, DB_USER);
        tapProps.put(JDBCFactory.PROTOCOL_JDBC_PASSWORD, DB_PASS);
        tapProps.setProperty(JDBCFactory.PROTOCOL_TABLE_NAME, TABLE_NAME);
        tapProps.setProperty(JDBCFactory.PROTOCOL_JDBC_DRIVER, DRIVER_NAME);

        tapProps.put(JDBCFactory.PROTOCOL_PRIMARY_KEYS, "emp_count_id");
        JDBCFactory factory = new JDBCFactory();

        //creating JDBC scheme
        JDBCScheme updateScheme = (JDBCScheme) factory.createScheme("genderWriter", sinkFields, schemeProps);
        return factory.createTap("jdbc", updateScheme, DB_URL, SinkMode.UPDATE, tapProps);
    }

This method creates the JDBC tap we use to write our data. It does little work, but it is the most important part. The tap properties hold the database details: username, password, table name, JDBC driver and primary key. The JDBC scheme is created from sinkFields, which is why the gender and count field names and types must match the table's columns. The scheme, the database URL and SinkMode.UPDATE are then passed to createTap, which returns the sink Tap. After running this program, your table will look like:

id  gender  count
1   Female  11
2   Male    12
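The JDBC tap hides the SQL it runs. As a rough mental model only, and not cascading-jdbc's actual implementation, writing the counted tuples amounts to a batch of parameterized INSERT statements like the ones below. The class and method names are hypothetical; the table, columns, connection details and sample counts come from this post.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class InsertSketch {
    // Builds a parameterized INSERT, e.g. INSERT INTO emp_count (gender, count) VALUES (?, ?)
    static String buildInsertSql(String table, List<String> columns) {
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES (" + placeholders + ")";
    }

    // Writes each (gender, count) pair as one row and sends all rows in a single batch.
    static void writeCounts(String url, String user, String pass, Map<String, Long> counts) throws SQLException {
        String sql = buildInsertSql("emp_count", List.of("gender", "count"));
        try (Connection conn = DriverManager.getConnection(url, user, pass);
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                stmt.setString(1, e.getKey());
                stmt.setLong(2, e.getValue());
                stmt.addBatch();
            }
            stmt.executeBatch();
        }
    }

    public static void main(String[] args) throws SQLException {
        Map<String, Long> counts = new TreeMap<>(Map.of("Female", 11L, "Male", 12L));
        System.out.println(buildInsertSql("emp_count", List.of("gender", "count")));
        // this inserts real rows, so run it only against a test table
        writeCounts("jdbc:mysql://localhost/tutorials", "root", "root", counts);
    }
}
```

Because id is AUTO_INCREMENT, it is left out of the column list and MySQL assigns it.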

One thing to note is that with SinkMode.UPDATE, the flow writes into the existing table every time we run this code, so rows from earlier runs are kept alongside the new ones. SinkMode.REPLACE, on the other hand, erases the table first. So if you want only the current data in the table, it is safest to add some kind of safe-check in code: for example, delete the table's contents before running the flow, drop and recreate the table, or use any other approach you prefer.
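One way to add such a safe-check is to empty the table before the flow runs, for example by calling a helper like the one below at the start of main, before flow.complete(). This is a hypothetical plain JDBC sketch, not a Cascading feature; the class and method names are my own, and it uses the connection details from this post.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class ClearTable {
    // Deletes all rows so the next SinkMode.UPDATE run leaves only the current counts.
    // Table names cannot be bound as JDBC parameters, so the name is checked first.
    static int clearTable(String url, String user, String pass, String table) throws SQLException {
        if (!table.matches("[A-Za-z0-9_]+")) {
            throw new IllegalArgumentException("unexpected table name: " + table);
        }
        try (Connection conn = DriverManager.getConnection(url, user, pass);
             Statement stmt = conn.createStatement()) {
            // returns the number of deleted rows
            return stmt.executeUpdate("DELETE FROM " + table);
        }
    }

    public static void main(String[] args) throws SQLException {
        int removed = clearTable("jdbc:mysql://localhost/tutorials", "root", "root", "emp_count");
        System.out.println("removed " + removed + " rows");
    }
}
```

DELETE keeps the table's AUTO_INCREMENT counter, so id values keep growing across runs; TRUNCATE TABLE emp_count would empty the table and reset the counter instead.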

I hope this post was useful. Please leave a comment with questions or suggestions. See you in the next one. 🙂
