给定一个社交关系(A,B),A和B分别代表2个人的id或名字(如果是名字,要求名字不能有重复)。要求只使用一次MapReduce,过滤掉里面的双向关系,只保留单向关系。比如现在这个关系里包含如下内容:
(张三,李四)
(张三,王五)
(张三,赵六)
(李四,张三)
里面的第1条和第4条构成了双向关系,要过滤掉,剩下第2和第3条。
思路1:对每一条关系(A, B),在map阶段生成(A, B) 和(B, A) 。如果A和B是双向的,会生成4条记录,分别是(A, B), (B, A), (B, A), (A,B)。
在reduce阶段,我们发现key A对应的values里,B有2条。我们知道这是由双向关系产生的,我们不应该有任何输出。我们只输出在values里只有1个的。假如C在values里只有1个。则reduce阶段,我们输出(A,C)。
带来的问题:以上的办法能过滤掉双向关系,但是会输出多余的内容。比如一个单项关系 (A, B)。按照以上方法, reduce阶段最后会输出(A,B)和(B,A)。实际上不应该输出(B, A)。这个(B,A)是我们人为加进去的。
思路2:在思路1的基础上,map 阶段加入标记。对每一条关系(A, B),在map阶段生成(A, B|1) 和(B, A|0)。1代表真实存在的关系,0代表为了解决问题加入的虚构关系。在reduce阶段,我们按照思路1过滤掉有2个值的,然后看标记是不是1,如果是1则输出;如果是0,不输出。
Hadoop MapReduce代码如下:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class SingleRelationshipMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text>{
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter arg3)
throws IOException {
String line = value.toString();
String[] names = line.split(" ");
Text text1 = new Text(names[0]);
Text text2 = new Text(names[1]);
output.collect(new Text(text1), new Text(text2+"|1")); //1代表真实关系
output.collect(new Text(text2), new Text(text1+"|0")); //0代表虚构关系
}
}
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class SingleRelationshipReducer extends MapReduceBase implements
Reducer<Text, Text, Text, Text>{
@Override
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter arg3) throws IOException {
List<String> list = new ArrayList<String>();
while (values.hasNext()) {
list.add(values.next().toString());
}
for(String item:list){
String[] array = item.split("\\|");
if(onlyOne(array[0], list)){
if(array[1].equals("1")){
output.collect(key, new Text(array[0]));
}
}
}
}
private boolean onlyOne(String str, List<String> list){
int count=0;
for(String item: list){
String array[] = item.split("\\|");
if(array[0].equals(str)){
count++;
}
}
return count==1;
}
}
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SingleRelationshipDriver extends Configured implements Tool{
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new SingleRelationshipDriver(), args);
System.exit(exitCode);
}
@Override
public int run(String[] arg) throws Exception {
JobConf conf = new JobConf();
conf.setJobName("singleRelation");
conf.setJarByClass(SingleRelationshipDriver.class);
conf.setMapperClass(SingleRelationshipMapper.class);
conf.setReducerClass(SingleRelationshipReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
FileInputFormat.addInputPaths(conf, arg[0]);
FileOutputFormat.setOutputPath(conf, new Path(arg[1]));
JobClient.runJob(conf);
return 0;
}
}
输入:
John Tom
John Amily
John George
Amily John
输出:
John Tom
John George



