`
luweimstr
  • 浏览: 18555 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

最短路径Mapreduce实现

阅读更多

 

¢A map task receives
lKey: node n
lValue: D (distance from start); points-to (list of nodes reachable from n)
¢p \in  points-to: emit (p, D+1)
¢The reduce task gathers possible distances to a given p and selects the minimum one
input文件格式为:<node id><distance to the start node><[node id it can reach]:>
map过程中根据每个node id的distance输出“VALUE”类型键值对:[(node id it can reach):(distance + 1)]。如果distance是inf,则不输出“VALUE”类型的键值对;另外还需要输出node自身的连接信息,即“NODE”类型的键值对:<node id>:<[node id it can reach]:>。
Reduce过程输入为<node id>[VALUE distance]/[NODE node id it can reach:]。计算出VALUE list中的最小值,输出为<node id><min of distance><node id it can reach> (和输入文件格式一致)。
Mapper代码:
package com.hadoop.dijkstra;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DjksMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

	private String MAXDIS = Integer.MAX_VALUE + "";

	@Override
	protected void setup(Context context) throws IOException,
			InterruptedException {

	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin, Univ
		// @ Maryland)
		// Key is node n
		// Value is D, Points-To
		// For every point (or key), look at everything it points to.
		// Emit or write to the points to variable with the current distance + 1
		Text word = new Text();
		String line = value.toString();// looks like 1 0 2:3:
		String[] sp = line.split(" ");// splits on space
		if (sp[1].compareTo(MAXDIS) != 0) { // we don't care those lines with
											// MAX distance
			int distanceadd = Integer.parseInt(sp[1]) + 1;
			String[] PointsTo = sp[2].split(":");
			for (int i = 0; i < PointsTo.length; i++) {
				word.set("VALUE " + distanceadd);// tells me to look at distance
													// value
				context.write(new LongWritable(Integer.parseInt(PointsTo[i])),
						word);
				word.clear();
			}
		}
		// pass in current node's distance (if it is the lowest distance)
		word.set("VALUE " + sp[1]);
		context.write(new LongWritable(Integer.parseInt(sp[0])), word);
		word.clear();

		word.set("NODES " + sp[2]);// tells me to append on the final tally
		context.write(new LongWritable(Integer.parseInt(sp[0])), word);
		word.clear();

	}
}
Reducer代码:
package com.hadoop.dijkstra;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class DjksReducer extends
		Reducer<LongWritable, Text, LongWritable, Text> {

	public void reduce(LongWritable key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin, Univ
		// @ Maryland)
		// The key is the current point
		// The values are all the possible distances to this point
		// we simply emit the point and the minimum distance value

		String nodes = "UNMODED";
		Text word = new Text();
		int lowest = Integer.MAX_VALUE;// start at infinity

		for (Text val : values) {// looks like NODES/VALUES 1 0 2:3:, we need to
									// use the first as a key
			String[] sp = val.toString().split(" ");// splits on space
			// look at first value
			if (sp[0].equalsIgnoreCase("NODES")) {
				nodes = null;
				nodes = sp[1];
			} else if (sp[0].equalsIgnoreCase("VALUE")) {
				int distance = Integer.parseInt(sp[1]);
				lowest = Math.min(distance, lowest);
			}
		}
		word.set(lowest + " " + nodes);
		context.write(key, word);
		word.clear();
	}
}
 
         算法需要迭代多次最终得到各个顶点到start顶点的最短距离。每个迭代round都是一次mapreduce。后一轮的输入为前一轮的输出,直到结果与上一轮相同。主程序代码:
package com.hadoop.dijkstra;

import java.io.*;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Djkstra {
	public static String OUT = "outfile";
	public static String IN = "inputlarger";

	public static void main(String args[]) throws IOException,
			InterruptedException, ClassNotFoundException {

		// set in and out to args.
		IN = args[0];
		OUT = args[1];

		String infile = IN;
		String outputfile = OUT + System.nanoTime();

		boolean isdone = false;
		@SuppressWarnings("unused")
		boolean success = false;

		HashMap<Integer, Integer> _map = new HashMap<Integer, Integer>();

		while (isdone == false) {
			Configuration conf = new Configuration();
			conf.set("mapred.textoutputformat.separator", " ");// make the key
																// -> value
																// space
																// separated
																// (for
																// iterations)
			Job job = new Job(conf);
			job.setJarByClass(Djkstra.class);
			job.setJobName("Dijkstra");

			job.setMapperClass(DjksMapper.class);
			job.setReducerClass(DjksReducer.class);

			FileInputFormat.addInputPath(job, new Path(infile));
			FileOutputFormat.setOutputPath(job, new Path(outputfile));

			job.setMapOutputKeyClass(LongWritable.class);
			job.setMapOutputValueClass(Text.class);
			job.setOutputKeyClass(LongWritable.class);
			job.setOutputValueClass(Text.class);

			success = job.waitForCompletion(true);

			// remove the input file
			if (infile != IN) {
				String indir = infile.replace("part-r-00000", "");
				Path ddir = new Path(indir);
				FileSystem dfs = FileSystem.get(conf);
				dfs.delete(ddir, true);
			}

			// output path as the input path for next round
			// TODO: what if there are more than one reducers?
			infile = outputfile + "/part-r-00000";
			outputfile = OUT + System.nanoTime();

			// do we need to re-run the job with the new input file??
			isdone = true;// set the job to NOT run again!
			Path ofile = new Path(infile);
			FileSystem fs = FileSystem.get(new Configuration());
			BufferedReader br = new BufferedReader(new InputStreamReader(
					fs.open(ofile)));

			HashMap<Integer, Integer> imap = new HashMap<Integer, Integer>();
			String line = br.readLine();
			while (line != null) {
				// each line looks like 0 1 2:3:
				// we need to verify node -> distance doesn't change
				String[] sp = line.split(" ");
				int node = Integer.parseInt(sp[0]);
				int distance = Integer.parseInt(sp[1]);
				imap.put(node, distance);
				line = br.readLine();
			}
			if (_map.isEmpty()) {
				// first iteration... must do a second iteration regardless!
				isdone = false;
			} else {
				Iterator<Integer> itr = imap.keySet().iterator();
				while (itr.hasNext()) {
					int key = itr.next();
					int val = imap.get(key);
					if (_map.get(key) != val) {
						// values aren't the same... we aren't at convergence
						// yet (iterate until results are the same as last round)
						isdone = false;
					}
				}
			}
			if (isdone == false) {
				_map.putAll(imap);// copy imap to _map for the next iteration
									// (if required)
			}
		}

	}
}
 
 
 
 
 

 

分享到:
评论

相关推荐

    MapReduce实现单元最短路径算法.doc

    MapReduce实现单元最短路径算法.doc

    单源最短路径算法(MapReduce)源代码

    单源最短路径算法(MapReduce)源代码,对与hadoop的初学者来说是很好的入门教程

    最短路径系列之一从零开始学习HADOOP

    最短路径系列之一从零开始学习HADOOP,只要有一台能上网的计算机,就可以让读者在最短的时间内,学会Hadoop的初级开发。所以,这本书只讲干货,也就是必须要知道的Hadoop的最核心知识点,包括如何搭建Hadoop,如何写...

    MapReduce求解物流配送单源最短路径研究

    针对物流配送路线优化,提出了将配送路线问题分解...详细论述了基于标色法的MapReduce广度优先算法并行化模型、节点数据结构、算法流程和伪代码程序,并通过将该算法应用于快递公司的实际配送,验证了该算法的可行性。

    基于云计算的混合并行遗传算法求解最短路径

    方法采用云计算中Hadoop的MapReduce并行编程模型,提高编码效率,同时将细粒度并行遗传算法和禁忌搜索算法结合,提高了寻优算法的计算速度和局部寻优能力,进而提高最短路径的求解效率。仿真结果表明,该方法在计算...

    简单的MapReduce程序(Hadoop2.2.0)

    一个简单的MapReduce程序。Hadoop2.2.0上实测可用。

    MapReduce_SSSP.rar_mapReduce_mapreduce sssp_single_sssp

    MapReduce框架实现的单源最短路径算法(SSSP, Single-source Shortest Path),包含源文件及数据。sssp.c是MapReduce实现,ss seq.cpp是串行的普通实现。

    基于MapReduce框架下的复杂网络社团发现算法

    为此,利用MapReduce模型,提出了一种并行版本的GN算法来支持大规模网络的新方法,称之为最短路径之间的MapReduce算法(Shortest Path Betweenness MapReduce Algorithm,SPB-MRA)。此外,还提出了一个近似技术,...

    Cloud-Hadoop

    ParallelDijkstra.java 查找最短路径的MapReduce程序。 PDNodeWritable.java 存储邻接表和其他属性的可写节点结构。 版权: 我拥有ParallelDijkstra.java。 我的队友拥有PDNodeWritable.java和PDPreProcess.java...

    PDM :基于Hadoop的并行数据分析系统 (2012年)

    详细阐述了并行多元线性回归算法和“多源最短路径”算法的原理和实现,其中,提出的“消息传递模型”能有效解决MapReduce难以处理邻接矩阵的问题;介绍了基于电信数据的典型应用,如采用并行k均值和决策树算法实现的...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    7.1.2 最短路径算法 技术点52 找出两个用户间的最短距离 7.1.3 friends-of-friends(FoF) 技术点53 计算FoF 7.1.4 PageRank 技术点54 通过Web 图计算PageRank 7.2 Bloom filter 技术点55 在...

    Hadoop实战(第2版)

    数据科学.7 数据结构和算法的运用7.1 使用图进行数据建模和解决问题7.1.1 模拟图7.1.2 最短路径算法技术点52 找出两个用户间的最短距离7.1.3 friends-of-friends(FoF) 技术点53 计算FoF 7.1.4 ...

    大图的顶点驱动并行最小生成树算法

    基于MST结构的聚类,分类和最短路径查询等复杂图算法,在效率和结果质量方面需要显着提高。随着互联网的迅猛发展,图数据规模也越来越大,包含数百万甚至上亿个指针的大图数据越发常见。因此,如何在大图数据上实现...

    maestro:在 FPGA 上使用异步累积更新加速迭代算法的框架

    在迭代算法中,通过对输入数据集(例如 PageRank,Dijkstra 的最短路径)执行重复计算得出最终结果。 并行化此类算法的现有技术使用诸如 MapReduce 和 Hadoop 之类的软件框架在集群中的多个基于 CPU 的工作站之间...

    大数据应用-基于大数据的推荐算法研究.pptx

    基于项目层次结构相似性的推荐算法 相似度度量 节点之间的距离度量: 然后利用最短路径算法Dijkstra结合TopK思想找到最相近的项目; 大数据应用-基于大数据的推荐算法研究全文共35页,当前为第18页。 基于项目层次...

Global site tag (gtag.js) - Google Analytics