spark mongo java_java及spark2.X连接mongodb3.X单机或集群的方法(带认证及不带认证)...
首先,我们明确的是访问Mongos和访问单机Mongod并没有什么区别。接下来的方法都是既可以访问mongod又可以访问Mongos的。
另外,读作java写作scala,反正大家都看得懂......大概?
1、不带认证集群的连接方法(JAVAscala):
首先是创建连接的方法,我们先声明一个client,然后指定访问的DB和collection:
private lazy val mongo = new MongoClient("192.168.2.51", 27017)private lazy val db = mongo.getDatabase("test")private lazy val dbColl = db.getCollection("origin2")
然后我们读取数据:
import com.mongodb.client.model.Filters.{eq =>eqq}
val docs= dbColl.find(eqq("basiclabel.procedure", "second")).iterator()
额。。上面那段代码是带filter过滤的读取数据。首先Import com.mongodb.client.model.Filters.eq并把eq重命名为eqq,然后通过dbColl.find(Bson)方法读取指定数据。剩下的就是正常的迭代器的使用方法了,docs获取出来的数据是Iterator[Document]。
然后我们更新数据:
dbColl.updateOne(eqq("_id", x.get("_id")), set("segdata", fenduan(str, name)))
上面这段代码是说找到_id对应的数据,并将其中一个字段set为一个新的值,这个值可以为Document,String,Int,List等一系列数据结构。我这里fenduan方法返回的是一个Document,做了一层嵌套。
至于插入数据更为简单:
dbColl.insertOne(doc)
2、不带认证的spark读取方法(scala,理直气壮)
两种方式,其一是在创建sparksession的时候(SparkContext可以使用第二种方法,醒醒兄弟,2017年了),直接指定"spark.mongodb.input.uri"。然后使用正常的MongoSpark来读取数据。(pipeline里面是过滤条件,愿意尝试的各位可以自己试试filter下的其他方法)。使用rdd是因为rdd更适合进行map和flatmap等一系列精细的转换操作,如果只需要读数据,可以使用MongoSpark.read(spark)方法,直接获取DataFrameReader。
val spark =SparkSession.builder()
.master("spark://192.168.2.51:7077")
.config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar","hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar","hdfs://192.168.2.51:9000/segwithorigin2.jar")))
.config("spark.cores.max", 80)
.config("spark.executor.cores", 16)
.config("spark.executor.memory", "32g")
.config("spark.mongodb.input.uri", "mongodb://192.168.2.51:27017/test.origin2")//.config("spark.mongodb.output.uri", "mongodb://192.168.12.161:27017/test.origin2")
.getOrCreate()
val rdd = MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()
第二种方式也较为简单,创建一个ReadConfig,这个是connector提供的一个单例类,可以设置很多参数,例如(此时不必指定"spark.mongodb.input.uri"),如下所示是通过sparkcontext和通过sparksession两种方式读取数据的方法:
val readConfig =ReadConfig(Map("uri" -> "mongodb://192.168.2.48:27017/","database" -> "test","collection" -> "test"))
val r2=MongoSpark.load(spark, readConfig).rdd//val r2 = MongoSpark.load(spark.sparkContext, readConfig)
3、带认证的Java读取方法:
带认证的需要先创建一个MongoURI,在URI里把用户名,密码和认证库都指定清楚。这种方法通用性比较强,因为spark也这么用,如果使用其他方式认证要么是必须使用库等于认证库,要么是没有通用性。这种方法可以在admin认证然后去读test的数据,就很好。
//带认证的需要先创建一个MongoURI,在URI里把用户名,密码和认证库都指定清楚,至于为什么需要指定库建议看上一篇博客
val mongoURI = new MongoClientURI("mongodb://gaoze:gaolaoban@192.168.2.48:27017/?authSource=admin")//val mongoURI = new MongoClientURI("mongodb://192.168.2.48:27017/");
lazy val mongo = newMongoClient(mongoURI)private lazy val db = mongo.getDatabase("test")private lazy val dbColl = db.getCollection("test")
//然后和1一样
4、带认证的Spark读取方法:
同3一样,在URI里加入用户名密码和库就行了:
val spark =SparkSession.builder()
.master("spark://192.168.2.51:7077")
.config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar","hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar","hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar","hdfs://192.168.2.51:9000/segwithorigin2.jar")))
.config("spark.cores.max", 80)
.config("spark.executor.cores", 16)
.config("spark.executor.memory", "32g")
//这里这个配置项指定了用户名gaoze,密码gaolaoban,认证库admin
.config("spark.mongodb.input.uri", "mongodb://gaoze:gaolaoban@192.168.2.51:27017/test.origin2?authSource=admin").getOrCreate()
val rdd= MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()
或者:
//这里指定了用户名rw,密码1,认证库test
val readConfig =ReadConfig(Map("uri" -> "mongodb://rw:1@192.168.2.48:27017/?authSource=test","database" -> "test","collection" -> "test"))
val rdd = MongoSpark.builder().sparkSession(spark).readConfig(readConfig).build().toRDD()
//val r2 = MongoSpark.load(spark.sparkContext, readConfig)
spark mongo java_java及spark2.X连接mongodb3.X单机或集群的方法(带认证及不带认证)...相关推荐
- Phoenix连接安全模式下的HBase集群
Phoenix连接安全模式下的HBase集群 HBase集群开启安全模式(即启用kerberos认证)之后,用户无论是用HBase shell还是Phoenix去连接HBase都先需要通过kerber ...
- 本地连接服务器搭建的 Redis 集群
本地连接服务器搭建的 Redis 集群 在实际运行测试中,存在两个问题 安全组或防火墙开放端口 主要开放+10000端口. 如果要连接 Redis集群的应用服务不和 Redis集群在一个局域网下,会出 ...
- (七)使用jedis连接单机和集群(一步一个坑踩出来的辛酸泪)
环境准备: redis-4.0.9,最新版了 ruby:redis-x.x.x.gem 这个gem什么版本都行,我redis4用3.0.0的gem正常跑 jedis-2.9.0.jar,最新版 ...
- Mongodb3.4.4复制集群+分片配置文档
2019独角兽企业重金招聘Python工程师标准>>> 1. 复制 1.1. 复制简介 MongoDB复制是将数据同步在多个服务器的过程. 复制提供了数据的冗余备份,并在多个服务 ...
- Spark学习之第一个程序打包、提交任务到集群
1.免秘钥登录配置: ssh-keygen cd .ssh touch authorized_keys cat id_rsa.pub > authorized_keys chmod 600 au ...
- 使用Druid,C3P0连接池连接达梦主备集群
使用Druid,C3P0连接达梦数据库主备集群 导入连接池对应的驱动包,达梦的JDBC驱动包进行连接 连接池信息: Druid连接池版本:1.1.22 C3P0连接池版本:0.9.1 链接:https ...
- 启动spark集群的方法
- 在阿里云Serverless K8S集群上部署Spark任务并连接OSS(详细步骤)
在阿里云ASK集群上部署Spark任务并连接OSS 简介 ASK是阿里云的一个产品,属于Serverless Kubernetes 集群,这次实验是要在ASK集群上运行Spark计算任务(以WordC ...
- spark集群访问mysql_spark连接数据源以及在spark集群上跑
1.spark连接mysql import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession ...
最新文章
- sql算术运算符_SQL运算符教程–按位,比较,算术和逻辑运算符查询示例
- 十八、二叉树遍历序列还原
- [设计模式] ------ 策略模式实战:java中替代if-else的大段逻辑
- c++ winpcap开发(1)
- LeetCode-116. 填充每个节点的下一个右侧节点指针
- lisp 提取字符串中的數字_Redis 数据结构之字符串的那些骚操作
- python设置一个初始为0的计数器_如何为python列表的每个元素实现一个计数器?...
- 为何大数据分析那么重要
- 偏光太阳镜测试图片软件,[专题]真假偏光太阳镜简单、实用辨别方法!
- 使用 snapseed p 图,图片局部黑白,简单实用!!
- Android studio下的DNK开发JNI详解流程
- 2020年裸辞的人,真的待业了一整年吗?
- PyTorch搭建CNN-LSTM混合模型实现多变量多步长时间序列预测(负荷预测)
- java修改mariadb数据_MariaDB更新数据
- html怎么把字做成动画效果,用纯CSS实现文字的动态效果
- Hive之——Hive分区(静态分区+动态分区)
- 【建议收藏】功率电子器件界面热阻和接触热阻是如何测量的?
- Webpack基础应用篇 -[9]管理资源(下)
- python透明色的代码_Matplotlib中的透明标记面颜色
- mysql英文怎么发音_mysql5.5安装教程:mysql下载与安装
热门文章
- 画直线_在鸡的面前画直线,鸡为什么会晕呢,西瓜视频带你揭秘
- 拉格朗日乘子法 KKT条件
- Boost Asio总结(16)例子
- 跨链(6)波卡XCMP跨链通信协议
- C++ Primer 5th笔记(10)chapter10 泛型算法 :write
- 2022年美国大学生数学建模竞赛O奖流程图
- 哈希表(散列表)知识点概述
- [HOW TO]-VirtualBox的虚拟机通过宿主机代理上网
- TEEC_AllocateSharedMemory()和 TEEC_RegisterSharedMemory()的总结
- zookeeper for mac安装