MapReduce处理社交关系的一道问题

给定一个社交关系(A,B),A和B分别代表2个人的id或名字(如果是名字,要求名字不能有重复)。要求只使用一次MapReduce,过滤掉里面的双向关系,只保留单向关系。比如现在这个关系里包含如下内容:

(张三,李四)

(张三,王五)

(张三,赵六)

(李四,张三)

里面的第1条和第4条构成了双向关系,要过滤掉,剩下第2和第3条。

思路1:对每一条关系(A, B),在map阶段生成(A, B) 和(B, A) 。如果A和B是双向的,会生成4条记录,分别是(A, B),  (B, A), (B, A), (A,B)。

在reduce阶段,我们发现key A对应的values里,B有2条。我们知道这是由双向关系产生的,我们不应该有任何输出。我们只输出在values里只有1个的。假如C在values里只有1个。则reduce阶段,我们输出(A,C)。

带来的问题:以上的办法能过滤掉双向关系,但是会输出多余的内容。比如一个单项关系 (A, B)。按照以上方法, reduce阶段最后会输出(A,B)和(B,A)。实际上不应该输出(B, A)。这个(B,A)是我们人为加进去的。

思路2:在思路1的基础上,map 阶段加入标记。对每一条关系(A, B),在map阶段生成(A, B|1) 和(B, A|0)。1代表真实存在的关系,0代表为了解决问题加入的虚构关系。在reduce阶段,我们按照思路1过滤掉有2个值的,然后看标记是不是1,如果是1则输出;如果是0,不输出。

Hadoop MapReduce代码如下:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SingleRelationshipMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text>{

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter arg3)
        throws IOException {
        String line = value.toString();
        String[] names = line.split(" ");
        Text text1 = new Text(names[0]);
        Text text2 = new Text(names[1]);
        output.collect(new Text(text1), new Text(text2+"|1")); //1代表真实关系
        output.collect(new Text(text2), new Text(text1+"|0")); //0代表虚构关系
    }
}

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SingleRelationshipReducer extends MapReduceBase implements
    Reducer<Text, Text, Text, Text>{

    @Override
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter arg3) throws IOException {
        List<String> list = new ArrayList<String>();
        while (values.hasNext()) {
            list.add(values.next().toString());
       }
       for(String item:list){
           String[] array = item.split("\\|");
           if(onlyOne(array[0], list)){
               if(array[1].equals("1")){
                   output.collect(key, new Text(array[0]));
               }
           }
       }
    }

    private boolean onlyOne(String str, List<String> list){
        int count=0;
        for(String item: list){
            String array[] = item.split("\\|");
            if(array[0].equals(str)){
                count++;
            }
        }
        return count==1;
     }
}
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SingleRelationshipDriver extends Configured implements Tool{

    /**
    * @param args
    * @throws Exception
    */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SingleRelationshipDriver(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] arg) throws Exception {

        JobConf conf = new JobConf();
        conf.setJobName("singleRelation");
        conf.setJarByClass(SingleRelationshipDriver.class);
        conf.setMapperClass(SingleRelationshipMapper.class);
        conf.setReducerClass(SingleRelationshipReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        FileInputFormat.addInputPaths(conf, arg[0]);
        FileOutputFormat.setOutputPath(conf, new Path(arg[1]));
        JobClient.runJob(conf);
        return 0;
    }

}

输入:

John Tom
John Amily
John George
Amily John

输出:

John Tom
John George