• A map task receives
  – Key: node n
  – Value: D (distance from start); points-to (list of nodes reachable from n)
• For each p ∈ points-to: emit (p, D+1)
• The reduce task gathers possible distances to a given p and selects the minimum one
The input file format is: <node id> <distance to the start node> <[node id it can reach]:>

During the map phase, for each node whose distance is not inf, a "VALUE"-type key-value pair is emitted for every node it can reach: [(node id it can reach):(distance + 1)]. If the distance is inf, no "VALUE" pairs are emitted. In addition, the node's own adjacency information must be passed through as a "NODES"-type pair: <node id>:<[node id it can reach]:>.

The reduce phase receives <node id> [VALUE distance] / [NODES node id it can reach:]. It computes the minimum over the VALUE list and outputs <node id> <min of distance> <node id it can reach> (the same format as the input file).
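The key-value flow described above can be traced without Hadoop. Below is a minimal standalone sketch of one map+reduce round; the class name, the three-node graph (1 → {2, 3}, 2 → {3}), and the value 10000 standing in for "inf" are all made up for illustration:

```java
import java.util.*;

// Minimal standalone trace (no Hadoop) of one map+reduce round of the scheme
// above. The graph and the sentinel 10000 for "inf" are hypothetical.
public class OneRoundTrace {
    static final int INF = 10000;

    static List<String> oneRound(String[] input) {
        // "map" phase: group (node, "VALUE d") and (node, "NODES adj") pairs by node
        Map<Integer, List<String>> shuffled = new TreeMap<>();
        for (String line : input) {
            String[] sp = line.split(" ");
            int node = Integer.parseInt(sp[0]);
            int dist = Integer.parseInt(sp[1]);
            if (dist != INF && sp.length > 2) {
                for (String p : sp[2].split(":"))   // emit (p, D + 1) for every neighbor
                    shuffled.computeIfAbsent(Integer.parseInt(p), k -> new ArrayList<>())
                            .add("VALUE " + (dist + 1));
            }
            shuffled.computeIfAbsent(node, k -> new ArrayList<>()).add("VALUE " + dist);
            if (sp.length > 2)                      // pass the adjacency list through
                shuffled.computeIfAbsent(node, k -> new ArrayList<>()).add("NODES " + sp[2]);
        }
        // "reduce" phase: minimum over VALUE entries, NODES entry carried through
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> e : shuffled.entrySet()) {
            int lowest = Integer.MAX_VALUE;
            String nodes = "";
            for (String v : e.getValue()) {
                String[] sp = v.split(" ");
                if (sp[0].equals("NODES")) nodes = sp[1];
                else lowest = Math.min(lowest, Integer.parseInt(sp[1]));
            }
            out.add(e.getKey() + " " + lowest + " " + nodes);
        }
        return out;
    }

    public static void main(String[] args) {
        for (String s : oneRound(new String[] { "1 0 2:3:", "2 10000 3:", "3 10000" }))
            System.out.println(s);
    }
}
```

After this single round, nodes 2 and 3 both drop from 10000 to distance 1; repeating the round propagates distances one hop further each time.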
Mapper code:
package com.hadoop.dijkstra;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DjksMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    private String MAXDIS = Integer.MAX_VALUE + "";

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin, Univ. of Maryland)
        // Key is node n; value is "D points-to", e.g. "1 0 2:3:"
        // For every node this line points to, emit its id with the current distance + 1
        Text word = new Text();
        String line = value.toString();
        String[] sp = line.split(" "); // splits on space
        if (sp[1].compareTo(MAXDIS) != 0) { // skip lines whose distance is still "infinite"
            int distanceadd = Integer.parseInt(sp[1]) + 1;
            String[] PointsTo = sp[2].split(":");
            for (int i = 0; i < PointsTo.length; i++) {
                word.set("VALUE " + distanceadd); // candidate distance for a reachable node
                context.write(new LongWritable(Integer.parseInt(PointsTo[i])), word);
                word.clear();
            }
        }
        // pass on the current node's own distance (it may already be the lowest)
        word.set("VALUE " + sp[1]);
        context.write(new LongWritable(Integer.parseInt(sp[0])), word);
        word.clear();
        // pass on the adjacency list so the reducer can reproduce the input format
        word.set("NODES " + sp[2]);
        context.write(new LongWritable(Integer.parseInt(sp[0])), word);
        word.clear();
    }
}
Reducer code:
package com.hadoop.dijkstra;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class DjksReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

    public void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin, Univ. of Maryland)
        // The key is the current node; the values are all candidate distances to it.
        // Emit the node with the minimum distance and its adjacency list.
        String nodes = "UNMODED";
        Text word = new Text();
        int lowest = Integer.MAX_VALUE; // start at "infinity"
        for (Text val : values) { // each value looks like "NODES 2:3:" or "VALUE 1"
            String[] sp = val.toString().split(" "); // splits on space
            if (sp[0].equalsIgnoreCase("NODES")) {
                nodes = sp[1]; // adjacency list, passed through unchanged
            } else if (sp[0].equalsIgnoreCase("VALUE")) {
                int distance = Integer.parseInt(sp[1]);
                lowest = Math.min(distance, lowest);
            }
        }
        word.set(lowest + " " + nodes);
        context.write(key, word); // same format as the input file
        word.clear();
    }
}
The algorithm must iterate over several rounds before every vertex's shortest distance from the start vertex is final. Each round is one MapReduce job, and each round's input is the previous round's output, until the result is identical to that of the previous round. Driver code:
package com.hadoop.dijkstra;

import java.io.*;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Djkstra {
    public static String OUT = "outfile";
    public static String IN = "inputlarger";

    public static void main(String args[]) throws IOException,
            InterruptedException, ClassNotFoundException {
        IN = args[0];
        OUT = args[1];
        String infile = IN;
        String outputfile = OUT + System.nanoTime();
        boolean isdone = false;
        @SuppressWarnings("unused")
        boolean success = false;
        HashMap<Integer, Integer> _map = new HashMap<Integer, Integer>();

        while (isdone == false) {
            Configuration conf = new Configuration();
            // make the key -> value output space-separated, so each round's
            // output can be parsed as the next round's input
            conf.set("mapred.textoutputformat.separator", " ");

            Job job = new Job(conf);
            job.setJarByClass(Djkstra.class);
            job.setJobName("Dijkstra");
            job.setMapperClass(DjksMapper.class);
            job.setReducerClass(DjksReducer.class);
            FileInputFormat.addInputPath(job, new Path(infile));
            FileOutputFormat.setOutputPath(job, new Path(outputfile));
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            success = job.waitForCompletion(true);

            // remove this round's intermediate input directory
            if (infile != IN) {
                String indir = infile.replace("part-r-00000", "");
                Path ddir = new Path(indir);
                FileSystem dfs = FileSystem.get(conf);
                dfs.delete(ddir, true);
            }

            // this round's output becomes the next round's input
            // TODO: what if there is more than one reducer?
            infile = outputfile + "/part-r-00000";
            outputfile = OUT + System.nanoTime();

            // do we need to re-run the job with the new input file?
            isdone = true; // assume convergence until a distance changes
            Path ofile = new Path(infile);
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(ofile)));
            HashMap<Integer, Integer> imap = new HashMap<Integer, Integer>();
            String line = br.readLine();
            while (line != null) {
                // each line looks like "0 1 2:3:"; verify node -> distance didn't change
                String[] sp = line.split(" ");
                int node = Integer.parseInt(sp[0]);
                int distance = Integer.parseInt(sp[1]);
                imap.put(node, distance);
                line = br.readLine();
            }
            if (_map.isEmpty()) {
                // first iteration: must run a second iteration regardless
                isdone = false;
            } else {
                Iterator<Integer> itr = imap.keySet().iterator();
                while (itr.hasNext()) {
                    int key = itr.next();
                    int val = imap.get(key);
                    if (_map.get(key) != val) {
                        // a distance changed: not at convergence yet
                        // (iterate until results match the previous round)
                        isdone = false;
                    }
                }
            }
            if (isdone == false) {
                _map.putAll(imap); // keep this round's distances for the next comparison
            }
        }
    }
}
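The convergence rule (iterate until a round's output equals its input) can be sanity-checked without a cluster. The sketch below is a hypothetical standalone simulation of the driver loop; the class name, the `round`/`run` helpers, and the four-node chain graph are not from the original code:

```java
import java.util.*;

// Hypothetical standalone simulation of the driver's convergence loop:
// repeat the map+reduce round until no node's distance changes.
public class IterateUntilStable {
    static final int INF = Integer.MAX_VALUE;

    // one map+reduce round: relax every finite node's neighbors by distance + 1
    static Map<Integer, Integer> round(Map<Integer, int[]> adj, Map<Integer, Integer> dist) {
        Map<Integer, Integer> next = new TreeMap<>(dist);
        for (Map.Entry<Integer, int[]> e : adj.entrySet()) {
            int d = dist.get(e.getKey());
            if (d == INF) continue;               // mapper skips "infinite" lines
            for (int p : e.getValue())
                next.merge(p, d + 1, Math::min);  // reducer keeps the minimum
        }
        return next;
    }

    // iterate until a round's output equals its input, as the driver does
    static Map<Integer, Integer> run(Map<Integer, int[]> adj, Map<Integer, Integer> dist) {
        while (true) {
            Map<Integer, Integer> next = round(adj, dist);
            if (next.equals(dist)) return dist;
            dist = next;
        }
    }

    public static void main(String[] args) {
        // chain 1 -> 2 -> 3 -> 4: distances propagate one hop per round
        Map<Integer, int[]> adj = new TreeMap<>();
        adj.put(1, new int[] { 2 });
        adj.put(2, new int[] { 3 });
        adj.put(3, new int[] { 4 });
        adj.put(4, new int[] {});
        Map<Integer, Integer> dist = new TreeMap<>();
        dist.put(1, 0);
        dist.put(2, INF);
        dist.put(3, INF);
        dist.put(4, INF);
        System.out.println(run(adj, dist));
    }
}
```

On a chain graph the number of rounds grows with the longest shortest path, which is why the driver cannot fix the iteration count in advance and must compare consecutive rounds instead.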