Flink之Kmeans
Flink 之Kmeans
kmeans.java
‘’’
package flink5;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;public class kmeans {public kmeans(){}static double[][] center = new double[4][2]; //这里有4个中心点,为2维static int[] number = new int[4]; //记录属于当前中心点的数据的个数,方便做除法static double[][] new_center = new double[4][2]; //计算出来的新中心点static double sum_x = 0;static double sum_y = 0;public static void main(String[] args) throws Exception {ArrayList<String> arrayList = new ArrayList<String>();try {File file = new File("/home/hadoop/homework4/4/centers.txt");InputStreamReader input = new InputStreamReader(new FileInputStream(file));BufferedReader bf = new BufferedReader(input);// 按行读取字符串String str;while ((str = bf.readLine()) != null) {arrayList.add(str);}bf.close();input.close();} catch (IOException e) {e.printStackTrace();}// 对ArrayList中存储的字符串进行处理 for (int i = 0; i < 4; i++) {for (int j = 0; j < 2; j++) {String s = arrayList.get(i).split(",")[j];center[i][j] = Double.parseDouble(s);}}ParameterTool params = ParameterTool.fromArgs(args);ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(params);DataSet<String> datas = env.readTextFile("/home/hadoop/homework4/4/k-means.dat");while(true) {for (int i = 0; i< 4;i++) //注意每次循环都需要将number[i]变为0{number[i]=0;}//将data分开,得到key: 属于某个中心点的序号(0/1/2/3),value: 与该中心点的距离DataSet<Tuple2<Integer,Tuple2<Double,Double>>> data = datas.flatMap(new distanceTokenizer());//得到key: 属于某个中心点的序号, value:新中心点的坐标//data.print();//var sum_center = data.groupBy(new groupby());//DataSet<Integer, Iterable<Tuple2<Double, Double>>> sum_center = data.groupBy(0); //???????????DataSet<Tuple2<Integer,Tuple2<Double,Double>>> Ncenter = data.groupBy(0).reduce(new ncenterTokenizer());Ncenter.print();DataSet<Tuple2<Integer,Tuple2<Double,Double>>> NewCenter = Ncenter.flatMap(new NewcenterTokenizer());NewCenter.print();//将中心点输出 double distance = 0;for(int i=0;i<4;i++) { distance += (center[i][0]-new_center[i][0])*(center[i][0]-new_center[i][0]) + (center[i][1]-new_center[i][1])*(center[i][1]-new_center[i][1]); }if(distance == 0.0) {//finishedfor(int j = 0;j<4;j++) {System.out.println("the final center: "+" "+center[j][0]+" , "+center[j][1]);}break;}else {for(int i = 0;i<4;i++) {center[i][0] = new_center[i][0];center[i][1] = new_center[i][1];new_center[i][0] = 0;new_center[i][1] = 0;System.out.println("the new center: "+" "+center[i][0]+" , "+center[i][1]);}}}env.execute();
}}
‘’’
ncenterTokenizer.java
‘’’
package flink5;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
public class ncenterTokenizer implements ReduceFunction<Tuple2<Integer, Tuple2<Double, Double>>>{public ncenterTokenizer(){}@Override
public Tuple2<Integer, Tuple2<Double, Double>> reduce(Tuple2<Integer, Tuple2<Double, Double>> value1,Tuple2<Integer, Tuple2<Double, Double>> value2) throws Exception {// TODO Auto-generated method stubSystem.out.println("***********testing**********");System.out.println(value1.f0 + " ," + value2.f0);System.out.println(value1.f1.f0 + " ,"+ value1.f1.f1);System.out.println(value2.f1.f0+" ,"+value2.f1.f1);kmeans.sum_x = value1.f1.f0 + value2.f1.f0;kmeans.sum_y = value1.f1.f1 + value2.f1.f1;//return null;return new Tuple2<Integer,Tuple2<Double,Double>>(value1.f0,new Tuple2<Double,Double>(kmeans.sum_x,kmeans.sum_y));
}
}
‘’’
NewcenterTokenizer.java
‘’’
package flink5;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class NewcenterTokenizer implements FlatMapFunction<Tuple2<Integer,Tuple2<Double,Double>>,Tuple2<Integer,Tuple2<Double,Double>>>{public NewcenterTokenizer(){}@Override
public void flatMap(Tuple2<Integer, Tuple2<Double, Double>> value,Collector<Tuple2<Integer, Tuple2<Double, Double>>> out) throws Exception {// TODO Auto-generated method stubSystem.out.println("+++++++++++++++++++++++++++++++++++++=");System.out.println(value.f0 + " " + value.f1.f0 + " " + value.f1.f1 +" " + kmeans.number[value.f0] + " " );double average_x = value.f1.f0/kmeans.number[value.f0] * 2;double average_y = value.f1.f1/kmeans.number[value.f0] * 2;kmeans.new_center[value.f0][0] = average_x;kmeans.new_center[value.f0][1] = average_y;out.collect(new Tuple2<Integer,Tuple2<Double,Double>>(value.f0,new Tuple2<Double,Double>(average_x,average_y)));
}
}
‘’’
distanceTokenizer.java
‘’’
package flink5;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class distanceTokenizer implements FlatMapFunction<String,Tuple2<Integer,Tuple2<Double,Double>>>{public distanceTokenizer(){}@Override
public void flatMap(String value, Collector<Tuple2<Integer, Tuple2<Double, Double>>> out) throws Exception {// TODO Auto-generated method stubfinal double[][] loc = kmeans.center; String[] datasplit = value.split(",");double x = Double.parseDouble(datasplit[0]);double y = Double.parseDouble(datasplit[1]);double minDistance = 99999999;int centerIndex = 0;for(int i = 0;i < 4;i++){double itsDistance = (x - loc[i][0])*(x-loc[i][0])+(y-loc[i][1])*(y-loc[i][1]);if(itsDistance < minDistance){minDistance = itsDistance;centerIndex = i; }
}kmeans.number[centerIndex]++; //得到属于4个中心点的个数out.collect(new Tuple2<Integer,Tuple2<Double, Double>>(centerIndex, new Tuple2<Double,Double>(x,y)));}}
‘’’
input
- 与spark之kmeans的输入一样
Outout
- the final center: 98.31 , 803.89
- the final center: 496.11650485436894 , 207.3398058252427
- the final center: 493.2277227722772 , 798.3267326732673
- the final center: 91.05208333333333 , 206.19791666666666
Flink之Kmeans相关推荐
- 基于flink使用K-Means算法对KDD CUP99数据集进行聚类分析
1.算法简介 kmeans算法又称k均值算法,是一种聚类算法,属于无监督学习算法. 对于给定的样本集,kmeans将其中相似的样本成员分类组织到一起,最终将样本集划分成K个簇,每个簇内的样本成员相似度 ...
- Alink漫谈(一) : 从KMeans算法实现不同看Alink设计思想
Alink漫谈(一) : 从KMeans算法实现不同看Alink设计思想 0x00 摘要 Alink 是阿里巴巴基于实时计算引擎 Flink 研发的新一代机器学习算法平台,是业界首个同时支持批式算法. ...
- Spark与Flink:对比与分析
Spark是一种快速.通用的计算集群系统,Spark提出的最主要抽象概念是弹性分布式数据集(RDD),它是一个元素集合,划分到集群的各个节点上,可以被并行操作.用户也可以让Spark保留一个RDD在内 ...
- flink sql udf jar包_Flink 生态:一个案例快速上手 PyFlink
简介: Flink 从 1.9.0 版本开始增加了对 Python 的支持(PyFlink),在刚刚发布的 Flink 1.10 中,PyFlink 添加了对 Python UDFs 的支持,现在可以 ...
- 年度回顾 | 2019 年的 Apache Flink
2019 年即将落下帷幕,这一年对于 Apache Flink 来说是非常精彩的一年,里程碑式的一年.随着这一年在邮件列表发送了超过 1 万封邮件,JIRA 中超过 4 千个 tickets,以及 G ...
- Flink之DataSet迭代计算
目录 (1)迭代计算分类与原理 (2)全量迭代计算详解 (2.1)案例分析 (2.2)案例实战 (3)增量迭代计算详解 (1)迭代计算分类与原理 迭代计算在批量数据处理过程中的应用非常广泛,如常用的机 ...
- flink与spark的区别----阅读笔记1
Flink简介 spark基本架构 flink基本架构 Spark提出的最主要抽象概念是弹性分布式数据集(RDD) flink支持增量迭代计算.基于流执行引擎,Flink提供了诸多更高抽象层的API以 ...
- 大数据流式处理框架Flink介绍
1.Flink的介绍 随着数据的飞速发展,出现了很多热门的开源社区,比如:hadoop.spark.storm社区,他们都有各自专注的适用场景,比如hadoop主要是做数据的存储及批处理计算,spar ...
- Alink使用入门,基于flink的机器学习
一.什么是 Alink? Alink 是阿里巴巴计算平台事业部PAI团队从 2017 年开始基于实时计算引擎 Flink 研发的新一代机器学习算法平台,提供丰富的算法组件库和便捷的操作框架,开发者 ...
最新文章
- win7下不能替换系统文件的解决办法
- 1001 A+B Format (20分)——12行代码AC
- ThriftParserError: ThriftPy does not support generating module with path in protocol 'd'
- 计算机编程ebcdic码,EBCDIC 与 ASCII 编码相互转换
- goland 修改.gitignore无效问题
- 明晚直播预告丨一则ORA-600案例分析
- python numpy 写入、读取 .npz 压缩文件
- css行内元素和块级元素
- 使用CROS解决跨域问题
- 声学信号频谱图分类(十三)
- 【Unity3D插件】VOXL插件分享《多人沙盒游戏插件》
- 仿生机制算法——细胞吸引子模型(附Matlab代码)
- 计算机专业英语常用词汇整理
- 外挂原理之植物大战僵尸
- 三年级下册计算机课程工作计划,三年级数学下册教学工作计划
- 201771010101 白玛次仁 《2018面向对象程序设计(Java)》第十六周学习总结
- rx.xxx 和 io.reactivex.xxx RxJava1 和 RxJava2 和 RxJava3
- dos命令根据大小查询文件
- 入驻华为云·云享专家了?!
- php文件后面有bak,bak文件查看器
热门文章
- 什么叫做“程序计数器”?它能做什么?
- 安全测试都不敢写精通,还敢要25K?
- c语言ofstream头文件,【c++】c++中的ofstream和ifstream
- 计算机网络-传输层:UDP协议
- 【文本分析】基于公众需求文本分析的深圳自然博物馆发展策略研究
- meltdown linux 内核,内核开发者称应更新Linux内核应对 Meltdown 和 Spectre漏洞
- C# listView 增 改 删 查
- Win7安装Ubuntu虚拟机异常处理:FATAL: NO bootable medium found! System halted
- js点击按钮div显示隐藏
- CMake——cmake_minimum_required