A brief introduction to PageRank:
A node's PageRank value is determined by the PageRank of the nodes that point to it. A concrete example follows.
Part 1:
What each MapReduce pass computes:
the mapper computes the share of PageRank that each node passes to the nodes it points to, and the reducer gathers all shares sent to the same key and computes the new value with the formula.
The delta is the difference between two consecutive iterations; when it falls below a chosen threshold the computation is done and each node's final PageRank value has been obtained.
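For reference, the per-iteration update implemented below is the standard damped PageRank formula (both implementations in this post use a damping factor d = 0.85, and Part 1 uses N = 4 nodes); in LaTeX notation:

PR(p) = \frac{1-d}{N} + d \sum_{q \in In(p)} \frac{PR(q)}{L(q)}

where In(p) is the set of nodes linking to p and L(q) is the out-degree of q.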
My own implementation follows.
The input data is split into two files:
input1.txt
A,B,D
B,C
C,A,B
D,B,C
Each line lists the nodes that the first node on the line points to; this file is loaded in the reducer's setup to build a HashMap for lookups.
input2.txt
A,0.25,B,D
B,0.25,C
C,0.25,A,B
D,0.25,B,C
The extra number is the node's current PageRank value. This file is optional, because it can be generated from the file above: with four nodes every rank starts at 1/4 = 0.25.
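As a quick sanity check against the formula above (all numbers taken from the input files), the first update for node B, which is pointed to by A, C and D, each with out-degree 2, is:

PR(B) = \frac{1-0.85}{4} + 0.85\left(\frac{0.25}{2} + \frac{0.25}{2} + \frac{0.25}{2}\right) = 0.0375 + 0.85 \times 0.375 = 0.35625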
The code:
package bbdt.steiss.pageRank;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class PageRank {
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text>{
private Text averageValue = new Text();
private Text node = new Text();
@Override
//For each input line, compute the PageRank share passed to every node it points to (the current node's value divided by its out-degree) and emit one (node, share) pair per target
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {
String string = value.toString();
String [] ss = string.split(",");
int length = ss.length;
double pageValue = Double.parseDouble(ss[1]);
double average = pageValue/(length-2);
averageValue.set(String.valueOf(average));
int i = 2;
while(i<=length-1){
node.set(ss[i]);
context.write(node,averageValue);
i++;
}
}
}
public static class PageReducer extends Reducer<Text, Text, Text, Text>{
private HashMap<String, String> content;
private Text res = new Text();
//Before reduce runs, values with the same key are grouped together; iterate over them to collect every PageRank share sent to this node,
//then apply the formula and emit the result. Because the next iteration reads this output, the node name, new rank and adjacency list are all packed into the value
@Override
protected void reduce(Text text, Iterable<Text> intIterable,
Context context)
throws IOException, InterruptedException {
double sum = 0.0;
double v = 0.0;
for (Text t : intIterable) {
v = Double.parseDouble(t.toString());
sum = sum + v;
}
//a is the damping factor; note that the total node count (4) is hardcoded in the formula
double a = 0.85;
double result = (1-a)/4 + a*sum;
String sRes = String.valueOf(result);
String back = content.get(text.toString());
String front = text.toString();
String comp = front + "," + sRes + back;
res.set(comp);
//key is null, so TextOutputFormat writes only the value, keeping the node,rank,adjacency-list format of the input
context.write(null,res);
}
@Override
//During reducer setup, load the adjacency file (distributed via the cache) into the HashMap content so every reduce() call can look up a node's outgoing links,
//acting like a small in-memory lookup table
protected void setup(Context context)
throws IOException, InterruptedException {
URI[] uri = context.getCacheArchives();
content = new HashMap<String, String>();
for(URI u : uri)
{
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://hadoop1:9000"), context.getConfiguration());
FSDataInputStream in = null;
in = fileSystem.open(new Path(u.getPath()));
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
String line;
while((line = bufferedReader.readLine())!=null)
{
int index = line.indexOf(",");
String first = line.substring(0,index);
//last keeps the leading comma so it can be appended directly after the new rank in reduce()
String last = line.substring(index,line.length());
content.put(first, last);
}
bufferedReader.close();
}
}
}
public static void main(String[] args) throws Exception{
//Read the input, output and cache file paths from the command-line arguments
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
Path cachePath = new Path(args[2]);
double result = 100;
int flag = 0;
//Loop until the summed difference between two iterations drops below the threshold (0.1)
while(result>0.1)
{
if(flag == 1)
{
//Not executed on the first MapReduce run;
//on later runs, copy the previous job's output into the input file so it becomes this run's input (note that copyFile() uses hardcoded paths)
copyFile();
flag = 0;
}
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(PageRank.class);
job.setMapperClass(PageMapper.class);
job.setReducerClass(PageReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.addCacheArchive(cachePath.toUri());
outputPath.getFileSystem(configuration).delete(outputPath, true);
job.waitForCompletion(true);
String outpathString = outputPath.toString()+"/part-r-00000";
//Compute the summed per-node PageRank difference between the input file and this job's output
result = fileDo(inputPath, new Path(outpathString));
flag = 1;
}
System.exit(0);
}
//Compute and return the summed per-node PageRank difference between two files
public static double fileDo(Path inputPath,Path outPath) throws Exception
{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
FileSystem fs = FileSystem.get(conf);
FSDataInputStream in1 = null;
FSDataInputStream in2 = null;
in1 = fs.open(inputPath);
in2 = fs.open(outPath);
BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));
String s1 = null;
String s2 = null;
ArrayList<Double> arrayList1 = new ArrayList<Double>();
ArrayList<Double> arrayList2 = new ArrayList<Double>();
while ((s1 = br1.readLine()) != null)
{
String[] ss = s1.split(",");
arrayList1.add(Double.parseDouble(ss[1]));
}
br1.close();
while ((s2 = br2.readLine()) != null)
{
String[] ss = s2.split(",");
arrayList2.add(Double.parseDouble(ss[1]));
}
br2.close();
double res = 0;
for(int i = 0;i<arrayList1.size();i++)
{
res = res + Math.abs(arrayList1.get(i)-arrayList2.get(i));
}
return res;
}
//Copy the output file back over the input file
public static void copyFile() throws Exception
{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
FileSystem fs = FileSystem.get(conf);
FSDataInputStream in1 = null;
in1 = fs.open(new Path("/output/part-r-00000"));
BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
//Important: deleting a file under /input requires write permission on that HDFS directory;
//run "hdfs dfs -chmod 777 /input" first so the file below can be deleted
fs.delete(new Path("/input/test2.txt"),true);
//Create a new file and get its output stream
FSDataOutputStream fsDataOutputStream = fs.create(new Path("/input/test2.txt"));
BufferedWriter bw1 = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream));
String s1 = null;
//Copy the job output line by line into the new input file
while ((s1 = br1.readLine()) != null)
{
bw1.write(s1);
bw1.write("\n");
}
bw1.close();
fsDataOutputStream.close();
br1.close();
in1.close();
}
}
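One caveat in the reducer above: the total node count is hardcoded as 4 in (1-a)/4. A common alternative, and the one Part 2 below actually uses, is to pass the count through the job Configuration. A minimal sketch, not part of the original code, reusing Part 2's property name:

//driver side, before submitting the job
configuration.setInt("pagerank.numnodes", 4);

//reducer side, read once in setup()
int numNodes = context.getConfiguration().getInt("pagerank.numnodes", 0);
//then in reduce(): double result = (1 - a) / numNodes + a * sum;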
Note:
When manipulating HDFS from a local client (deleting and creating files), the HDFS file permissions must be opened up first.
Deleting files under /input requires permission on that directory, which is essential:
run "hdfs dfs -chmod 777 /input" to open up the /input path so the files beneath it can be deleted.
Part 2
The code above is my own implementation of the PageRank algorithm; next is a walkthrough of someone else's code.
Robby's implementation:
1. First, a Node class is defined to hold the node's current PageRank value and the nodes it points to, stored in an array.
package org.robby.mr.pagerank;
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.util.Arrays;
//Node class: stores the current node's PageRank value and the nodes it points to
public class Node {
private double pageRank = 0.25;
private String[] adjacentNodeNames;
//Field separator
public static final char fieldSeparator = '\t';
public double getPageRank() {
return pageRank;
}
public Node setPageRank(double pageRank) {
this.pageRank = pageRank;
return this;
}
public String[] getAdjacentNodeNames() {
return adjacentNodeNames;
}
//Store the given array as this node's adjacency list
public Node setAdjacentNodeNames(String[] adjacentNodeNames) {
this.adjacentNodeNames = adjacentNodeNames;
return this;
}
public boolean containsAdjacentNodes() {
return adjacentNodeNames != null;
}
//Serialize the node as its PageRank value followed by the adjacent node names
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(pageRank);
if (getAdjacentNodeNames() != null) {
sb.append(fieldSeparator)
.append(StringUtils
.join(getAdjacentNodeNames(), fieldSeparator));
}
return sb.toString();
}
//Build a Node from its string representation
public static Node fromMR(String value) throws IOException {
String[] parts = StringUtils.splitPreserveAllTokens(
value, fieldSeparator);
if (parts.length < 1) {
throw new IOException(
"Expected 1 or more parts but received " + parts.length);
}
Node node = new Node()
.setPageRank(Double.valueOf(parts[0]));
if (parts.length > 1) {
node.setAdjacentNodeNames(Arrays.copyOfRange(parts, 1,
parts.length));
}
return node;
}
}
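A small round-trip check of fromMR and toString makes the wire format easier to see. This demo class is not part of the original code; it only assumes Node and commons-lang are on the classpath:

package org.robby.mr.pagerank;

public class NodeDemo {
    public static void main(String[] args) throws java.io.IOException {
        Node n = Node.fromMR("0.25\tB\tD");                //rank 0.25, adjacent to B and D
        System.out.println(n.getPageRank());               //0.25
        System.out.println(n.containsAdjacentNodes());     //true
        System.out.println(n);                             //0.25<TAB>B<TAB>D
        Node leaf = Node.fromMR("0.125");                  //a rank-only record, as emitted by the mapper
        System.out.println(leaf.containsAdjacentNodes());  //false
    }
}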
2. The mapper implementation:
package org.robby.mr.pagerank;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
//The map input key and value are both Text, so the driver must set the job's input format to KeyValueTextInputFormat
public class Map
extends Mapper<Text, Text, Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
//First re-emit the original record so the reducer can recover this node's adjacency list
context.write(key, value);
//On input, the key is the node name and the value is the tab-separated rest of the line
Node node = Node.fromMR(value.toString());
if(node.getAdjacentNodeNames() != null &&
node.getAdjacentNodeNames().length > 0) {
double outboundPageRank = node.getPageRank() /
(double)node.getAdjacentNodeNames().length;
for (int i = 0; i < node.getAdjacentNodeNames().length; i++) {
String neighbor = node.getAdjacentNodeNames()[i];
outKey.set(neighbor);
Node adjacentNode = new Node()
.setPageRank(outboundPageRank);
outValue.set(adjacentNode.toString());
System.out.println(
" output -> K[" + outKey + "],V[" + outValue + "]");
//Emit the PageRank share this node contributes to the neighbor
context.write(outKey, outValue);
}
}
}
}
Example of the mapper output:
A 0.25 B D
B 0.125
D 0.125
Note:
KeyValueTextInputFormat produces (Text, Text) pairs: each line is split at the first tab into a key and a value.
TextInputFormat produces (LongWritable, Text) pairs: the key is the line's byte offset and the value is the line's content.
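As an illustration, for the physical line A<TAB>0.25<TAB>B<TAB>D the two formats hand the mapper different pairs; the separator override shown below uses the Hadoop 2.x property name, which should be treated as an assumption for other versions:

//KeyValueTextInputFormat -> key = "A",                      value = "0.25\tB\tD"
//TextInputFormat         -> key = byte offset of the line,  value = "A\t0.25\tB\tD"
Configuration conf = new Configuration();
//the separator defaults to the first tab and can be overridden in the driver
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");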
3. The reducer implementation:
package org.robby.mr.pagerank;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Reduce
extends Reducer<Text, Text, Text, Text> {
public static final double CONVERGENCE_SCALING_FACTOR = 1000.0;
public static final double DAMPING_FACTOR = 0.85;
public static String CONF_NUM_NODES_GRAPH = "pagerank.numnodes";
private int numberOfNodesInGraph;
public static enum Counter {
CONV_DELTAS
}
//Runs once when the reducer starts: read the total number of nodes in the graph from the Configuration
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
numberOfNodesInGraph = context.getConfiguration().getInt(
CONF_NUM_NODES_GRAPH, 0);
}
private Text outValue = new Text();
public void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
System.out.println("input -> K[" + key + "]");
double summedPageRanks = 0;
Node originalNode = new Node();
for (Text textValue : values) {
System.out.println(" input -> V[" + textValue + "]");
Node node = Node.fromMR(textValue.toString());
//A value that still carries adjacent nodes is the original record re-emitted by the mapper
if (node.containsAdjacentNodes()) {
//keep the original node so its adjacency list survives into the output
originalNode = node;
} else {
//Otherwise it is a PageRank share sent to this node; accumulate it
summedPageRanks += node.getPageRank();
}
}
//new rank = (1 - d)/N + d * (sum of incoming shares); the variable dampingFactor here actually holds the (1 - d)/N term
double dampingFactor =
((1.0 - DAMPING_FACTOR) / (double) numberOfNodesInGraph);
double newPageRank =
dampingFactor + (DAMPING_FACTOR * summedPageRanks);
//Difference between the old and the new PageRank
double delta = originalNode.getPageRank() - newPageRank;
//Replace the original node's PageRank with the new value
originalNode.setPageRank(newPageRank);
outValue.set(originalNode.toString());
System.out.println(
" output -> K[" + key + "],V[" + outValue + "]");
//Emit the updated node
context.write(key, outValue);
int scaledDelta =
Math.abs((int) (delta * CONVERGENCE_SCALING_FACTOR));
System.out.println("Delta = " + scaledDelta);
//MapReduce counters: custom counters are created and looked up through an enum constant;
//increment() adds this node's scaled delta so the driver can read the summed value after the job
context.getCounter(Counter.CONV_DELTAS).increment(scaledDelta);
}
}
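Because Hadoop counters only hold long values, the reducer scales each node's delta by CONVERGENCE_SCALING_FACTOR (1000) and truncates it before adding it to the counter; the driver later divides the summed counter back down. A tiny worked example with made-up numbers:

//suppose a node's rank moves from 0.25 to 0.2625 in one iteration
double delta = 0.25 - 0.2625;                       //-0.0125
int scaledDelta = Math.abs((int) (delta * 1000.0)); //12 after truncation
//the driver then computes: convergence = summedCounter / 1000.0 / numNodes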
4. The driver (main) implementation:
package org.robby.mr.pagerank;
import org.apache.commons.io.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.*;
import java.util.*;
public final class Main {
public static void main(String... args) throws Exception {
//Input file path and output directory path from the command line
String inputFile = args[0];
String outputDir = args[1];
iterate(inputFile, outputDir);
}
public static void iterate(String input, String output)
throws Exception {
//Because this is run with hadoop jar on the cluster, the Configuration automatically picks up the cluster's HDFS settings,
//so the FileSystem calls below operate directly on HDFS
Configuration conf = new Configuration();
Path outputPath = new Path(output);
outputPath.getFileSystem(conf).delete(outputPath, true);
outputPath.getFileSystem(conf).mkdirs(outputPath);
//Path of the generated (initialized) input file, placed inside the output directory
Path inputPath = new Path(outputPath, "input.txt");
//Create the initialized input file and return the number of nodes
int numNodes = createInputFile(new Path(input), inputPath);
int iter = 1;
double desiredConvergence = 0.01;
while (true) {
//Building a Path this way appends the iteration number to outputPath
Path jobOutputPath =
new Path(outputPath, String.valueOf(iter));
System.out.println("======================================");
System.out.println("= Iteration: " + iter);
System.out.println("= Input path: " + inputPath);
System.out.println("= Output path: " + jobOutputPath);
System.out.println("======================================");
//Run one MapReduce iteration and test for convergence
if (calcPageRank(inputPath, jobOutputPath, numNodes) <
desiredConvergence) {
System.out.println(
"Convergence is below " + desiredConvergence +
", we're done");
break;
}
inputPath = jobOutputPath;
iter++;
}
}
//Copy the adjacency file into targetFile, prepending the initial PageRank value to every node
public static int createInputFile(Path file, Path targetFile)
throws IOException {
Configuration conf = new Configuration();
FileSystem fs = file.getFileSystem(conf);
int numNodes = getNumNodes(file);
double initialPageRank = 1.0 / (double) numNodes;
//fs.create builds the target file from the Path and returns its output stream
OutputStream os = fs.create(targetFile);
//Line iterator over the source file
LineIterator iter = IOUtils
.lineIterator(fs.open(file), "UTF8");
while (iter.hasNext()) {
String line = iter.nextLine();
//Split the line into its whitespace-separated parts (node name followed by adjacent nodes)
String[] parts = StringUtils.split(line);
//Build a Node with the initial PageRank and the adjacency list
Node node = new Node()
.setPageRank(initialPageRank)
.setAdjacentNodeNames(
Arrays.copyOfRange(parts, 1, parts.length));
IOUtils.write(parts[0] + '\t' + node.toString() + '\n', os);
}
os.close();
return numNodes;
}
//The number of nodes is simply the number of lines in the file
public static int getNumNodes(Path file) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = file.getFileSystem(conf);
return IOUtils.readLines(fs.open(file), "UTF8").size();
}
//Run one PageRank MapReduce job and return the convergence value
public static double calcPageRank(Path inputPath, Path outputPath, int numNodes)
throws Exception {
Configuration conf = new Configuration();
conf.setInt(Reduce.CONF_NUM_NODES_GRAPH, numNodes);
Job job = Job.getInstance(conf);
job.setJarByClass(Main.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//Both the input key and value are text, so use KeyValueTextInputFormat, which splits each line at the first tab into key and value
job.setInputFormatClass(KeyValueTextInputFormat.class);
//Declare the map output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (!job.waitForCompletion(true)) {
throw new Exception("Job failed");
}
long summedConvergence = job.getCounters().findCounter(
Reduce.Counter.CONV_DELTAS).getValue();
double convergence =
((double) summedConvergence /
Reduce.CONVERGENCE_SCALING_FACTOR) /
(double) numNodes;
System.out.println("======================================");
System.out.println("= Num nodes: " + numNodes);
System.out.println("= Summed convergence: " + summedConvergence);
System.out.println("= Convergence: " + convergence);
System.out.println("======================================");
return convergence;
}
}
Notes:
The stream helpers used above come from the IOUtils class in org.apache.commons.io.
Arrays.copyOfRange returns a new array containing the specified range of an existing array (a short example follows the excerpt below).
OutputStream os = fs.create(targetFile);
//Line iterator over the source file
LineIterator iter = IOUtils
.lineIterator(fs.open(file), "UTF8");
while (iter.hasNext()) {
String line = iter.nextLine();
String[] parts = StringUtils.split(line);
Node node = new Node()
.setPageRank(initialPageRank)
.setAdjacentNodeNames(
Arrays.copyOfRange(parts, 1, parts.length));
IOUtils.write(parts[0] + '\t' + node.toString() + '\n', os);
}
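Returning to Arrays.copyOfRange mentioned above, a quick illustration of its inclusive from / exclusive to indices:

String[] parts = {"A", "B", "D"};
String[] adjacent = java.util.Arrays.copyOfRange(parts, 1, parts.length);
//adjacent is {"B", "D"}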
IOUtils.readLines returns a List of Strings, one element per line of the file, so its size() gives the line (node) count:
IOUtils.readLines(fs.open(file), "UTF8").size();
TextOutputFormat can write keys and values of any type; it converts each object to text by calling its toString() method.
StringUtils can also be used to split a string into an array:
String[] parts = StringUtils.splitPreserveAllTokens(
value, fieldSeparator);
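The difference from StringUtils.split is that splitPreserveAllTokens keeps empty tokens between adjacent separators; a quick comparison using commons-lang:

String[] keep = StringUtils.splitPreserveAllTokens("0.25\t\tB", '\t'); //["0.25", "", "B"]
String[] drop = StringUtils.split("0.25\t\tB", '\t');                  //["0.25", "B"]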