1、 背景

由于MRv1在扩展性、可靠性、资源利用率和多框架等方面存在明显的不足,在Hadoop MRv2中引入了资源管理和调度系统YARN。YARN是 Hadoop MRv2计算机框架中构建的一个独立的、通用的资源管理系统可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率资源统一管理和数据共享等方面带来了巨大好处。主要体现在以下几个方面:

(1)资源利用率大大提高。一种计算框架一个集群,往往会由于应用程序数量和资源需求的不均衡性,使得在某段时间有些计算框架集群资源紧张,而另外一些集群资源空闲。共享集群模式则通过多种框架共享资源,使得集群中的资源得到更加充分的利用;

(2)运维成本大大降低。共享集群模式使得少数管理员就可以完成多个框架的统一管理;

(3)共享集群的模式也让多种框架共享数据和硬件资源更为方便。

2、 产品介绍

巨杉数据库SequoiaDB是一款分布式非关系型文档数据库,可以被用来存取海量非关系型的数据,其底层主要基于分布式,高可用,高性能与动态数据类型设计,它兼顾了关系型数据库中众多的优秀设计:如索引、动态查询和更新等,同时以文档记录为基础更好地处理了动态灵活的数据类型。并且为了用户能够使用常见的分布式计算框架,SequoiaDB可以和常见分布式计算框架如Spark、Hadoop、HBase进行整合。本文主要讲解SequoiaDB与Spark、YARN的整合以及通过一个案例来演示MapReduce分析存储在SequoiaDB中的业务数据。

3、 环境搭建

3.1、 服务器分布

服务器

服务名称

192.168.1.46

192.168.1.47

192.168.1.48

NameNode、DataNode、

3.2、 软件配置

操作系统:RedHat6.5

JDK版本:1.7.0_80 64位

Scala版本:

Hadoop版本:2.7.2

Spark版本:2.0

SequoiaDB版本:2.0

3.3、 安装步骤

1、JDK安装

tar -xvf jdk-7u45-linux-x64.tar.gz –C /usr/localcd /usr/local ln -s jdk1.7.0_45 jdk

配置环境变量

vim ~/.bash_profile export JAVA_HOME=/usr/local/jdk export CLASS_PATH=$JAVA_HOME/lib:$JAVA_HOME/jre/lib export PATH=$PATH:$JAVA_HOME/binsource /etc/profile 

2、Scala安装

tar -xvf scala-2.11.8.tgz –C /usr/localcd /usr/local ln -s scala-2.11.8 scala

配置环境变量

vim ~/.bash_profile export SCALA_HOME=/usr/local/scala export PATH=$PATH:$SCALA_HOME/bin 

3、修改主机hosts文件配置

在每台主机上修改host文件

vim /etc/hosts192.168.1.46 node01192.168.1.47 node02192.168.1.48  master

4、 SSH免密钥登录

在master节点中执行ssh-keygen按回车键

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys 

将master节点中的授权文件authorized_keys传输到slave节点中

scp ~/.ssh/id_rsa.pub root@master:~/.ssh/

在slave节点中执行

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

在slave节点中验证SSH免密钥登录

ssh master

5、Hadoop集群安装

拷贝hadoop文件hadoop-2.7.2.tar.gz到/opt目录中

解压hadoop安装包

tar –xvf hadoop-2.7.2.tar.gzmv hadoop-2.7.2 /opt/cloud/hadoop

创建hadoop数据存储及临时目录

mkdir –p /opt/hadoop/datamkdir –p /opt/hadoop/tmp

配置Hadoop jdk环境变量

vim hadoop-env.shexport JAVA_HOME=/usr/local/jdk

编辑core.xml文件

<configuration><property><name>fs.defaultFS</name><value>hdfs://master:9000</value></property><property><name>hadoop.tmp.dir</name><value>/opt/data/tmp</value></property><property><name>io.file.buffer.size</name><value>4096</value></property></configuration>

编辑mapred-site.xml

<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>mapreduce.jobtracker.http.address</name><value> master:50030</value></property><property><name>mapreduce.jobhistory.address</name><value> master:10020</value></property><property><name>mapreduce.jobhistory.webapp.address</name><value>master:19888</value></property></configuration>

编辑hdfs-site.xml

<configuration><property><name>dfs.nameservices</name><value>master</value></property><property><name>dfs.namenode.secondary.http-address</name><value> master:50090</value></property><property><name>dfs.namenode.name.dir</name><value>file:///opt/hadoop/data/name</value></property><property><name>dfs.datanode.data.dir</name><value>file:///opt/hadoop/data</value></property><property><name>dfs.replication</name><value>3</value></property><property><name>dfs.webhdfs.enabled</name><value>true</value></property></configuration>

编辑yarn-site.xml

<configuration><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><property><name>yarn.resourcemanager.address</name><value> master:8032</value></property><property><name>yarn.resourcemanager.scheduler.address</name><value> master:8030</value></property><property><name>yarn.resourcemanager.resource-tracker.address</name><value> master:8031</value></property><property><name>yarn.resourcemanager.admin.address</name><value> master:8033</value></property><property><name>yarn.resourcemanager.webapp.address</name><value>master:8088</value></property><property><name>yarn.nodemanager.resource.memory-mb</name><value>12288</value></property><property><name>yarn.nodemanager.log-dirs</name><value>/opt/hadoop/tmp/userlogs</value></property></configuration>

启动Hadoop

首次启动集群时,做如下操作

进入到/opt/cloud/hadoop/bin目录中执行./hdfs namenode –format格式化

hdfs文件系统

进入到/opt/cloud/hadoop/sbin目录中执行./start-all.sh启动hadoop集群

6、安装Spark集群

拷贝Spark安装包到/opt目录中,解压

tar –xvf spark-2.0.0-bin-hadoop2.7.tgzmv spark-2.0.0-bin-hadoop2.7 /opt/cloud/spark

编辑spark-env.sh

vim spark-env.shJAVA_HOME="/usr/jdk1.7"SPARK_DRIVER_MEMORY="1g"SPARK_EXECUTOR_CORES=1SPARK_EXECUTOR_MEMORY="512m"SPARK_MASTER_PORT="7077"SPARK_MASTER_WEBUI_PORT="8070"SPARK_CLASSPATH="/opt/cloud/spark/jars/sequoiadb.jar:/opt/cloud/spark/jars/spark-sequoiadb_2.11-2.6.0.jar"SPARK_MASTER_IP="node03"SPARK_WORKER_MEMORY="712m"SPARK_WORKER_CORES=1SPARK_WORKER_INSTANCES=1SPARK_WORKER_DIR="/opt/data/spark/work"SPARK_LOCAL_DIRS="/opt/data/spark/tmp"HADOOP_HOME="/opt/cloud/hadoop"HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

编辑 slaves

node02node03

启动spark集群

进入到目录/opt/cloud/spark/sbin目录中

./start-all.sh

Spark成功启动后截图如下:

7、Spark Yarn连接SequoiaDB

在SequoiaDB中创建集合空间、集合

db.createCS('poc');db.poc.createCL('test');

进入到spark安装目录bin中,执行./spark-sql –master yarn启动spark sql交互界面

创建表,映射到上述poc集合空间中test集合

CREATE TABLE `test` (`id` INT, `name` STRING)USING com.sequoiadb.sparkOPTIONS (`collection` 'test',`host` 'node02:11810,node03:11810',`serialization.format` '1',`collectionspace` 'poc');

查询表test数据,执行:

Select * from test;

进入到yarn管理页面查看spark任务

5、 案例演示

为了配合司法部门的执法和银行内部的风险监管,部分商业银行对于存取款业务定制了相关预警方案,本案例以个人存取款业务高频交易来讲述MapReduce如何分析SequoiaDB中的个人交易明细数据。

具体场景为:分析同一实体柜员办理,1小时内同一账户连续3笔以上支取类金额的交易账户及明细。

本演示案例采用Hadoop Map Reduce实现,开发语言为Java语言。整个测试程序分为两个部分Map算法和Reduce算法。演示程序中Map算法负责将同一个账号的所有对应交易明细归并在一起并输出给Reduce端,Reduce端根据Map算法的结果运算具体的业务场景,最后将运算结果写入到SequoiaDB中。

具体架构如下:

Reduce端具体算法流程如下:

Map端算法代码如下:

static class TMapper extends Mapper<Object, BSONWritable,Text,BSONWritable>{@Overrideprotected void map(Object key, BSONWritable value, Context context)throws IOException, InterruptedException {BSONObject obj = value.getBson();String acct_no=(String) obj.get("ACCT_NO");context.write(new Text(acct_no), value);}}

Reduce端算法代码如下:

static class TReducer extends Reducer<Text,BSONWritable,NullWritable,NullWritable>{private static String pattern = "yyyy-MM-dd HH:mm:ss";private DateFormat df = new SimpleDateFormat(pattern);private static int tradeNum1 = 3;private static int tradeTime1 = 3600;private static int tradeNum2 = 2;private static int tradeTime2 = 1800;private static int tradeAll = 100000;private Sequoiadb sdb = null;private CollectionSpace cs = null;private DBCollection cl_1 = null;private DBCollection cl_2 = null;private static String CS_NAME="";private static String CL_NAME_1="";private static String CL_NAME_2="";public TReducer(){if (null == sdb) {sdb = ConnectionPool.getInstance().getConnection();}if (sdb.isCollectionSpaceExist(CS_NAME)) {cs = sdb.getCollectionSpace(CS_NAME);} else {throw new BaseException("集合空间" + CS_NAME + "不存在!");}if (null == cs) {throw new BaseException("集合空间不能为null!");} else {this.cl_1 = cs.getCollection(CL_NAME_1);}if (null == cs) {throw new BaseException("集合空间不能为null!");} else {this.cl_2 = cs.getCollection(CL_NAME_2);}}@Overrideprotected void reduce(Text key, Iterable<BSONWritable> values,Context context)throws IOException, InterruptedException{Iterator<BSONWritable> iterator=values.iterator();long sum=0;List<BSONWritable> oldList = new ArrayList<BSONWritable>();while(iterator.hasNext()){BSONWritable bsonWritable = iterator.next();oldList.add(bsonWritable);}//对values进行排序,排序字段为TRN_TIME(交易时间)Collections.sort(oldList, new Comparator<BSONWritable>() {@Overridepublic int compare(BSONWritable o1, BSONWritable o2) {String trn_time1 = (String)o1.getBson().get("TRN_TIME");String trn_time2 = (String)o2.getBson().get("TRN_TIME");return trn_time2.compareTo(trn_time1);}});Map<String,BSONWritable> result = new HashMap<String,BSONWritable>();if(oldList != null && oldList.size() > 0){//记录同一账户满足条件的笔数Map<String,BSONWritable> tempMap = new HashMap<String,BSONWritable>();for(int i=0;i<oldList.size()-1;i++){BSONWritable bSONWritable1 = oldList.get(i);//交易代码String trn_cd = (String)bSONWritable1.getBson().get("TRN_CD");if(trn_cd.equals("000045") || trn_cd.equals("001045") || trn_cd.equals("021031") || trn_cd.equals("020031") || trn_cd.equals("001060") || trn_cd.equals("000060")){//交易柜员String tran_teller_no1 = (String)bSONWritable1.getBson().get("TRAN_TELLER_NO");//流水号String jrnl_no = (String)bSONWritable1.getBson().get("JRNL_NO");//交易日期String trn_date1 = (String)bSONWritable1.getBson().get("TRN_DATE");//交易时间String trn_time1 = (String)bSONWritable1.getBson().get("TRN_TIME");Date bigDate = null;try {bigDate = df.parse(trn_date1+" "+trn_time1);} catch (ParseException e) {e.printStackTrace();}tempMap.put(jrnl_no,bSONWritable1);for(int j=i+1;j<oldList.size();j++){BSONWritable bSONWritable2 = oldList.get(j);//交易代码String trn_cd1 = (String)bSONWritable2.getBson().get("TRN_CD");if(trn_cd1.equals("000045") || trn_cd1.equals("001045") || trn_cd1.equals("021031") || trn_cd1.equals("020031") || trn_cd1.equals("001060") || trn_cd1.equals("000060")){//交易柜员String tran_teller_no2 = (String)bSONWritable2.getBson().get("TRAN_TELLER_NO");//流水号String jrnl_no2 = (String)bSONWritable2.getBson().get("JRNL_NO");//交易日期String trn_date2 = (String)bSONWritable2.getBson().get("TRN_DATE");//交易时间String trn_time2 = (String)bSONWritable2.getBson().get("TRN_TIME");Date smallDate = null;try {smallDate = df.parse(trn_date1+" "+trn_time1);} catch (ParseException e) {e.printStackTrace();}//判断是否是同一实体{交易柜员}办理if(!tran_teller_no1.equals(tran_teller_no2)){continue;}//判断{交易日期}{交易时间}是否是[1小时]内if((bigDate.getTime()-smallDate.getTime())/1000 > tradeTime1){break;}tempMap.put(jrnl_no2,bSONWritable2);}else{ //end if TRN_CD1.equals("000045")continue;}}//end forif(tempMap.size() >= tradeNum1){result.putAll(tempMap);tempMap.clear();}}else{continue;}//end if ||}//end for}Map<String,BSONWritable> result2 = new HashMap<String,BSONWritable>();List<BSONObject> cl_1_list = new ArrayList<BSONObject>();//结果写入sdbIterator iter1 = result.keySet().iterator();while(iter1.hasNext()){String keyValue = (String)iter1.next();BSONWritable resultValue = result.get(keyValue);cl_1_list.add(resultValue.getBson());cl_1.insert(resultValue.getBson());}cl_1.bulkInsert(cl_1_list, DBCollection.FLG_INSERT_CONTONDUP);cl_1_list = null;List<BSONObject> cl_2_list = new ArrayList<BSONObject>();context.write(null,null);}}

SequoiaDB巨杉数据库2.6最新版下载
SequoiaDB巨杉数据库技术博客
SequoiaDB巨杉数据库社区

转载于:https://my.oschina.net/u/2444165/blog/800856

SequoiaDB Spark Yarn部署及案例演示相关推荐

  1. 【原创 HadoopSpark 动手实践 6】Spark 编程实例与案例演示

     [原创 Hadoop&Spark 动手实践 6]Spark 编程实例与案例演示 Spark 编程实例和简易电影分析系统的编写 目标: 1. 掌握理论:了解Spark编程的理论基础 2. 搭建 ...

  2. Spark的Yarn模式及其案例

    目录 基本概念 Yarn模式搭建 1. 解压缩文件 2.修改配置文件 启动集群 测试Spark中examples案例 1. 提交应用 2.Web 页面查看日志 配置历史服务器 1.具体步骤 2.重新提 ...

  3. Spark 安装部署(Local+Standalone+Yarn等)

    概述 Spark作为一个数据处理框架和计算引擎,被设计在所有常见的集群环境中运行, 在国内工作中主流的环境为Yarn,不过逐渐容器式环境也慢慢流行起来.接下来,我们就分别看看不同环境下Spark的运行 ...

  4. spark on yarn 部署问题

    spark on yarn 部署报:java.io.IOException: Resource file:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/spark- ...

  5. DSS部署-11、Spark on Yarn部署

    文章目录 第七部分 Spark on Yarn部署 相关配置 操作记录如下 spark-sql -e "show databases" 第七部分 Spark on Yarn部署 相 ...

  6. 关于Spark的部署yarn模式

    一.Yarn Client的工作流程图 Spark Yarn Client向Yarn的ResourceManager申请启动ApplicationMaster.同时在SparkContext初始化将D ...

  7. 【学会了学会了】Flink官网翻译——Standalone以及YARN部署

    Standalone 开始(Getting Started) This Getting Started section guides you through the local setup (on o ...

  8. 用两个使用Caffe的小项目案例演示迁移学习的实用性

    近年来随着深度学习的急剧升温,不管是学术界还是工业界都把大量资源投入了深度学习.作为一个普通的工程师或者程序员,也想对机器学习,尤其是深度学习有所了解,应当如何入手?最好的回答当然是"get ...

  9. Spark 独立部署模式

    2019独角兽企业重金招聘Python工程师标准>>> Spark 独立部署模式 博客分类: spark 除了在 Mesos 或 YARN 集群上运行之外, Spark 还提供一个简 ...

最新文章

  1. python标准化输出_Python设定模板及标准化输出,输入
  2. java大公司后端多线程面试题最强分享
  3. Http请求报头设置(C#)
  4. php 快速路由,基于FastRoute的快速路由(支持RESTful)
  5. Android 之神 Jake Wharton 从 Square 离职!
  6. 谷歌浏览器的一个新特点—关于获取iframe的parent对象
  7. Altium Designer 20相关操作及使用技巧
  8. 两个小故事告诉你静下来的力量
  9. Google正确搜索方法
  10. 免费开源网站系统有哪些
  11. 法国老医生和他“驱动”的摄像头们
  12. 清华姚班陈丹琦获斯隆奖!博士毕业论文是近十年最热之一
  13. 三分钟集成 TapTap 防沉迷 SDK(Unity 版)
  14. (ICML-2020)通过超球面的对齐和均匀性理解对比表示学习(一)
  15. 高中计算机专业满分多少,高中各科满分是多少
  16. 文言文编程火了,可我完全学不懂
  17. 爱情公寓原版片头AE模板
  18. 看门狗喂狗实验(有问题)
  19. Visual Basic 6.0 中文企业版
  20. 海天讲座(四)最优传输理论

热门文章

  1. Web版RSS阅读器(三)——解析在线Rss订阅
  2. 数据结构与算法(3)栈
  3. reportviewer控件mysql_真正的全动态报表:ReportViewer+RDLC
  4. FUSION / LDV LIDAR分析和可视化软件如何使用?| Fusion如何使用?使用教程及案例
  5. FPGA:IIC验证镁光EEPROM仿真模型(纯Verilog)
  6. 冤大头?NBA球星库里花了 116 万买了一只猴头像,到底咋回事?
  7. Unity Shader学习记录(15) —— Unity的光源类型
  8. 字符串中的十六进制字符如何转换成十六进制数
  9. java.lang.classcastexception怎么解决_java.lang.ClassCastException怎么解决
  10. STL vector成员函数详解