Suppose we have an input file like the one below:
$ vi uwserde
kiju1233,1234567890
huhuhuhu,1233330987
…
This input file consists of a session ID and a timestamp as comma-separated values. Assuming this format, I wrote a WritableComparable as below:
package hive;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// Composite record holding a session ID and its timestamp.
public class UserWritable implements WritableComparable<UserWritable> {

    private Text sessionID;
    private Text timestamp;

    public UserWritable() {
        set(new Text(), new Text());
    }

    public void set(Text sessionID, Text timestamp) {
        this.sessionID = sessionID;
        this.timestamp = timestamp;
    }

    public Text getSessionID() {
        return sessionID;
    }

    public Text getTimestamp() {
        return timestamp;
    }

    @Override
    public void readFields(DataInput din) throws IOException {
        sessionID.readFields(din);
        timestamp.readFields(din);
    }

    @Override
    public void write(DataOutput dout) throws IOException {
        sessionID.write(dout);
        timestamp.write(dout);
    }

    @Override
    public int hashCode() {
        return sessionID.hashCode() * 163 + timestamp.hashCode();
    }

    @Override
    public int compareTo(UserWritable uw) {
        // Order by session ID first, then by the numeric timestamp.
        int cmp = compare(sessionID, uw.sessionID);
        if (cmp != 0) {
            return cmp;
        }
        try {
            long meT = Long.parseLong(timestamp.toString());
            long cmpT = Long.parseLong(uw.timestamp.toString());
            return compare(new LongWritable(meT), new LongWritable(cmpT));
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Error in comparing long timestamp.", e);
        }
    }

    public static int compare(Text a, Text b) {
        return a.compareTo(b);
    }

    public static int compare(LongWritable a, LongWritable b) {
        return a.compareTo(b);
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof UserWritable) {
            UserWritable tp = (UserWritable) o;
            return sessionID.equals(tp.sessionID) && timestamp.equals(tp.timestamp);
        }
        return false;
    }
}
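Before wiring this into Hive, it is worth sanity-checking the Writable contract. The small round-trip test below (a hypothetical helper class, just for illustration) writes a UserWritable to an in-memory stream and reads it back:

package hive;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;

public class UserWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        UserWritable in = new UserWritable();
        in.set(new Text("kiju1233"), new Text("1234567890"));

        // Serialize to an in-memory buffer, exactly as Hadoop would on the wire.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));

        // Deserialize into a fresh instance and compare with the original.
        UserWritable out = new UserWritable();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(in.equals(out));          // expected: true
        System.out.println(in.compareTo(out) == 0);  // expected: true
    }
}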
Now let us write an InputFormat class with LongWritable as the key and UserWritable as the value, and implement a RecordReader that reads the file line by line.
package hive;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class UWInputFormat extends FileInputFormat<LongWritable, UserWritable>
        implements JobConfigurable {

    JobConf conf;

    @Override
    public void configure(JobConf conf) {
        this.conf = conf;
    }

    @Override
    public RecordReader<LongWritable, UserWritable> getRecordReader(
            InputSplit paramInputSplit, JobConf paramJobConf, Reporter paramReporter)
            throws IOException {
        paramReporter.setStatus(paramInputSplit.toString());
        return new UWRecordReader(paramJobConf, (FileSplit) paramInputSplit);
    }

    public static class UWRecordReader implements RecordReader<LongWritable, UserWritable> {

        private FileSplit fileSplit;
        private Configuration conf;
        private boolean processed = false;
        private FSDataInputStream fileIn;
        private long pos;

        public UWRecordReader(Configuration job, FileSplit split) throws IOException {
            this.fileSplit = split;
            this.conf = job;
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            this.fileIn = fs.open(file);
        }

        @Override
        public LongWritable createKey() {
            return new LongWritable();
        }

        @Override
        public UserWritable createValue() {
            return new UserWritable();
        }

        @Override
        public long getPos() throws IOException {
            return pos;
        }

        @Override
        public float getProgress() throws IOException {
            return processed ? 1.0f : 0.0f;
        }

        @Override
        public boolean next(LongWritable paramK, UserWritable paramV) throws IOException {
            if (!processed) {
                // Key is the byte offset of the current line.
                pos = fileIn.getPos();
                paramK.set(pos);
                // DataInputStream.readLine() is deprecated but adequate for this demo.
                String val = fileIn.readLine();
                if (val == null || val.trim().equals("")) {
                    processed = true;
                    return false;
                }
                // Assumes every line is of the form sessionid,timestamp.
                String[] arr = val.split(",");
                paramV.set(new Text(arr[0]), new Text(arr[1]));
                return true;
            }
            return false;
        }

        @Override
        public void close() throws IOException {
            fileIn.close();
        }
    }
}
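You can also exercise the RecordReader outside of Hive. The hypothetical driver below builds a FileSplit over the sample file ('/root/uwserde' from earlier) by hand and prints each key/value pair:

package hive;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;

public class UWInputFormatDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        // A single split covering the whole local sample file.
        Path file = new Path("file:///root/uwserde");
        FileSplit split = new FileSplit(file, 0, Long.MAX_VALUE, (String[]) null);

        UWInputFormat.UWRecordReader reader = new UWInputFormat.UWRecordReader(conf, split);
        LongWritable key = reader.createKey();
        UserWritable value = reader.createValue();
        while (reader.next(key, value)) {
            System.out.println(key + " -> " + value.getSessionID() + "," + value.getTimestamp());
        }
        reader.close();
    }
}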
Now let us write the Serializer-Deserializer. In the example below I have only implemented the deserialize path, which receives a UserWritable object for each line read by the RecordReader.
package hive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class UWSerde implements SerDe {

    private int numColumns;
    private List<String> columnNames;
    private ArrayList<Object> row;
    private StructObjectInspector rowOI;
    private List<TypeInfo> columnTypes;

    @Override
    public Object deserialize(Writable arg0) throws SerDeException {
        UserWritable rowKey = (UserWritable) arg0;
        // Loop over the columns in the table and set their values.
        String colName;
        for (int c = 0; c < numColumns; c++) {
            colName = columnNames.get(c);
            if (colName.contains("session"))
                row.set(c, rowKey.getSessionID().toString());
            else
                row.set(c, rowKey.getTimestamp().toString());
        }
        return row;
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return rowOI;
    }

    @Override
    public SerDeStats getSerDeStats() {
        // Statistics are not covered in this example.
        return null;
    }

    @Override
    public void initialize(Configuration sysProps, Properties tblProps)
            throws SerDeException {
        // Read the table's column names and types from the table properties.
        String columnNameProperty = tblProps.getProperty(serdeConstants.LIST_COLUMNS);
        columnNames = Arrays.asList(columnNameProperty.split(","));
        String columnTypeProperty = tblProps.getProperty(serdeConstants.LIST_COLUMN_TYPES);
        columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
        assert columnNames.size() == columnTypes.size();
        numColumns = columnNames.size();

        // Build a standard struct ObjectInspector from the column types.
        List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(columnNames.size());
        ObjectInspector oi;
        for (int c = 0; c < numColumns; c++) {
            oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(columnTypes.get(c));
            columnOIs.add(oi);
        }
        rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);

        // Create an empty row object to be reused during deserialization.
        row = new ArrayList<Object>(numColumns);
        for (int c = 0; c < numColumns; c++) {
            row.add(null);
        }
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        return Text.class;
    }

    @Override
    public Writable serialize(Object arg0, ObjectInspector arg1) throws SerDeException {
        // Serialization is not covered in this example.
        return null;
    }
}
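To see what Hive receives per row, the hypothetical driver below initializes UWSerde the way Hive does, passing the schema through table properties (assuming the standard "columns" and "columns.types" keys behind serdeConstants), and deserializes a single record:

package hive;

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

public class UWSerdeDemo {
    public static void main(String[] args) throws Exception {
        // Hive passes the table schema to the SerDe via table properties.
        Properties tbl = new Properties();
        tbl.setProperty("columns", "sessionid,timestamp");
        tbl.setProperty("columns.types", "string:string");

        UWSerde serde = new UWSerde();
        serde.initialize(new Configuration(), tbl);

        UserWritable uw = new UserWritable();
        uw.set(new Text("kiju1233"), new Text("1234567890"));

        // Prints the deserialized row, e.g. [kiju1233, 1234567890]
        System.out.println(serde.deserialize(uw));
    }
}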
Create a JAR of the above code named "hadoop-examples.jar" and open the Hive CLI.
hive> add jar /root/hadoop-examples.jar;
hive> create table uw(
    >   sessionid string,
    >   timestamp string )
    > ROW FORMAT SERDE 'hive.UWSerde'
    > stored as
    > inputformat 'hive.UWInputFormat'
    > outputformat 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    > ;
OK
Time taken: 9.5 seconds
hive> load data local inpath '/root/uwserde' into table uw;
Copying data from file:/root/uwserde
Copying file: file:/root/uwserde
Loading data to table default.uw
Table default.uw stats: [numFiles=1, numRows=0, totalSize=40, rawDataSize=0]
OK
Time taken: 6.565 seconds
hive> select * from uw;
OK
kiju1233 1234567890
huhuhuhu 1233330987
Time taken: 2.689 seconds, Fetched: 2 row(s)