1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package mobvista.dmp.datasource.ga.mapreduce.vo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Partitioner;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * A composite Hadoop key made of two {@link Text} values.
 *
 * <p>Pairs are ordered by {@code first} and then by {@code second}. The nested
 * {@link FirstComparator} and {@link FirstPartitioner} look at {@code first}
 * only, so records sharing the same first element can be routed and compared
 * together regardless of their second element.
 */
public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;

    /** Creates a pair holding two empty {@link Text} instances. */
    public TextPair() {
        // Assign directly instead of calling the overridable set(...) from a constructor.
        this.first = new Text();
        this.second = new Text();
    }

    /**
     * Creates a pair from two strings.
     *
     * @param first  value of the first element
     * @param second value of the second element
     */
    public TextPair(String first, String second) {
        this.first = new Text(first);
        this.second = new Text(second);
    }

    /**
     * Replaces both elements. The given instances are stored by reference,
     * not copied.
     *
     * @param first  new first element
     * @param second new second element
     */
    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    /**
     * Replaces both elements with new {@link Text} instances built from the
     * given strings.
     *
     * @param first  value of the first element
     * @param second value of the second element
     */
    public void set(String first, String second) {
        this.set(new Text(first), new Text(second));
    }

    /**
     * Reads the first element and then the second element from {@code in}.
     *
     * @param in source to read from
     * @throws IOException if either element cannot be read
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    /**
     * Writes the first element and then the second element to {@code out}.
     *
     * @param out sink to write to
     * @throws IOException if either element cannot be written
     */
    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    /**
     * Two pairs are equal when both their first and their second elements are
     * equal. Defined so that it agrees with {@link #hashCode()} and
     * {@link #compareTo(TextPair)}.
     *
     * @param obj object to compare with
     * @return {@code true} if {@code obj} is a {@code TextPair} with equal elements
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof TextPair)) {
            return false;
        }
        TextPair other = (TextPair) obj;
        return first.equals(other.first) && second.equals(other.second);
    }

    /** @return a hash combining the hashes of both elements */
    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    /** @return the two elements separated by a tab character */
    @Override
    public String toString() {
        return first + "\t" + second;
    }

    /** @return the first element (the stored instance, not a copy) */
    public Text getFirst() {
        return first;
    }

    /** @return the second element (the stored instance, not a copy) */
    public Text getSecond() {
        return second;
    }

    /**
     * Orders by the first element, using the second element to break ties.
     *
     * @param arg0 pair to compare with
     * @return negative, zero or positive as this pair sorts before, equal to
     *         or after {@code arg0}
     */
    @Override
    public int compareTo(TextPair arg0) {
        int cmp = first.compareTo(arg0.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(arg0.second);
    }

    /**
     * Comparator that considers only the first element of a {@link TextPair}.
     */
    public static class FirstComparator extends WritableComparator {
        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        protected FirstComparator() {
            super(TextPair.class);
        }

        /**
         * Compares the first elements of two serialized pairs without
         * deserializing them.
         *
         * <p>The serialized first element is taken to be a vint length
         * followed by that many bytes, so its total size is the vint's own
         * size plus the decoded length.
         *
         * @throws IllegalArgumentException if a length prefix cannot be decoded
         */
        @Override
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            try {
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                // Text.Comparator#compare skips the length prefix, so the ordering
                // matches Text.compareTo used by the object-based overload below.
                return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            } catch (IOException e) {
                // Returning 0 here would silently report the keys as equal.
                throw new IllegalArgumentException(e);
            }
        }

        /**
         * Compares two pairs by their first element only; anything that is
         * not a {@link TextPair} is handed to the superclass.
         */
        @Override
        @SuppressWarnings("rawtypes") // signature is dictated by WritableComparator
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof TextPair && b instanceof TextPair) {
                return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst());
            }
            return super.compare(a, b);
        }
    }

    /**
     * Partitioner that assigns a partition from the hash of the first element
     * only, so pairs with the same first element get the same partition.
     */
    public static class FirstPartitioner extends Partitioner<TextPair, Text> {
        /**
         * @param textPair      key whose first element selects the partition
         * @param text          value (not used)
         * @param numPartitions total number of partitions
         * @return partition index in {@code [0, numPartitions)}
         */
        @Override
        public int getPartition(TextPair textPair, Text text, int numPartitions) {
            // Clear the sign bit so the modulo result is never negative.
            return (textPair.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
}