MapReduce实现kmeans算法
先来介绍一下kmeans算法的思想:(以下是我自己的理解):
1.先随机选取几个中心点作为初始的中心点
2.然后根据每个点到这些点的距离远近,找到最近的那个点作为它的中心点,将这些点进行分类。
3.计算每一类的点形成的新的中心点。
重复2,3步。
hadoop中mapreduce的代码我分成了4部分,是有一点的繁琐,但是比较好理解。
第一部分:kmeansInit:
这部分就是设置了初始的三个点的坐标,在map中对这些点进行分类,在reduce中对点重新计算中心点。
map的的输入:
每一行的key是默认的偏移值,value是一行的点。
reduce的输入是:
前面是第几类,后面是点的坐标。
而输出是:
key是新的中心点,value是属于这个类的点的列表。
第二部分:kmeansIter
第二部分其实跟第一部分差别不大,不过增加了一个setup方法,这个方法是为了将上一次的迭代的中心点从文件中读取出来然后保存到数组里,方便后面的使用。而map和reduce跟之前的一样。下面详细的说明一下setup。
setup:用缓存的方式读取上次产生的文件
输入格式为:
所以用每一行的“\t”做分割,前面是新的中心点坐标,读取出来保存到数组里即可,后面map在使用就可以了。
第三部分:kmeansViewer
其实这部分也跟前面的差别不大,主要是因为输入只是每个点的类别,所以我们不用之前的reduce了,setup和map与之前的一样,而reduce的功能就是输出。
reduce:
输入依然是这样的形式:
输出的key是map输出的key,value是map的输出的value,因为可能在map时会有合并的项
-_-||(发现这里其实可以不要reduce的,用默认的就行)
最后一部分:kmeansDriver
就是调度的部分,整个程序执行一次kmeansInit,数次kmeansIter,一次kmeansViewer。
附上四部分的代码:
一:kmeansInit
//package org.apache.hadoop.examples;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;//import GraphBuilder.GraphBuilderMapper;
//import GraphBuilder.GraphBuilderReducer;import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;/*** MapReduce开发WordCount应用程序* */
public class kmeansInit {/**** Map:读取输入的文件* */public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {//private final IntWritable one = new IntWritable(1); private Text word = new Text();//String dd=new String();double[][] k={{467,41},{500,334},{724,169}};int[] kind=new int[20000];int count=0;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获得每行数据String line = value.toString();double vertice_x=Double.valueOf(line.split(",")[0]);double vertice_y=Double.valueOf(line.split(",")[1]);double distance=10000000.0;for(int i=0;i<3;i++)//{double temp=Math.pow((k[i][0]-vertice_x),2)+Math.pow((k[i][1]-vertice_y),2);if(temp<distance){distance=temp;kind[count]=i;}}//分割字符串//设置map输出的key值word.set(String.valueOf(kind[count]));//System.out.println("hh");//上下文输出map的key和valuecontext.write(word, value);count++;}}/*** Reduce* *//* public static class MyReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text key, Text<Text> values, Context context) throws IOException, InterruptedException {//用于累加的变量int sum = 0;for(IntWritable value: values) {sum += value.get();}context.write(key, new IntWritable(sum));}}*/public static class MyReducer extendsReducer<Text, Text, Text, Text> {public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {//context.write(key, values);double dis_x=0.0;double dis_y=0.0;int count=0;String temp="";for(Text value:values){dis_x+=Double.valueOf(value.toString().split(",")[0]);dis_y+=Double.valueOf(value.toString().split(",")[1]);temp+=value.toString()+"#";count++;}context.write(new Text(String.valueOf(dis_x/(count))+","+String.valueOf(dis_y/(count))),new Text(temp));}}/*** 定义Driver类* */public static void main(String[] args) throws Exception {Configuration conf = new Configuration(); final FileSystem filesystem = FileSystem.get(new URI(args[0]),conf);final Path outPath = new Path(args[1]);if(filesystem.exists(outPath)){filesystem.delete(outPath, true);}Job job1 = new Job(conf, "Graph Builder");job1.setJarByClass(kmeansInit.class);job1.setOutputKeyClass(Text.class);job1.setOutputValueClass(Text.class);job1.setMapperClass(MyMapper.class);job1.setReducerClass(MyReducer.class);FileInputFormat.addInputPath(job1, new Path(args[0]));FileOutputFormat.setOutputPath(job1, new Path(args[1]));job1.waitForCompletion(true);}}
二:kmeansIter
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class kmeansIter {private static final double damping = 0.85;public static class PRIterMapper extendsMapper<LongWritable, Text, Text, Text> {int count=0;int[] kind=new int[20000];Text word=new Text();private Path[] localFiles;static double[][] k={{0.0,0.0},{0.0,0.0},{0.0,0.0}};//private String pathfile = "hdfs://localhost:9000/user/hadoop/output1/part-r-00000";//String [] points=new String[20];public void setup(Context context) throws IOException, InterruptedException {Configuration conf = context.getConfiguration();//URI[] localCacheFiles = context.getCacheFiles();localFiles = DistributedCache.getLocalCacheFiles(conf); // 获得停词表//FileSystem fs =FileSystem.get(context.getConfiguration());// FSDataInputStream in = fs.open(new Path(pathfile));for (int i = 0; i < localFiles.length; i++) {String line;// BufferedReader br = new BufferedReader(new InputStreamReader(in));// BufferedReader br =// new BufferedReader(new FileReader(localCacheFiles[0].getPath()));BufferedReader br =new BufferedReader(new FileReader(localFiles[i].toString()));while ((line = br.readLine()) != null) {//StringTokenizer itr = new StringTokenizer(line);//while (itr.hasMoreTokens()) { //String temp=itr.nextToken();double point_x=Double.valueOf(line.split("\t")[0].split(",")[0]);double point_y=Double.valueOf(line.split("\t")[0].split(",")[1]);k[count][0]=point_x;k[count][1]=point_y;count++;//System.out.println("hh");//}}}count=0;} public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//获得每行数据String line = value.toString();double vertice_x=Double.valueOf(line.split(",")[0]);double vertice_y=Double.valueOf(line.split(",")[1]);double distance=1000000000.0;for(int i=0;i<3;i++)//{double temp=Math.pow((k[i][0]-vertice_x),2)+Math.pow((k[i][1]-vertice_y),2);if(temp<distance){distance=temp;kind[count]=i;}}//分割字符串//设置map输出的key值word.set(String.valueOf(kind[count]));//System.out.println("hh");//上下文输出map的key和valuecontext.write(word, value);count++;}}public static class PRIterReducer extends Reducer<Text, Text, Text, Text> {public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {double dis_x=0.0;double dis_y=0.0;int count=0;String temp="";for(Text value:values){dis_x+=Double.valueOf(value.toString().split(",")[0]);dis_y+=Double.valueOf(value.toString().split(",")[1]);temp+=value.toString()+"#";count++;}context.write(new Text(String.valueOf(dis_x/(count))+","+String.valueOf(dis_y/(count))),new Text(temp));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();DistributedCache.addCacheFile(new URI(args[2]), conf);// 设置停词列表文档作为当前作业的缓存文件final FileSystem filesystem = FileSystem.get(new URI(args[0]),conf);final Path outPath = new Path(args[1]);if(filesystem.exists(outPath)){filesystem.delete(outPath, true);}Job job2 = new Job(conf, "PageRankIter"); //job2.addCacheFile(new Path("hdfs://localhost:9000/input").toUri());job2.setJarByClass(kmeansIter.class);job2.setOutputKeyClass(Text.class);job2.setOutputValueClass(Text.class);job2.setMapperClass(PRIterMapper.class);//job2.setCombinerClass(myCombine.class);job2.setReducerClass(PRIterReducer.class);FileInputFormat.addInputPath(job2, new Path(args[0]));FileOutputFormat.setOutputPath(job2, new Path(args[1]));job2.waitForCompletion(true);}
}
三:kmeansViewer
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class kmeansViewer1 {private static final double damping = 0.85;static double[][] k={{0.0,0.0},{0.0,0.0},{0.0,0.0}};public static class PRIterMapper extendsMapper<LongWritable, Text, Text, Text> {int count=0;int[] kind=new int[20000];Text word=new Text();private Path[] localFiles;//private String pathfile = "hdfs://localhost:9000/user/hadoop/output1/part-r-00000";//String [] points=new String[20];public void setup(Context context) throws IOException, InterruptedException {Configuration conf = context.getConfiguration();//URI[] localCacheFiles = context.getCacheFiles();localFiles = DistributedCache.getLocalCacheFiles(conf); // 获得停词表//FileSystem fs =FileSystem.get(context.getConfiguration());// FSDataInputStream in = fs.open(new Path(pathfile));for (int i = 0; i < localFiles.length; i++) {String line;// BufferedReader br = new BufferedReader(new InputStreamReader(in));// BufferedReader br =// new BufferedReader(new FileReader(localCacheFiles[0].getPath()));BufferedReader br =new BufferedReader(new FileReader(localFiles[i].toString()));while ((line = br.readLine()) != null) {//StringTokenizer itr = new StringTokenizer(line);//while (itr.hasMoreTokens()) { //String temp=itr.nextToken();double point_x=Double.valueOf(line.split("\t")[0].split(",")[0]);double point_y=Double.valueOf(line.split("\t")[0].split(",")[1]);k[count][0]=point_x;k[count][1]=point_y;count++;//System.out.println("hh");//}}}count=0;} public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//获得每行数据String line = value.toString();double vertice_x=Double.valueOf(line.split(",")[0]);double vertice_y=Double.valueOf(line.split(",")[1]);double distance=100000000.0;for(int i=0;i<3;i++)//{double temp=Math.pow((k[i][0]-vertice_x),2)+Math.pow((k[i][1]-vertice_y),2);if(temp<distance){distance=temp;kind[count]=i;}}//分割字符串//设置map输出的key值word.set(String.valueOf(kind[count]));//System.out.println("hh");//上下文输出map的key和valuecontext.write(word, value);count++;}}public static class PRIterReducer extends Reducer<Text, Text, Text, Text> {public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {for(Text value:values){context.write(key, value);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();DistributedCache.addCacheFile(new URI(args[2]), conf);// 设置停词列表文档作为当前作业的缓存文件final FileSystem filesystem = FileSystem.get(new URI(args[0]),conf);final Path outPath = new Path(args[1]);if(filesystem.exists(outPath)){filesystem.delete(outPath, true);}Job job2 = new Job(conf, "kmeansViewer1"); //job2.addCacheFile(new Path("hdfs://localhost:9000/input").toUri());job2.setJarByClass(kmeansViewer1.class);job2.setOutputKeyClass(Text.class);job2.setOutputValueClass(Text.class);job2.setMapperClass(PRIterMapper.class);//job2.setCombinerClass(myCombine.class);job2.setReducerClass(PRIterReducer.class);FileInputFormat.addInputPath(job2, new Path(args[0]));FileOutputFormat.setOutputPath(job2, new Path(args[1]));job2.waitForCompletion(true);}
}
四:kmeansDriver
public class kmeansDriver {private static int times = 20; // 设置迭代次数public static void main(String[] args) throws Exception {String[] forGB = { "", args[1] + "/Data0" };forGB[0] = args[0];kmeansInit.main(forGB);String[] forItr = { "", "","" };for (int i = 0; i < times; i++) {forItr[0] = args[0];forItr[1] = args[1] + "/Data" + String.valueOf(i + 1);forItr[2]=args[1]+"/Data"+i+"/part-r-00000";kmeansIter.main(forItr);}String[] forRV = { args[0],args[1] + "/FinalRank",args[1] + "/Data" + times+"/part-r-00000" };kmeansViewer1.main(forRV);}
}
MapReduce实现kmeans算法相关推荐
- hadoop下实现kmeans算法——一个mapreduce的实现方法
写mapreduce程序实现kmeans算法,我们的思路可能是这样的 1. 用一个全局变量存放上一次迭代后的质心 2. map里,计算每个质心与样本之间的距离,得到与样本距离最短的质心,以这个质心作为 ...
- KMeans算法的Mapreduce实现
Hive数据分析... 4 一.数据处理.... 4 1.1处理不符合规范的数据.... 4 1.2访问时间分段.... 5 二.基本统计信息.... 6 三.数据属性基础分析.... 6 3.1用户 ...
- 用Hadoop1.0.3实现KMeans算法
从理论上来讲用MapReduce技术实现KMeans算法是很Natural的想法:在Mapper中逐个计算样本点离哪个中心最近,然后Emit(样本点所属的簇编号,样本点):在Reducer中属于同一个 ...
- hadoop下的Kmeans算法实现
前一段时间,从配置Hadoop到运行kmeans的mapreduce程序,着实让我纠结了几天,昨天终于把前面遇到的配置问题和程序运行问题搞定.Kmeans算法看起来很简单,但对于第一次接触mapred ...
- Hadoop 实现kmeans 算法
关于kmeans说在前面:kmeans算法有一个硬性的规定就是簇的个数要提前设定.大家可能会质疑这个限制是否影响聚类效果,但是这种担心是多余的.在该算法诞生的这么多年里,该算法已被证明能够广泛的用于解 ...
- Kmeans算法总结
1. 定义 Kmeans算法的过程较为简单 1.从D中随机取k个元素,作为k个簇的各自的中心. 2.分别计算剩下的元素到k个簇中心的相异度,将这些元素分别划归到相异度最低的簇. 3.根据聚类结果,重新 ...
- 机器学习中的聚类算法(1):k-means算法
一文详解激光点云的物体聚类:https://mp.weixin.qq.com/s/FmMJn2qjtylUMRGrD5telw 引言: Q:什么是聚类算法? 现在我们在做的深度学习当中,比如图像的识别 ...
- python实现K-means算法
K-means算法流程: 随机选k个样本作为初始聚类中心 计算数据集中每个样本到k个聚类中心距离,并将其分配到距离最小的聚类中心 对于每个聚类,重新计算中心 回到2,至得到局部最优解 python代码 ...
- Python之机器学习K-means算法实现
一.前言: 今天在宿舍弄了一个下午的代码,总算还好,把这个东西算是熟悉了,还不算是力竭,只算是知道了怎么回事.今天就给大家分享一下我的代码.代码可以运行,运行的Python环境是Python3.6以上 ...
- matlab 职坐标,机器学习入门之机器学习实战ByMatlab(四)二分K-means算法
本文主要向大家介绍了机器学习入门之机器学习实战ByMatlab(四)二分K-means算法,通过具体的内容向大家展现,希望对大家学习机器学习入门有所帮助.前面我们在是实现K-means算法的时候,提到 ...
最新文章
- Spark DataFrame 添加自增id
- NUTCH的安装与测试
- 统计学:回归分析(2)
- redis stream学习总结
- 放大电路频率响应基础概念
- 古风一棵桃花树简笔画_为什么,很多农村家庭的院子里,会喜欢种一棵樱桃树呢?...
- TriCore处理器的上下文切换原理
- 计数器:counter
- 4.Linux性能诊断 --- Linux工作流程内存管理
- 年前的面试经历(二)
- 企业竞争竞争情报系统的流程整合
- win10 手动设置 DNS 地址
- unbuntu 安装jdk
- Windows Diskpart命令详解
- 简述关系数据库的数据完整性规则_关系数据库的完整性简述 关系数据库完整性规则...
- 微信支付一 :公众号支付1
- 管理系统菜单父子结构,有parentId,快速获取结构树代码
- 租房/搬家必备物品清单
- 通达信20个经典公式_20个欧美时尚达人高领毛衣+大衣造型集锦:冬季最高级的穿搭公式...
- 利用Serv-u提权的简单思路
热门文章
- 物料标识单的制作方法
- 代码该怎么写——设计原则
- Paxos 实现日志复制同步(Basic Paxos)
- java pdf 水印_java pdf加水印的方法
- Android 反编译之smail
- 迅雷手机版苹果版_迅雷IOS版恢复下载,又能用苹果手机下载电影愉快追剧了
- 模拟银行排队叫号机 2011.04.18
- IstioCon 2022 报名中|使用 eBPF 代替 iptable 加速 Istio 数据平面
- 将本地项目上传到码云
- 产品必备技能(十):如何绘制原型图?先要绘制页面流程图+页面原型图!附实操原型图、流程图实例(抖音)