使用scala将数据写入linux上的MongoDB数据库
这篇主要记录一下数据量较大时使用scala处理数据,运用spark组件连接linux上的MongoDB并创建数据库进行数据写入
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Movie数据集,数据集字段通过分割** 151^ 电影的ID* Rob Roy (1995)^ 电影的名称* In the highlands ....^ 电影的描述* 139 minutes^ 电影的时长* August 26, 1997^ 电影的发行日期* 1995^ 电影的拍摄日期* English ^ 电影的语言* Action|Drama|Romance|War ^ 电影的类型* Liam Neeson|Jessica Lange... 电影的演员* Michael Caton-Jones 电影的导演** tag1|tag2|tag3|.... 电影的Tag**/case class Movie(val mid: Int, val name: String, val descri: String, val timelong: String, val issue: String,val shoot: String, val language: String, val genres: String, val actors: String, val directors: String)/*** Rating数据集,用户对于电影的评分数据集,用,分割** 1, 用户的ID* 31, 电影的ID* 2.5, 用户对于电影的评分* 1260759144 用户对于电影评分的时间*/
case class Rating(val uid: Int, val mid: Int, val score: Double, val timestamp: Int)/*** Tag数据集,用户对于电影的标签数据集,用,分割** 15, 用户的ID* 1955, 电影的ID* dentist, 标签的具体内容* 1193435061 用户对于电影打标签的时间*/
case class Tag(val uid: Int, val mid: Int, val tag: String, val timestamp: Int)/*** MongoDB的连接配置* @param uri MongoDB的连接* @param db MongoDB要操作数据库*/
case class MongoConfig(val uri:String, val db:String)/*** ElasticSearch的连接配置* @param httpHosts Http的主机列表,以,分割* @param transportHosts Transport主机列表, 以,分割* @param index 需要操作的索引* @param clustername ES集群的名称,*/
case class ESConfig(val httpHosts:String, val transportHosts:String, val index:String, val clustername:String)// 数据的主加载服务
object DataLoader {val MOVIE_DATA_PATH = "/Users/jacquelin/IdeaProjects/MovieRecommendSystem/recommender/dataloader/src/main/resources/movies.csv"val RATING_DATA_PATH = "/Users/jacquelin/IdeaProjects/MovieRecommendSystem/recommender/dataloader/src/main/resources/ratings.csv"val TAG_DATA_PATH = "/Users/jacquelin/IdeaProjects/MovieRecommendSystem/recommender/dataloader/src/main/resources/tags.csv"val MONGODB_MOVIE_COLLECTION = "Movie"val MONGODB_RATING_COLLECTION = "Rating"val MONGODB_TAG_COLLECTION = "Tag"val ES_MOVIE_INDEX = "Movie"// 程序的入口def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://172.16.104.13:27017/recommender", //此处要换成自己的URI也即替换127.16.104.13,端口默认27017,recommender是我的数据库"mongo.db" -> "recommender","es.httpHosts" -> "linux:9200", "es.transportHosts" -> "linux:9300","es.index" -> "recommender","es.cluster.name" -> "es-cluster")// 需要创建一个SparkConf配置val sparkConf = new SparkConf().setAppName("DataLoader").setMaster(config.get("spark.cores").get)// 创建一个SparkSessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._// 将Movie、Rating、Tag数据集加载进来val movieRDD = spark.sparkContext.textFile(MOVIE_DATA_PATH)//将MovieRDD装换为DataFrameval movieDF = movieRDD.map(item =>{val attr = item.split("\\^")Movie(attr(0).toInt,attr(1).trim,attr(2).trim,attr(3).trim,attr(4).trim,attr(5).trim,attr(6).trim,attr(7).trim,attr(8).trim,attr(9).trim)}).toDF()val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)//将ratingRDD转换为DataFrameval ratingDF = ratingRDD.map(item => {val attr = item.split(",")Rating(attr(0).toInt,attr(1).toInt,attr(2).toDouble,attr(3).toInt)}).toDF()val tagRDD = spark.sparkContext.textFile(TAG_DATA_PATH)//将tagRDD装换为DataFrameval tagDF = tagRDD.map(item => {val attr = item.split(",")Tag(attr(0).toInt,attr(1).toInt,attr(2).trim,attr(3).toInt)}).toDF()implicit val mongoConfig = MongoConfig(config.get("mongo.uri").get,config.get("mongo.db").get)// 需要将数据保存到MongoDB中storeDataInMongoDB(movieDF, ratingDF, tagDF)// 关闭Sparkspark.stop()}// 将数据保存到MongoDB中的方法def storeDataInMongoDB(movieDF: DataFrame, ratingDF:DataFrame, tagDF:DataFrame)(implicit mongoConfig: MongoConfig): Unit = {//新建一个到MongoDB的连接val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))//如果MongoDB中有对应的数据库,那么应该删除mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).dropCollection()mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).dropCollection()mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).dropCollection()//将当前数据写入到MongoDBmovieDF.write.option("uri",mongoConfig.uri).option("collection",MONGODB_MOVIE_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()ratingDF.write.option("uri",mongoConfig.uri).option("collection",MONGODB_RATING_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()tagDF.write.option("uri",mongoConfig.uri).option("collection",MONGODB_TAG_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()//对数据表建索引mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).createIndex(MongoDBObject("mid" -> 1))mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("mid" -> 1))mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("uid" -> 1))mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("mid" -> 1))//关闭MongoDB的连接mongoClient.close()}}
studio 3t里连接MongoDB数据库能够可视化看到:
使用scala将数据写入linux上的MongoDB数据库相关推荐
- du -sh 如何找到最大的文件夹_小白必看!手把手教你如何在linux上安装redis数据库...
首先我们要清楚redis是什么? redis是一种非关系型数据库,它与MySQL的这种关系型数据库不同,MySQL是将数据存储在磁盘中,而redis是储存在内存中.一般很多公司都是使用MySQL+re ...
- Linux上连接sybase数据库
在Linux上连接sybase数据库 用命令isql isql命令的使用 简单版本 isql -U aaa -P aaa -U 用户名 -P表示密码 这样访问 ...
- 如何在linux上安装sqlite数据库
如何在linux上安装sqlite数据库 一.下载 二.解压 三.配置(configure) 四.编译和安装 五.执行sqlite3程序 六.测试代码 一.下载 首先要先下载sqlite3源码包 链接 ...
- studio 3t连接linux上的MongoDB
一.linux上的步骤 用本地wmware上的linux测试,先在linux上启动MongoDB并测试其功能: 留意这句:db.createUser({user:"admin",p ...
- oracle12541 linux,PLSQL连接Linux上的oracle数据库出现,ORA-12541 TNS 无监听程序
PLSQL连接Linux上的oracle数据库出现,ORA-12541 TNS 无监听程序 外部的PLSQL无法连接Linux上的oracle数据库,出现ORA-12541 TNS 无监听程序错误.待 ...
- Linux上安装MongoDB
MongoDB在Windows上的安装过程整体上来说并不难,网上的资料也比较多,这里我就不介绍了,我主要说下如何在Linux环境下安装MongoDB. 环境: CentOS 7 MongoDB 3.4 ...
- (翻译) MongoDB(10) 在 Red Hat 企业版或者 Centos Linux 上安装MongoDB社区版
概述 使用这个教程在 Red Hat 企业版 Linux 或者 CentOS6/7 Linux 使用 .rpm 软件包安装 MongoDB 社区版. 虽然一些发行版包含自己的 MongoDB 软件包, ...
- DStream输出之使用foreachRDD()将数据写入诸如Mysql的外部数据库中
前言 输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上).与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作 ...
- 从sqlserver中数据写入mysql_从SQL server数据库导入Mysql数据库的体验
起原:网海拾贝 ,因任务需要,要将寄存在sqlserver数据库中的数据全部导入到mysql数据库中,在网上集合关连资料,找到两种体例,而今分袂谈谈对他 起原:网海拾贝 因任务需要,要将寄存在sql ...
最新文章
- python3 Parallel Python
- SpringMVC访问静态资源的三种方式
- 编程之美-程序理解和时间分析整理
- 【知识图谱】如何构建知识体系:知识图谱搭建的第一步
- 使用Oracle UTL_FILE包操作文件
- Oracle JDBC中的PreparedStatement占位符过多
- u盘分为windows和linux启动,【电脑软件】Ventoy 官方版,一个U盘,同时拥有启动win+linux+Ubuntu...
- makefile 基础(转)
- 梯度消失和梯度爆炸_梯度消失梯度爆炸-Gradient Clip
- C常用的字符串函数实现
- OpenGL超级宝典 绘制第一个三角形
- CUDA编程技术汇总
- 卡巴斯基激活码 卡巴斯基2010激活码下载
- 通信网络定级备案怎么做?工信部信息系统定级备案流程介绍
- 10大顶级运营商转型案例剖析
- Leetcode799-香槟塔
- Visual Paradigm 下载安装及使用
- 无法将类中的构造器应用到给定类型
- 公司组织管理与权限管理的设计原则
- 猿编程python分为几个阶段_各个阶段的python学习路线?
热门文章
- 你理解常见如阿里,和友商大数据平台的技术体系差异以及发展趋势和技术瓶颈,在存储和计算两个方面进行概述
- java微信二维码第三方后台登陆实现 ( 一 )
- 个人免费可访问网页制作【GitHub】及其二维码制作(需要有网页源码)——论前端的浪漫
- 浅尝辄止MongoDB:基础
- 【原创】畅言实现单点登录的设计流程和技术细节(1/2)
- hrm项目-day02
- 在2B和2C之间,还有一个2H
- 云仓是如何运行的?如何一件代发的?
- python中dice常见问题_【Pytorch】 Dice系数与Dice Loss损失函数实现
- Premiere Pro mac 22.6版本更新功能