2019独角兽企业重金招聘Python工程师标准>>>

spark Core的使用基础知识

     rdd为spark的一个分布式数据源的计算的抽象

sparkContext为spark环境上下文用于保持集群连接,创建RDD 并行数据 accumular boardcast变量 用户创建spark job作业

SparkConf conf = new SparkConf().setAppName("indeximage").setMaster("local");JavaSparkContext context = new JavaSparkContext(conf);JavaPairRDD<String, PortableDataStream> imagefiles=context.binaryFiles("C:/baidu/features/sift", 2);

RDD是个分布式不变的抽象数据计算源 ,被划分多个分区,并在多台机器上分布计算,懒加载模式,当rdd开始计算时候才会加载来源使用,所以使用时候rdd 可以设置store级别,可以存储在内存 磁盘 或者 系列化 等几种方式 默认cache就是存储在内存中 其他几种见storagelevel

JavaRDD<File> imagefiles=context.parallelize(getFiles("C:/baidu/imagetest/"), 2);//.binaryFiles("C:/baidu/imagetest/*").coalesce(1);imagefiles.persist(StorageLevel.MEMORY_ONLY());

它能够从多种数据来源加载 常用的数组 文件 以及 hadoop格式文件(可以用来自定义后面叫大家创建自定义)

context.textFile(path)

context.paralllize/binarryFile

spark有很多行为操作,这里不一一介绍了 只介绍 几种常用的 ,比如

1,map  map其实就是一个转换 从一个格式 转换从 另一种格式的操作

2,flatMap 同map基本一致,但是flatmap 返回的是一个数组的格式

3.filter 主要是用来过滤掉rdd中 的数据

另外javaRDD 与 javaPairRDD 基本无非就是List 与 map的区别,分布式上 key value的转换

所以会javaPairRDD会涉及一些group by key combie key value的操作

由于就是key value  map可以相当于list 所以我这里只讲一下List的情况,其他都是类似的方法加上byKey values等等关键字

distinct()  同sql 一样 去重
union()  因为rdd是不可变 所以外部添加联合
intersection() 交集
subtract() 差集
cartesian() 笛卡尔积
reduce() reduce运算 把多个rdd的的元素最后合并成一个值
aggregate()/fold() 同上面类似,不过可以设定初值 以及 聚合后的值类型可以与合并前的不一样,比如多个integer 聚合成double
collect()  把RDD中所有元素 最后聚合汇总到主分区中 返回List 或者 map
foreach 故名思意 遍历 rdd的元素 并进行操作,比如遍历元素把它存储到Hbase中
cogroup、join 前者就是全连接 后者 就是内关联
count 同sql一样统计数目
coalesce、repartition 意义一样 都是设定RDD分区数目

Spark 计数器 同hadoop中的couter差不多 ,不过主要是外部最后使用 ,内部中不能使用实时的值,智能使用localValue

Accumulator<Integer> counter = sc.accumulator(0);
counter.add(1);

spark 共享数据变量

很多情况需要数据变量进行共享

Broadcast<Object> share = sc.broadcast(object)
发送广播通知 会实时更新,在内部可以获取调用

Spark的MLIB的使用基础知识

本地向量

本地向量的基类是 Vector,我们提供了两个实现 DenseVector 和 SparseVector。我们建议通过 Vectors中实现的工厂方法来创建本地向量:(注意:Scala语言默认引入的是 scala.collection.immutable.Vector,为了使用MLlib的Vector,你必须显示引入org.apache.spark.mllib.linalg.Vector。)

import org.apache.spark.mllib.linalg.{Vector, Vectors}// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values
corresponding to nonzero entries.val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

2.含类标签的点

含有类标签的点通过case class LabeledPoint来表示。

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

3.稀疏数据Sparse data

实际运用中,稀疏数据是很常见的。MLlib可以读取以LIBSVM格式存储的训练实例,LIBSVM格式是 LIBSVM 和 LIBLINEAR的默认格式,这是一种文本格式,每行代表一个含类标签的稀疏特征向量。格式如下:

label index1:value1 index2:value2 ...

索引是从 1 开始并且递增。加载完成后,索引被转换为从 0 开始。

通过 MLUtils.loadLibSVMFile读取训练实例并以LIBSVM 格式存储。

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDDval examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

4.本地矩阵

一个本地矩阵由整型的行列索引数据和对应的 double 型值数据组成,存储在某一个机器中。MLlib 支持密集矩阵(暂无稀疏矩阵!),实体值以列优先的方式存储在一个 double数组中。

本 地 矩 阵 的 基 类 是 Matrix , 我 们 提 供 了 一 个 实 现 DenseMatrix 。 我 们 建 议 通过 Matrices 中实现的工厂方法来创建本地矩阵:

import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

5.分布式矩阵

一个分布式矩阵由 long 型行列索引数据和对应的 double 型值数据组成,分布式存储在一个或多个 RDD 中。对于巨大的分布式的矩阵来说,选择正确的存储格式非常重要。将一个分布式矩阵转换为另一个不同格式需要全局洗牌(shuffle),所以代价很高。目前,实现了三类分布式矩阵存储格式。最基本的类型是 RowMatrix。一个 RowMatrix 是一个面向行的分布式矩阵,其行索引是没有具体含义的。比如一系列特征向量的一个集合。通过一个 RDD 来代表所有的行,每一行就是一个本地向量。对于 RowMatrix,我们假定其列数量并不巨大,所以一个本地向量可以恰当的与驱动节点(driver)交换信息,并且能够在某一节点中存储和操作。

IndexedRowMatrix 与 RowMatrix 相似,但有行索引,可以用来识别行和进行 join 操作。而 CoordinateMatrix 是一个以三元组列表格式(coordinate list ,COO)存储的分布式矩阵,其实体集合是一个 RDD。注 意 : 因 为 我 们 需 要 缓 存 矩 阵 大 小 , 分 布 式 矩 阵 的 底 层 RDD 必 须 是 确 定 的(deterministic)。通常来说,使用非确定的 RDD(non-deterministic RDDs)会导致错误。

5.1 面向行的分布式矩阵(RowMatrix)

一个 RowMatrix 是一个面向行的分布式矩阵,其行索引是没有具体含义的。比如一系列特征向量的一个集合。通过一个 RDD 来代表所有的行,每一行就是一个本地向量。既然每一行由一个本地向量表示,所以其列数就被整型数据大小所限制,其实实践中列数是一个很小的数值。

一个 RowMatrix可从一个RDD[Vector]实例创建。然后我们可以计算出其概要统计信息。

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrixval rows: RDD[Vector] = ... // an RDD of local vectors// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)// Get its size.
val m = mat.numRows()
val n = mat.numCols()

5.2行索引矩阵(IndexedRowMatrix)

IndexedRowMatrix 与 RowMatrix 相似,但其行索引具有特定含义,本质上是一个含有索引信息的行数据集合(an RDD of indexed rows)。每一行由 long 型索引和一个本地向量组成。一个 IndexedRowMatrix可从一个RDD[IndexedRow]实例创建,这里的 IndexedRow是 (Long, Vector) 的 封 装 类 。 剔 除 IndexedRowMatrix 中 的 行 索 引 信 息 就 变 成 一 个RowMatrix。

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val rows: RDD[IndexedRow] = ... // an RDD of indexed rows// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)// Get its size.
val m = mat.numRows()
val n = mat.numCols()// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix(

)

5.3三元组矩阵(CoordinateMatrix)

一个 CoordinateMatrix 是一个分布式矩阵,其实体集合是一个 RDD。每一个实体是一个(i: Long, j: Long, value: Double)三元组,其中 i 代表行索引,j 代表列索引,value 代表实体的值。只有当矩阵的行和列都很巨大,并且矩阵很稀疏时才使用 CoordinateMatrix。

一个 CoordinateMatrix可从一个RDD[MatrixEntry]实例创建,这里的 MatrixEntry是 (Long, Long, Double) 的 封 装 类 。 通 过 调 用 toIndexedRowMatrix 可 以 将 一 个CoordinateMatrix转变为一个IndexedRowMatrix(但其行是稀疏的)。目前暂不支持其他计算操作。

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)// Get its size.
val m = mat.numRows()
val n = mat.numCols()// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()

Spark MLIB测试基本使用 

理解了vector 就是double[] 然后我们就好办了 只需要double[] 转为vector 或者加上标签的labelPoint 带入spark 的Mlib包的机器学习的类 就行了 下面我们看看kmean聚类使用。其他类似

static{System.loadLibrary(Core.NATIVE_LIBRARY_NAME);}final static FeatureDetector detector = FeatureDetector.create(FeatureDetector.ORB);//ORBfinal static DescriptorExtractor extractor = DescriptorExtractor.create(DescriptorExtractor.ORB);  //BRIEF
public static List<double[]> readFeatureByStream(DataInputStream open) throws Exception {MatOfKeyPoint keypoints=new MatOfKeyPoint();Mat mat=OpenCVUtil.bufferedImageToMat(ImageIO.read(open));Mat descriptors=new Mat();detector.detect(mat, keypoints);
//      List<KeyPoint> referenceKeypointsList =
//              keypoints.toList();extractor.compute(mat, keypoints, descriptors);int numPoints = (int) keypoints.rows();int descrpnum=(int) descriptors.rows();// double[][] descriptions = new double[numPoints][descrpnum];List<double[]> descriptions=Lists.newArrayList();System.out.println(numPoints+"=============="+descrpnum+"=================="+descriptors.rows()+"=================="+descriptors.cols());for (int i = 0; i < descriptors.rows(); i++) {int cols=descriptors.cols();double[] desc=new double[cols];for (int j = 0; j < cols; j++) {desc[j]=descriptors.get(i, j)[0];}//descriptions[i]=desc;descriptions.add(desc);}return descriptions;}
 SparkConf conf = new SparkConf().setAppName("indeximage").setMaster("local");JavaSparkContext context = new JavaSparkContext(conf);JavaRDD<File> imagefiles=context.parallelize(getFiles("C:/baidu/imagetest/"), 2);//.binaryFiles("C:/baidu/imagetest/*").coalesce(1);imagefiles.persist(StorageLevel.MEMORY_ONLY());JavaRDD<Vector> vectors=imagefiles.map(new Function<File, List<Vector>>() {
@Override
public List<Vector> call(File v1)
throws Exception {
try{
List<Vector> sample=Lists.newArrayList();
final List<double[]> fkeys =readFeatureByFile(v1);
final int[] indices = RandomData.getUniqueRandomInts((int) (fkeys.size() * 0.1f), 0,
fkeys.size());
for (int i : indices) {
sample.add(Vectors.dense(fkeys.get(i)));
}
return sample;
}catch(Exception e){
e.printStackTrace();
return null;
}
}
}).flatMap(new FlatMapFunction<List<Vector>, Vector>() { //这里多余的其实不需要这样写,我为了演示flatMap的用法
@Override
public Iterable<Vector> call(
List<Vector> t) throws Exception {
return t;
}
});//.repartition(1);vectors.persist(StorageLevel.MEMORY_ONLY());
int numClusters = 64;int numIterations = 1000;long startTime = System.nanoTime();
//    BisectingKMeans kmeans=new BisectingKMeans();
//    kmeans.setK(64);
//    kmeans.setMaxIterations(100);
//    kmeans.setMinDivisibleClusterSize(1.0);
//    BisectingKMeansModel clusters=kmeans.run(vectors.rdd());KMeansModel clusters = KMeans.train(vectors.rdd(), numClusters, numIterations);//KMeansModel clusters = KMeans.train(vectors.rdd(), numClusters, numIterations);System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");double WSSSE = clusters.computeCost(vectors.rdd());long endTime = System.nanoTime();System.out.println("Execution Time: " + (endTime - startTime)/1000000 + " ms");System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$聚类成功¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥"+clusters.k()+"=============");
Vector[] vs=clusters.clusterCenters();

下面运行结果展示

这是我本地刚刚运行的结果。,。。spark很方便 跟storm 一样 本地 调试非常方便快捷。

转载于:https://my.oschina.net/yilian/blog/652191

Spark 云计算 ML 机器学习教程 以及 SPARK使用教程相关推荐

  1. Spark ML机器学习

    Spark提供了常用机器学习算法的实现, 封装于spark.ml和spark.mllib中. spark.mllib是基于RDD的机器学习库, spark.ml是基于DataFrame的机器学习库. ...

  2. spark 逻辑回归算法案例_黄美灵的Spark ML机器学习实战

    原标题:黄美灵的Spark ML机器学习实战 本课程主要讲解基于Spark 2.x的ML,ML是相比MLlib更高级的机器学习库,相比MLlib更加高效.快捷:ML实现了常用的机器学习,如:聚类.分类 ...

  3. 离线轻量级大数据平台Spark之MLib机器学习库概念学习

    Mlib机器学习库 1.1机器学习概念 机器学习有很多定义,倾向于下面这个定义.机器学习是对能通过经验自动改进的计算机算法的研究.机器学习依赖数据经验并评估和优化算法所运行出的模型.机器学习算法尝试根 ...

  4. 学习笔记Spark(九)—— Spark MLlib应用(1)—— 机器学习简介、Spark MLlib简介

    一.机器学习简介 1.1.机器学习概念 机器学习就是让机器能像人一样有学习.理解.认识的能力. 机器学习是一门人工智能的科学,该领域的主要研究对象是人工智能,特别是如何在经验学习中改善具体算法的性能. ...

  5. [机器学习] LightGBM on Spark (MMLSpark) 使用完全手册

    一 Spark上训练模型优势与劣势 (1)机器学习算法一般都有很多个步骤迭代计算的过程,机器学习的计算需要在多次迭代后获得足够小的误差或者足够收敛才会停止,迭代时如果使用一般的Hadoop分布式计算框 ...

  6. 离线轻量级大数据平台Spark之MLib机器学习库Word2Vec实例

    Word2Vecword2vec能将文本中出现的词向量化,可以在捕捉语境信息的同时压缩数据规模.Word2Vec实际上是两种不同的方法:Continuous Bag of Words (CBOW) 和 ...

  7. 离线轻量级大数据平台Spark之MLib机器学习库TF-IDF实例

    TF-IDF(termfrequency–inverse document frequency)是TF-IDF是一种统计方法,用以评估一字词对于一个文件集或一个语料库中的其中一份文件的重要程度.字词的 ...

  8. 离线轻量级大数据平台Spark之MLib机器学习协同过滤ALS实例

    1.协同过滤 协同过滤(Collaborative Filtering,简称CF,WIKI上的定义是:简单来说是利用某个兴趣相投.拥有共同经验之群体的喜好来推荐感兴趣的资讯给使用者,个人透过合作的机制 ...

  9. [机器学习] XGBoost on Spark 分布式使用完全手册

    一 XGBoost分布式概述 在XGBoost设计之初,就考虑了分布式的实现.树模型最重要的一个问题即是分割点的确定,XGBoost在单机的环境中,数据全部load进内存,feature已经按照值的大 ...

  10. 【极简spark教程】spark聚合函数

    聚合函数分为两类,一种是spark内置的常用聚合函数,一种是用户自定义聚合函数 UDAF 不带类型的UDAF[较常用] 继承UserDefinedAggregateFunction 定义输入数据的sc ...

最新文章

  1. 浅析@Deprecated,调用方法时出现横线划掉样式
  2. 运维自动化之基于python语言的文字界面的运维管理软件
  3. 当你拼命挣死工资时,他们已抢占2018年最火爆高科技赚钱项目......
  4. HashSet中实现不插入重复的元素
  5. Git学习(4)基本操作
  6. 最短寻道时间算法c语言,如果北京到上海有千亿条路,寻找最短路径用C语言编程用枚举法没效率,应该用什么算法才能高效解决它?...
  7. 阿里云边缘计算三年,都为开发者带来了什么?
  8. 解决vue在ie9中的兼容问题
  9. notepad++正则表达式使用
  10. java 按行读取txt文件并存入数组
  11. 数据结构1800试题(第3章)
  12. c语言题库及答案(选择题,C语言题库及答案(选择题).doc
  13. ZigBee-CC2530单片机 - 4路硬件定时器PWM输出
  14. 第一次开卡SSD硬盘成功,主控为SM2258XT(附软件)
  15. pdf批量添加图章_还没找到适合自己的PDF阅读器吗,它来了
  16. 重庆大学计算机学院专硕分析,重庆大学电子信息(专硕)专业考研难度分析-专业排名-难度大小...
  17. OSChina 周日乱弹 —— 会后空翻算个屁,咱这卖艺卖身吃鱼干
  18. 一键制作低多边形海报效果教程
  19. 使用EDU邮箱申请JetBrains学生包免费使用一年JetBrains全家桶
  20. k8s之滚动更新、金丝雀发布、蓝绿发布

热门文章

  1. c语言可以发现注释错误,编译时可以发现注释中的错误_c语言中不允许使用
  2. python判断丑数_LeetCode-python 264.丑数 II
  3. python中的换行符是哪个键_Python换行符问题:\r\n还是\n?
  4. linux环境下,Tomcat详细部署步骤
  5. 计算机辅助设计与制造考试重点,计算机辅助设计与制造复习内容
  6. 设计一个服务器资源管理系统,基于虚拟化技术的服务器资源管理系统的设计与实现.pdf...
  7. 怎么使用systemctl启动rabbitmq_光纤激光切割机已经很久没有使用了。再次重新启动它,该怎么办?...
  8. 深度学习图片卷积输出大小计算公式
  9. _inflateEnd, referenced from _inflateInit_错误,
  10. 基于IHttpAsyncHandler的TCP收发器