Tuesday, September 20, 2016

Custom Record Delimiter in Apache Pig


In a big data project, there are situations where you need to ingest data files that use a custom record delimiter. In Pig specifically, the PigStorage class uses "\n" as the default record delimiter when loading data. But if your data files contain "\n" inside data fields, you must either curate the files and remove the embedded "\n" characters, or create a new PigStorage class that consumes the files with a custom record delimiter.
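To see the problem concretely, here is a minimal, self-contained Java sketch (the data and field layout are illustrative, not from a real dataset): two records are separated by Ctrl-B ('\u0002'), and the first record carries an embedded "\n" in one of its fields. Splitting on the default "\n" cuts the first record mid-field, while splitting on the custom delimiter recovers both records intact.

```java
public class DelimiterDemo {
    // Two records separated by Ctrl-B ('\u0002'); the first record's
    // second field contains an embedded newline.
    static final String DATA = "1,a note\nspanning two lines,x\u00022,plain,y";

    public static String[] splitOn(String delimiter) {
        return DATA.split(delimiter, -1);
    }

    public static void main(String[] args) {
        // Default "\n" delimiter: the first "record" stops mid-field.
        System.out.println(splitOn("\n")[0]);     // prints: 1,a note
        // Custom '\u0002' delimiter: both records come back intact.
        for (String rec : splitOn("\u0002")) {
            System.out.println(rec);
        }
    }
}
```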

Recommended steps to implement a custom record delimiter in Pig

1. Create a custom InputFormat built from a custom record reader object
2. Create a custom PigStorage class using the newly created InputFormat
3. Use the new PigStorage class to load the data file with the custom record delimiter

1. Custom InputFormat Java Class - This code implements a custom InputFormat with Ctrl-B ('\u0002') as the record delimiter.

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class LoaderInputFormat extends FileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    // Ctrl-B ('\u0002') as the record delimiter, encoded to the byte
    // sequence that LineRecordReader splits the input on.
    byte[] recordDelimiterBytes = "\u0002".getBytes(StandardCharsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // Compressed files cannot be split at arbitrary byte offsets.
    CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return codec == null;
  }
}
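Under the hood, LineRecordReader scans the byte stream for the delimiter sequence and emits everything between occurrences as one record. A simplified, self-contained sketch of that scan (plain Java, no Hadoop dependencies; the class and method names here are my own, and buffering and split-boundary handling are omitted):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DelimiterScanner {
    // Splits raw bytes into records on an arbitrary delimiter byte
    // sequence - roughly what LineRecordReader does with
    // recordDelimiterBytes, minus buffering and split handling.
    public static List<String> scan(byte[] data, byte[] delim) {
        List<String> records = new ArrayList<>();
        int start = 0;
        for (int i = 0; i + delim.length <= data.length; i++) {
            boolean match = true;
            for (int j = 0; j < delim.length; j++) {
                if (data[i + j] != delim[j]) { match = false; break; }
            }
            if (match) {
                records.add(new String(data, start, i - start, StandardCharsets.UTF_8));
                start = i + delim.length;
                i += delim.length - 1; // continue scanning past the delimiter
            }
        }
        // Trailing bytes after the last delimiter form the final record.
        if (start < data.length) {
            records.add(new String(data, start, data.length - start, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "a,b\nc,d\u0002e,f".getBytes(StandardCharsets.UTF_8);
        byte[] delim = "\u0002".getBytes(StandardCharsets.UTF_8);
        // Two records: "a,b\nc,d" (embedded newline kept) and "e,f"
        System.out.println(scan(data, delim).size()); // prints: 2
    }
}
```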

2. Custom PigStorage Class -

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.pig.builtin.PigStorage;

public class NewPigStorage extends PigStorage {
  // Pass an optional field (column) delimiter through to PigStorage.
  public NewPigStorage(String delimiter) {
    super(delimiter);
  }

  public NewPigStorage() {
    super();
  }

  // Swap in the InputFormat that splits records on '\u0002'.
  @SuppressWarnings("rawtypes")
  @Override
  public InputFormat getInputFormat() {
    return new LoaderInputFormat();
  }
}

3. Using the custom PigStorage in Pig - Package the two classes into a jar, register it in the grunt shell, and load the data file as usual:

         grunt> REGISTER '<path-to-jar>';
         grunt> recs = load '<data-file>' using NewPigStorage('<column delimiter>') as (<record structure>);
         -- verify the record structure
         grunt> dump recs;
