Tuesday, September 20, 2016

Custom Record Delimiter in Apache Pig


There would be situations where in your big data project you need to ingest data files with custom new line delimiter. In Pig in particular, PigStoarge class implements "\n" as the default record delimiter for data loading. But if your datafiles contain "\n" as part of data fields, it is imperative to either curate the data files and remove the "\n" characters from the data or create a new PigStorage class to consume the datafile with a custom record limiter.

Recommended steps to implement custom record delimiter in Pig

1. Create a custom InputFomat build from a custom record reader object
2. Create a custom PigStorage class using the newly created Inputformat
3. Use the new PigStorage class to load the data file with custom record delimiter

1. Custom InputFormat Java Class - This piece of code implements a custom InputFormat with ctrl-b or '\u0002' as the new record delimiter.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class LoaderInputFormat extends FileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    String delimiter = "\u0002";
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes();
    return new LineRecordReader(recordDelimiterBytes);
  }
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return codec == null;
  }
}

2. Custom PigStoage Class -

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.pig.builtin.PigStorage;

public class NewPigStorage extends PigStorage{
     public NewPigStorage(String delimiter){
       super(delimiter);
    }
       public NewPigStorage(){
              super();
       }
       @SuppressWarnings("rawtypes")
       @Override
       public InputFormat getInputFormat() {
              return new LoaderInputFormat();
       }
}

3. Using the custom Pigstorage in Pig -

         grunt>  recs  =  load  '<data-file>' using NewPigStorage(<column delimiter>) as ( <record structure>)
         # verify the records structure
  grunt> dump recs;

Sunday, September 18, 2016

Join optimization in Apache Pig

Join optimization in Apache Pig

In traditional Hadoop world, Apache Pig plays a crucial role in establishing the data pipe line . Pig supports a variety of use friendly constructs and operators that enables data ingestion, transformation and storage of the data passing through a batch process. It gives the developers the power  to orchastrate the data flow in a seamless sequence of steps that could mimic equivalent sql functions like join, filter, group, order by and many other such tasks. In doing so it hides the low level abstraction of Map reduce.

In Pig we could join datasets in couple of different ways

1. Reduce side join
2. Mapside join or Replicate join

Reduce side join -

This is the default join approach used inside Pig when you join 2 or more relations. This is also known as shuffle join. In a typical Map Reduce life cycle, as the datasets flow from inputsplit, mappers to reducers, the join of the datasets will happen inside the reducer nodes. And this makes sense as all the similar keys end up in the same reducer node. But on the contrary, this is the most inefficient type of join in mapreduce. The reason being, underlying data has to traverse the full life cycle before the required fields get projected or reduced. It has to bear the overhead of IO to temp local files, data movement over the network and sorting operation on memory spilled over disk.

Mapside join or Replicated join -

Replicated join is a specialized type of join that works well when one of the joining datasets is small enough to into the memory. In such situations Map Reduce can perform the join on the mappers and reduces the overhead of IO and network trafiic during the subsequent stages.

big = LOAD 'big_data' AS (b1,b2,b3);
tiny = LOAD 'tiny_data' AS (t1,t2,t3);
mini = LOAD 'mini_data' AS (m1,m2,m3);
C = JOIN big BY b1, tiny BY t1, mini BY m1 USING 'replicated';

Both the smaller relations in the above join must fit into the memory for the join to execute successfully. Otherwise error will be generated