Flink 之Kmeans

kmeans.java

‘’’

package flink5;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;public class kmeans {public kmeans(){}static double[][] center = new double[4][2];  //这里有4个中心点,为2维static int[] number = new int[4];           //记录属于当前中心点的数据的个数,方便做除法static double[][] new_center = new double[4][2];    //计算出来的新中心点static double sum_x = 0;static double sum_y = 0;public static void main(String[] args) throws Exception {ArrayList<String> arrayList = new ArrayList<String>();try {File file = new File("/home/hadoop/homework4/4/centers.txt");InputStreamReader input = new InputStreamReader(new FileInputStream(file));BufferedReader bf = new BufferedReader(input);// 按行读取字符串String str;while ((str = bf.readLine()) != null) {arrayList.add(str);}bf.close();input.close();} catch (IOException e) {e.printStackTrace();}// 对ArrayList中存储的字符串进行处理 for (int i = 0; i < 4; i++) {for (int j = 0; j < 2; j++) {String s = arrayList.get(i).split(",")[j];center[i][j] = Double.parseDouble(s);}}ParameterTool params = ParameterTool.fromArgs(args);ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(params);DataSet<String> datas = env.readTextFile("/home/hadoop/homework4/4/k-means.dat");while(true) {for (int i = 0; i< 4;i++)           //注意每次循环都需要将number[i]变为0{number[i]=0;}//将data分开,得到key: 属于某个中心点的序号(0/1/2/3),value: 与该中心点的距离DataSet<Tuple2<Integer,Tuple2<Double,Double>>> data = datas.flatMap(new distanceTokenizer());//得到key: 属于某个中心点的序号, value:新中心点的坐标//data.print();//var sum_center = data.groupBy(new groupby());//DataSet<Integer, Iterable<Tuple2<Double, Double>>> sum_center = data.groupBy(0);   //???????????DataSet<Tuple2<Integer,Tuple2<Double,Double>>> Ncenter = data.groupBy(0).reduce(new ncenterTokenizer());Ncenter.print();DataSet<Tuple2<Integer,Tuple2<Double,Double>>> NewCenter = Ncenter.flatMap(new NewcenterTokenizer());NewCenter.print();//将中心点输出 double distance = 0;for(int i=0;i<4;i++) {      distance += (center[i][0]-new_center[i][0])*(center[i][0]-new_center[i][0]) + (center[i][1]-new_center[i][1])*(center[i][1]-new_center[i][1]);    }if(distance == 0.0) {//finishedfor(int j = 0;j<4;j++) {System.out.println("the final center: "+"  "+center[j][0]+" , "+center[j][1]);}break;}else {for(int i = 0;i<4;i++) {center[i][0] = new_center[i][0];center[i][1] = new_center[i][1];new_center[i][0] = 0;new_center[i][1] = 0;System.out.println("the new center: "+"  "+center[i][0]+" , "+center[i][1]);}}}env.execute();
}}

‘’’

ncenterTokenizer.java

‘’’

package flink5;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
public class ncenterTokenizer implements ReduceFunction<Tuple2<Integer, Tuple2<Double, Double>>>{public ncenterTokenizer(){}@Override
public Tuple2<Integer, Tuple2<Double, Double>> reduce(Tuple2<Integer, Tuple2<Double, Double>> value1,Tuple2<Integer, Tuple2<Double, Double>> value2) throws Exception {// TODO Auto-generated method stubSystem.out.println("***********testing**********");System.out.println(value1.f0 + " ," + value2.f0);System.out.println(value1.f1.f0 + " ,"+ value1.f1.f1);System.out.println(value2.f1.f0+" ,"+value2.f1.f1);kmeans.sum_x = value1.f1.f0 + value2.f1.f0;kmeans.sum_y = value1.f1.f1 + value2.f1.f1;//return null;return new Tuple2<Integer,Tuple2<Double,Double>>(value1.f0,new Tuple2<Double,Double>(kmeans.sum_x,kmeans.sum_y));
}
}

‘’’

NewcenterTokenizer.java

‘’’

package flink5;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class NewcenterTokenizer implements FlatMapFunction<Tuple2<Integer,Tuple2<Double,Double>>,Tuple2<Integer,Tuple2<Double,Double>>>{public NewcenterTokenizer(){}@Override
public void flatMap(Tuple2<Integer, Tuple2<Double, Double>> value,Collector<Tuple2<Integer, Tuple2<Double, Double>>> out) throws Exception {// TODO Auto-generated method stubSystem.out.println("+++++++++++++++++++++++++++++++++++++=");System.out.println(value.f0 + " " + value.f1.f0 + " " + value.f1.f1 +"   " + kmeans.number[value.f0] + " " );double average_x = value.f1.f0/kmeans.number[value.f0] * 2;double average_y = value.f1.f1/kmeans.number[value.f0] * 2;kmeans.new_center[value.f0][0] = average_x;kmeans.new_center[value.f0][1] = average_y;out.collect(new Tuple2<Integer,Tuple2<Double,Double>>(value.f0,new Tuple2<Double,Double>(average_x,average_y)));
}
}

‘’’

distanceTokenizer.java

‘’’

package flink5;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class distanceTokenizer implements FlatMapFunction<String,Tuple2<Integer,Tuple2<Double,Double>>>{public distanceTokenizer(){}@Override
public void flatMap(String value, Collector<Tuple2<Integer, Tuple2<Double, Double>>> out) throws Exception {// TODO Auto-generated method stubfinal double[][] loc = kmeans.center;  String[] datasplit = value.split(",");double x = Double.parseDouble(datasplit[0]);double y = Double.parseDouble(datasplit[1]);double minDistance = 99999999;int centerIndex = 0;for(int i = 0;i < 4;i++){double itsDistance = (x - loc[i][0])*(x-loc[i][0])+(y-loc[i][1])*(y-loc[i][1]);if(itsDistance < minDistance){minDistance = itsDistance;centerIndex = i; }
}kmeans.number[centerIndex]++;        //得到属于4个中心点的个数out.collect(new Tuple2<Integer,Tuple2<Double, Double>>(centerIndex, new Tuple2<Double,Double>(x,y)));}}

‘’’

input

  • 与spark之kmeans的输入一样

Outout

  • the final center: 98.31 , 803.89
  • the final center: 496.11650485436894 , 207.3398058252427
  • the final center: 493.2277227722772 , 798.3267326732673
  • the final center: 91.05208333333333 , 206.19791666666666

Flink之Kmeans相关推荐

  1. 基于flink使用K-Means算法对KDD CUP99数据集进行聚类分析

    1.算法简介 kmeans算法又称k均值算法,是一种聚类算法,属于无监督学习算法. 对于给定的样本集,kmeans将其中相似的样本成员分类组织到一起,最终将样本集划分成K个簇,每个簇内的样本成员相似度 ...

  2. Alink漫谈(一) : 从KMeans算法实现不同看Alink设计思想

    Alink漫谈(一) : 从KMeans算法实现不同看Alink设计思想 0x00 摘要 Alink 是阿里巴巴基于实时计算引擎 Flink 研发的新一代机器学习算法平台,是业界首个同时支持批式算法. ...

  3. Spark与Flink:对比与分析

    Spark是一种快速.通用的计算集群系统,Spark提出的最主要抽象概念是弹性分布式数据集(RDD),它是一个元素集合,划分到集群的各个节点上,可以被并行操作.用户也可以让Spark保留一个RDD在内 ...

  4. flink sql udf jar包_Flink 生态:一个案例快速上手 PyFlink

    简介: Flink 从 1.9.0 版本开始增加了对 Python 的支持(PyFlink),在刚刚发布的 Flink 1.10 中,PyFlink 添加了对 Python UDFs 的支持,现在可以 ...

  5. 年度回顾 | 2019 年的 Apache Flink

    2019 年即将落下帷幕,这一年对于 Apache Flink 来说是非常精彩的一年,里程碑式的一年.随着这一年在邮件列表发送了超过 1 万封邮件,JIRA 中超过 4 千个 tickets,以及 G ...

  6. Flink之DataSet迭代计算

    目录 (1)迭代计算分类与原理 (2)全量迭代计算详解 (2.1)案例分析 (2.2)案例实战 (3)增量迭代计算详解 (1)迭代计算分类与原理 迭代计算在批量数据处理过程中的应用非常广泛,如常用的机 ...

  7. flink与spark的区别----阅读笔记1

    Flink简介 spark基本架构 flink基本架构 Spark提出的最主要抽象概念是弹性分布式数据集(RDD) flink支持增量迭代计算.基于流执行引擎,Flink提供了诸多更高抽象层的API以 ...

  8. 大数据流式处理框架Flink介绍

    1.Flink的介绍 随着数据的飞速发展,出现了很多热门的开源社区,比如:hadoop.spark.storm社区,他们都有各自专注的适用场景,比如hadoop主要是做数据的存储及批处理计算,spar ...

  9. Alink使用入门,基于flink的机器学习

    一.什么是 Alink? ​ Alink 是阿里巴巴计算平台事业部PAI团队从 2017 年开始基于实时计算引擎 Flink 研发的新一代机器学习算法平台,提供丰富的算法组件库和便捷的操作框架,开发者 ...

最新文章

  1. win7下不能替换系统文件的解决办法
  2. 1001 A+B Format (20分)——12行代码AC
  3. ThriftParserError: ThriftPy does not support generating module with path in protocol 'd'
  4. 计算机编程ebcdic码,EBCDIC 与 ASCII 编码相互转换
  5. goland 修改.gitignore无效问题
  6. 明晚直播预告丨一则ORA-600案例分析
  7. python numpy 写入、读取 .npz 压缩文件
  8. css行内元素和块级元素
  9. 使用CROS解决跨域问题
  10. 声学信号频谱图分类(十三)
  11. 【Unity3D插件】VOXL插件分享《多人沙盒游戏插件》
  12. 仿生机制算法——细胞吸引子模型(附Matlab代码)
  13. 计算机专业英语常用词汇整理
  14. 外挂原理之植物大战僵尸
  15. 三年级下册计算机课程工作计划,三年级数学下册教学工作计划
  16. 201771010101 白玛次仁 《2018面向对象程序设计(Java)》第十六周学习总结
  17. rx.xxx 和 io.reactivex.xxx RxJava1 和 RxJava2 和 RxJava3
  18. dos命令根据大小查询文件
  19. 入驻华为云·云享专家了?!
  20. php文件后面有bak,bak文件查看器

热门文章

  1. 什么叫做“程序计数器”?它能做什么?
  2. 安全测试都不敢写精通,还敢要25K?
  3. c语言ofstream头文件,【c++】c++中的ofstream和ifstream
  4. 计算机网络-传输层:UDP协议
  5. 【文本分析】基于公众需求文本分析的深圳自然博物馆发展策略研究
  6. meltdown linux 内核,内核开发者称应更新Linux内核应对 Meltdown 和 Spectre漏洞
  7. C# listView 增 改 删 查
  8. Win7安装Ubuntu虚拟机异常处理:FATAL: NO bootable medium found! System halted
  9. js点击按钮div显示隐藏
  10. CMake——cmake_minimum_required