package mobvista.dmp.datasource.ga.mapreduce.vo; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Partitioner; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { this.set(new Text(first), new Text(second)); } public void set(Text first, Text second) { this.first = first; this.second = second; } public void set(String first, String second) { this.set(new Text(first), new Text(second)); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public String toString() { return first + "\t" + second; } public Text getFirst() { return first; } public Text getSecond() { return second; } public int compareTo(TextPair arg0) { TextPair tp = (TextPair) arg0; int cmp = first.compareTo(tp.first); if (cmp != 0) { return cmp; } return second.compareTo(tp.second); } public static class FirstComparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); protected FirstComparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return TEXT_COMPARATOR.compareBytes(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) { e.printStackTrace(); } return 0; } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof TextPair && b instanceof TextPair) { return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst()); } return super.compare(a, b); } } public static class FirstPartitioner extends Partitioner<TextPair, Text> { @Override public int getPartition(TextPair textPair, Text text, int numPartitions) { return (textPair.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions ; } } }