Hive: Writing a Custom SerDe

Suppose we have an input file like the one below:
$ vi uwserde
kiju1233,1234567890
huhuhuhu,1233330987
Each line of this input file holds a session ID and a timestamp as comma-separated values. Based on this, I wrote a WritableComparable as below:

package hive;


public class UserWritable implements WritableComparable<UserWritable>  {
       private Text sessionID;
       private Text timestamp;

       public UserWritable() {
              set(new Text(), new Text());
       public void set(Text sessionID, Text timestamp) {
              this.sessionID = sessionID;
              this.timestamp = timestamp;

       public Text getSessionID() {
              return sessionID;
       public Text getTimestamp() {
              return timestamp;
       public void readFields(DataInput din) throws IOException {

       public void write(DataOutput dout) throws IOException {

       public int hashCode() {
              return sessionID.hashCode()*163 + timestamp.hashCode();
       public int compareTo(UserWritable uw) {
              int cmp = compare(sessionID,uw.sessionID); //sessionID.compareTo(uw.sessionID);
              if(cmp != 0){
                     return cmp;
                     long meT = Long.parseLong(timestamp.toString());
                     long cmpT = Long.parseLong(uw.timestamp.toString());
                     return compare(new LongWritable(meT),new LongWritable(cmpT));//new LongWritable(meT).compareTo(new LongWritable(cmpT));
              }catch(Exception e){
                     throw new RuntimeException("Error in comparing long timestamp.",e);

       public static int compare(Text a , Text b){
              return a.compareTo(b);
       public static int compare(LongWritable a , LongWritable b){
              return a.compareTo(b);
       public boolean equals(Object o) {
              if (o instanceof UserWritable) {
                     UserWritable tp = (UserWritable) o;
                     return sessionID.equals(tp.sessionID) && timestamp.equals(tp.timestamp);
              return false;

Now let us write an InputFormat class that produces a LongWritable key and a UserWritable value, along with a RecordReader that reads the file line by line.

package hive;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class UWInputFormat extends FileInputFormat<LongWritable, UserWritable> implements JobConfigurable{

       JobConf conf;
       public void configure(JobConf conf) {
              this.conf = conf;         
       public org.apache.hadoop.mapred.RecordReader<LongWritable, UserWritable> getRecordReader(
                     org.apache.hadoop.mapred.InputSplit paramInputSplit,
                     JobConf paramJobConf, Reporter paramReporter) throws IOException {
              return new UWRecordReader(paramJobConf, (FileSplit) paramInputSplit);

        public static class UWRecordReader implements RecordReader<LongWritable,UserWritable> {
              private FileSplit fileSplit;
              private Configuration conf;
              private boolean processed =false;
              private FSDataInputStream fileIn;
              private long pos;
              public UWRecordReader(Configuration job,
                   FileSplit split) throws IOException {
                     this.fileSplit =  split;
                     this.conf  = job;
                     Path file = fileSplit.getPath();

                     FileSystem fs = file.getFileSystem(conf);
                     this.fileIn =;

              public LongWritable createKey() {
                     return new LongWritable();
              public UserWritable createValue() {
                     // TODO Auto-generated method stub
                     return new UserWritable();
              public long getPos() throws IOException {
                     return pos;
              public float getProgress() throws IOException {
                     return processed ? 1.0f : 0.0f;
              public boolean next(LongWritable paramK, UserWritable paramV)
                           throws IOException {
                           pos = fileIn.getPos();
                           String val = fileIn.readLine();
                           if(val==null || val.trim().equals("")){
                                  processed = true;
                                  return false;
                           String [] arr = val.split(",");
                           paramV.set(new Text(arr[0]), new Text(arr[1]));
                           return true;
                     return false;

              public void close() throws IOException {

Now let us write the serializer/deserializer (SerDe). The example below implements only deserialization, which converts the UserWritable object that the RecordReader produces for each line into a Hive row.

package hive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class UWSerde implements SerDe{

       private int numColumns;
       private List<String> columnNames;
       private ArrayList<Object> row;
       private StructObjectInspector rowOI;
       private List<TypeInfo> columnTypes;
       public Object deserialize(Writable arg0) throws SerDeException {
              UserWritable rowKey = (UserWritable) arg0;
              // Loop over columns in table and set values
              String colName;
              Object value;
              for (int c = 0; c < numColumns; c++) {
                     colName = columnNames.get(c);
                     TypeInfo ti = columnTypes.get(c);
                           row.set(c, rowKey.getSessionID().toString());
                           row.set(c, rowKey.getTimestamp().toString());
              return row;

       public ObjectInspector getObjectInspector() throws SerDeException {
              return rowOI;

       public SerDeStats getSerDeStats() {
              // TODO Auto-generated method stub
              return null;

       public void initialize(Configuration sysProps, Properties tblProps)
                     throws SerDeException {
              String columnNameProperty = tblProps.getProperty(Constants.LIST_COLUMNS);
              columnNames = Arrays.asList(columnNameProperty.split(","));
              String columnTypeProperty = tblProps.getProperty(Constants.LIST_COLUMN_TYPES);
              columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
              assert columnNames.size() == columnTypes.size();
              numColumns = columnNames.size();
              List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(
              ObjectInspector oi;
              for (int c = 0; c < numColumns; c++) {
                     oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(columnTypes.get(c));
              rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
              // Create an empty row object to be reused during deserialization
              row = new ArrayList<Object>(numColumns);
              for (int c = 0; c < numColumns; c++) {

       public Class<? extends Writable> getSerializedClass() {
              return Text.class;

       public Writable serialize(Object arg0, ObjectInspector arg1)
                     throws SerDeException {
              // TODO Auto-generated method stub
              return null;


Package the above code into a jar named "hadoop-examples.jar" and open the Hive CLI.

hive> add jar /root/hadoop-examples.jar;

hive> create table uw(
    > sessionid string,
    > timestamp string )
    > ROW FORMAT SERDE 'hive.UWSerde'
    > stored as
    > inputformat 'hive.UWInputFormat'
    > outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    > ;
Time taken: 9.5 seconds

hive> load data local inpath '/root/uwserde' into table uw;
Copying data from file:/root/uwserde
Copying file: file:/root/uwserde
Loading data to table default.uw
Table default.uw stats: [numFiles=1, numRows=0, totalSize=40, rawDataSize=0]
Time taken: 6.565 seconds

hive> select * from uw;
kiju1233        1234567890
huhuhuhu        1233330987
Time taken: 2.689 seconds, Fetched: 2 row(s)


