需求

客户希望通过spark来分析二进制文件中0和1的数量以及占比。如果要分析的是目录,则针对目录下的每个文件单独进行分析。分析后的结果保存与被分析文件同名的日志文件中,内容包括0和1字符的数量与占比。

要求:如果值换算为二进制不足八位,则需要在左侧填充0。

可以在linux下查看二进制文件的内容。命令:

xxd –b –c 1 filename

-c 1是显示1列1个字符,-b是显示二进制

Python版本

代码

# This Python file uses the following encoding: utf-8from __future__ import division
import os
import time
import sys
from pyspark import SparkConf, SparkContextAPP_NAME = "Load Bin Files"def main(spark_context, path):file_paths = fetch_files(path)for file_path in file_paths:outputs = analysis_file_content(spark_context, path + "/" + file_path)print_outputs(outputs)save_outputs(file_path, outputs)def fetch_files(path):if os.path.isfile(path):return [path]return os.listdir(path)def analysis_file_content(spark_context, file_path):data = spark_context.binaryRecords(file_path, 1)records = data.flatMap(lambda d: list(bin(ord(d)).replace('0b', '').zfill(8)))mapped_with_key = records.map(lambda d: ('0', 1) if d == '0' else ('1', 1))result = mapped_with_key.reduceByKey(lambda x, y: x + y)total = result.map(lambda r: r[1]).sum()return result.map(lambda r: format_outputs(r, total)).collect()def format_outputs(value_with_key, total):tu = (value_with_key[0], value_with_key[1], value_with_key[1] / total * 100)return "字符{0}的数量为{1}, 占比为{2:.2f}%".format(*tu)def print_outputs(outputs):for output in outputs:print outputdef save_outputs(file_path, outputs):result_dir = "result"if not os.path.exists(result_dir):os.mkdir(result_dir)output_file_name = "result/" + file_name_with_extension(file_path) + ".output"with open(output_file_name, "a") as result_file:for output in outputs:result_file.write(output + "\n")result_file.write("统计于{0}\n\n".format(format_logging_time()))def format_logging_time():return time.strftime('%Y-%m-%d %H:%m:%s', time.localtime(time.time()))def file_name_with_extension(path):last_index = path.rfind("/") + 1length = len(path)return path[last_index:length]if __name__ == "__main__":conf = SparkConf().setMaster("local[*]")conf = conf.setAppName(APP_NAME)sc = SparkContext(conf=conf)if len(sys.argv) != 2:print("请输入正确的文件或目录路径")else:main(sc, sys.argv[1])

核心逻辑都在analysis_file_content方法中。

运行

python是脚本文件,无需编译。不过运行的前提是要安装好pyspark。运行命令为:

./bin/spark-submit /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py "files"

遇到的坑

开发环境的问题

要在spark下使用python,需要事先使用pip安装pyspark。结果安装总是失败。python的第三方库地址是https://pypi.python.org/simple/,在国内访问很慢。通过搜索问题,许多文章提到了国内的镜像库,例如豆瓣的库,结果安装时都提示找不到pyspark。

查看安装错误原因,并非不能访问该库,仅仅是访问较慢,下载了不到8%的时候就提示下载失败。这实际上是连接超时的原因。因而可以修改连接超时值。可以在~/.pip/pip.conf下增加:

[global]
timeout = 6000

虽然安装依然缓慢,但至少能保证pyspark安装完毕。但是在安装py4j时,又提示如下错误信息(安装环境为mac):

OSError: [Errno 1] Operation not permitted: '/System/Library/Frameworks/Python.framework/Versions/2.7/share'

即使这个安装方式是采用sudo,且在管理员身份下安装,仍然提示该错误。解决办法是执行如下安装:

pip install --upgrade pipsudo pip install numpy --upgrade --ignore-installedsudo pip install scipy --upgrade --ignore-installedsudo pip install scikit-learn --upgrade --ignore-installed

然后再重新执行sudo pip install pyspark,安装正确。

字符编码的坑

在提示信息以及最后分析的结果中都包含了中文。运行代码时,会提示如下错误信息:

SyntaxError: Non-ASCII character '\xe5' in file /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py on line 36, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details

需要在代码文件的首行添加如下编码声明:

# This Python file uses the following encoding: utf-8

SparkConf的坑

初始化SparkContext的代码如下所示:

conf = SparkConf().setMaster("local[*]")
conf = conf.setAppName(APP_NAME)
sc = SparkContext(conf)

结果报告运行错误:

Error initializing SparkContext.
org.apache.spark.SparkException: Could not parse Master URL: '<pyspark.conf.SparkConf object at 0x106666390>'

根据错误提示,以为是Master的设置有问题,实际上是实例化SparkContext有问题。阅读代码,发现它的构造函数声明如下所示:

    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,gateway=None, jsc=None, profiler_cls=BasicProfiler):

而前面的代码仅仅是简单的将conf传递给SparkContext构造函数,这就会导致Spark会将conf看做是master参数的值,即默认为第一个参数。所以这里要带名参数:

sc = SparkContext(conf = conf)

sys.argv的坑

我需要在使用spark-submit命令执行python脚本文件时,传入我需要分析的文件路径。与scala和java不同。scala的main函数参数argv实际上可以接受命令行传来的参数。python不能这样,只能使用sys模块来接收命令行参数,即sys.argv

argv是一个list类型,当我们通过sys.argv获取传递进来的参数值时,一定要明白它会默认将spark-submit后要执行的python脚本文件路径作为第一个参数,而之后的参数则放在第二个。例如命令如下:

./bin/spark-submit /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py "files"

则:

  • argv[0]: /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py
  • argv[1]: files

因此,我需要获得files文件夹名,就应该通过argv[1]来获得。

此外,由于argv是一个list,没有size属性,而应该通过len()方法来获得它的长度,且期待的长度为2。

整数参与除法的坑

在python 2.7中,如果直接对整数执行除法,结果为去掉小数。因此4 / 5得到的结果却是0。在python 3中,这种运算会自动转型为浮点型。

要解决这个问题,最简单的办法是导入一个现成的模块:

from __future__ import division

注意:这个import的声明应该放在所有import声明前面。

Scala版本

代码

package bigdata.demoimport java.io.File
import java.text.SimpleDateFormat
import java.util.Calendarimport com.google.common.io.{Files => GoogleFiles}
import org.apache.commons.io.Charsets
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Main {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Binary Files").setMaster("local[*]")val sc = new SparkContext(conf)if (args.size != 1) {println("请输入正确的文件或目录路径")return}def analyseFileContent(filePath: String): RDD[String] = {val data = sc.binaryRecords(filePath, 1)val records = data.flatMap(x => x.flatMap(x => toBinaryStr(byteToShort(x)).toCharArray))val mappedWithKey = records.map(i => if (i == '0') ('0', 1L) else ('1', 1L))val result = mappedWithKey.reduceByKey(_ + _)val sum = result.map(_._2).sum()result.map { case (key, count) => formatOutput(key, count, sum)}}val path = args.headval filePaths = fetchFiles(path)filePaths.par.foreach { filePath =>val outputs = analyseFileContent(filePath)printOutputs(outputs)saveOutputs(filePath, outputs)}}private def byteToShort(b: Byte): Short =if (b < 0) (b + 256).toShort else b.toShortprivate def toBinaryStr(i: Short, digits: Int = 8): String =String.format("%" + digits + "s", i.toBinaryString).replace(' ', '0')private def printOutputs(outputs: RDD[String]): Unit = {outputs.foreach(println)}private def saveOutputs(filePath: String, outputs: RDD[String]): Unit = {val resultDir = new File("result")if (!resultDir.exists()) resultDir.mkdir()val resultFile = new File("result/" + getFileNameWithExtension(filePath) + ".output")outputs.foreach(line => GoogleFiles.append(line + "\n", resultFile, Charsets.UTF_8))GoogleFiles.append(s"统计于:${formatLoggingTime()}\n\n", resultFile, Charsets.UTF_8)}private def formatLoggingTime(): String = {val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")formatter.format(Calendar.getInstance().getTime)}private def getFileNameWithExtension(filePath: String): String = {filePath.substring(filePath.lastIndexOf("/") + 1)}private def fetchFiles(path: String): List[String] = {val fileOrDirectory = new File(path)fileOrDirectory.isFile match {case true => List(path)case false => fileOrDirectory.listFiles().filter(_.isFile).map(_.getPath).toList}}private def formatPercent(number: Double): String = {val percent = "%1.2f" format number * 100s"${percent}%"}private def formatOutput(key: Char, count: Long, sum: Double): String = {s"字符${key}的数量为${count}, 占比为${formatPercent(count/sum)}"}
}

运行

通过sbt对代码进行编译、打包后,生成jar文件。然后在spark主目录下运行:

$SPARK_HOME/bin/spark-submit --class bigdata.demo.Main --master spark://<ip>  $SPARK_HOME/jars/binaryfilesstastistics_2.11-1.0.jar file:///share/spark-2.2.0-bin-hadoop2.7/derby.log

最后的参数"file:///share/spark-2.2.0-bin-hadoop2.7/derby.log"就是main函数接收的参数,即要分析的文件目录。如果为本地目录,需要指定文件协议file://,如果为HDFS目录,则指定协议hdfs://

遇到的坑

byte类型的值

在Scala中,Byte类型为8位有符号补码整数。数值区间为 -128 到 127。倘若二进制值为11111111,通过SparkContext的binaryRecords()方法读进Byte数据后,其值为-1,而非255。原因就是补码的缘故。如果十进制为128,转换为Byte类型后,值为-128。

而对于-1,如果执行toBinaryString(),则得到的字符串为"11111111111111111111111111111111",而非我们期待的"11111111"。如下图所示:

执行结果

针对八位的二进制数值,可以编写一个方法,将Byte类型转为Short类型,然后再调用toBinaryString()方法转换为对应的二进制字符串。

  private def byteToShort(b: Byte): Short =if (b < 0) (b + 256).toShort else b.toShort

而对于不足八位的二进制数值,如果直接调用toBinaryString()方法,则二进制字符串将不到八位。可以利用String的format进行格式化:

  private def toBinaryStr(i: Short, digits: Int = 8): String =String.format("%" + digits + "s", i.toBinaryString).replace(' ', '0')

当然,可以将这两个方法定义为Byte与Short的隐式方法。

使用Spark读取并分析二进制文件相关推荐

  1. spark 源码分析 Blockmanager

    原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memo ...

  2. Spark SQL 源代码分析系列

    从决定写Spark SQL文章的源代码分析,到现在一个月的时间,一个又一个几乎相同的结束很快,在这里也做了一个综合指数,方便阅读,下面是读取顺序 :) 第一章 Spark SQL源代码分析之核心流程 ...

  3. spark 源码分析之十八 -- Spark存储体系剖析

    本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...

  4. Spark源码分析之Sort-Based Shuffle读写流程

    一 概述 我们知道Spark Shuffle机制总共有三种: # 未优化的Hash Shuffle:每一个ShuffleMapTask都会为每一个ReducerTask创建一个单独的文件,总的文件数是 ...

  5. mongodb数据导入hbase,spark读取hbase数据分析

    为什么80%的码农都做不了架构师?>>>    使用mavn管理相关依赖包pom.xml <project xmlns="http://maven.apache.or ...

  6. Hadoop 和 spark 读取多个文件通配符规则(正则表达式)joe

    最近在公司需要计算手机信令数据 但是每次spark读取文件的时候都是把当天24小时从头到尾读取一遍 非常耗时,在一步操作中处理批量文件,这个要求很常见.举例来说,处理日志的MapReduce作业可能会 ...

  7. 【大数据基础】基于信用卡逾期数据的Spark数据处理与分析

    https://dblab.xmu.edu.cn/blog/2707/ 实验过程 数据预处理 本次实验数据集来自和鲸社区的信用卡评分模型构建数据,以数据集cs-training.csv为分析主体,其中 ...

  8. Spark学生上网分析(清洗,上网情况分析,用网情况分析)

    一. 1.需求: (1).数据清洗     1)读取sexDictFile.csv文件,把读出的数据封装成性别Map     2)spark读取netClean.csv文件,写一个过滤脏数据的方法,用 ...

  9. Spark 源码分析之ShuffleMapTask内存数据Spill和合并

    Spark 源码分析之ShuffleMapTask内存数据Spill和合并 更多资源分享 SPARK 源码分析技术分享(视频汇总套装视频): https://www.bilibili.com/vide ...

最新文章

  1. 2022-2028年中国异戊二烯橡胶产业竞争现状及发展规模预测报告
  2. FireDAC 报系统找不到指定的文件错误,解决之道。
  3. rcp errata
  4. Linux学习 LVM ***
  5. 【NOI2019】弹跳【二维线段树】【dijkstra】
  6. linux初学文档,51CTO博客-专业IT技术博客创作平台-技术成就梦想
  7. php mysql 分行执行,php执行mysql存储及执行脚本
  8. 2008 微软IT英雄人物获奖感言
  9. 抛开复杂的架构设计,MySQL优化思想基本都在这了
  10. TF400511: Your team has not defined any iterations to use as sprints
  11. 远程计算机桌面图标不见了怎么办,远程桌面图标不见了怎么办
  12. 高一计算机函数公式,高一函数公式汇总
  13. 学习制作FlappyBird时遇到的问题
  14. javaOOP -- 笔记
  15. 形容词,名词记忆(三):ment, ent后缀常用词
  16. 工程训练(第一章 关于劳动 )-江苏海洋大学-mooc 答案
  17. 基于STM32CubeMX的stm32f103c6t6液晶0.96OLED显示字母数字汉字图片显示
  18. 考华为云认证的必要条件、注意事项
  19. !!!RFID原理及应用期末复习总结!!!少走弯路,直接满绩!
  20. NCCL源码解析②:Bootstrap网络连接的建立

热门文章

  1. 照片批量重命名为拍摄日期
  2. 红蓝对抗之Windows内网渗透
  3. linux amd显卡使用情况查看
  4. csdn博客markdown编辑器下修改图片大小及文字颜色
  5. java农业银行面试题
  6. 转行IT行业怎么学习比较好
  7. [转]60个英文阅读网站强力推荐
  8. 学生表/教师表/课程表/成绩表常见SQL查询
  9. 基于Python AutoCAD ActiveX 二次开发,pyautocad应用技术
  10. go pear.php 下载,请注意!有人攻破了PEAR网站并篡改了go-pear.phar安装包