这篇主要记录一下数据量较大时使用scala处理数据,运用spark组件连接linux上的MongoDB并创建数据库进行数据写入

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Movie数据集,数据集字段通过分割** 151^                          电影的ID* Rob Roy (1995)^               电影的名称* In the highlands ....^        电影的描述* 139 minutes^                  电影的时长* August 26, 1997^              电影的发行日期* 1995^                         电影的拍摄日期* English ^                     电影的语言* Action|Drama|Romance|War ^    电影的类型* Liam Neeson|Jessica Lange...  电影的演员* Michael Caton-Jones           电影的导演** tag1|tag2|tag3|....           电影的Tag**/case class Movie(val mid: Int, val name: String, val descri: String, val timelong: String, val issue: String,val shoot: String, val language: String, val genres: String, val actors: String, val directors: String)/*** Rating数据集,用户对于电影的评分数据集,用,分割** 1,           用户的ID* 31,          电影的ID* 2.5,         用户对于电影的评分* 1260759144   用户对于电影评分的时间*/
case class Rating(val uid: Int, val mid: Int, val score: Double, val timestamp: Int)/*** Tag数据集,用户对于电影的标签数据集,用,分割** 15,          用户的ID* 1955,        电影的ID* dentist,     标签的具体内容* 1193435061   用户对于电影打标签的时间*/
case class Tag(val uid: Int, val mid: Int, val tag: String, val timestamp: Int)/*** MongoDB的连接配置* @param uri   MongoDB的连接* @param db    MongoDB要操作数据库*/
case class MongoConfig(val uri:String, val db:String)/*** ElasticSearch的连接配置* @param httpHosts       Http的主机列表,以,分割* @param transportHosts  Transport主机列表, 以,分割* @param index           需要操作的索引* @param clustername     ES集群的名称,*/
case class ESConfig(val httpHosts:String, val transportHosts:String, val index:String, val clustername:String)// 数据的主加载服务
object DataLoader {val MOVIE_DATA_PATH = "/Users/jacquelin/IdeaProjects/MovieRecommendSystem/recommender/dataloader/src/main/resources/movies.csv"val RATING_DATA_PATH = "/Users/jacquelin/IdeaProjects/MovieRecommendSystem/recommender/dataloader/src/main/resources/ratings.csv"val TAG_DATA_PATH = "/Users/jacquelin/IdeaProjects/MovieRecommendSystem/recommender/dataloader/src/main/resources/tags.csv"val MONGODB_MOVIE_COLLECTION = "Movie"val MONGODB_RATING_COLLECTION = "Rating"val MONGODB_TAG_COLLECTION = "Tag"val ES_MOVIE_INDEX = "Movie"// 程序的入口def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://172.16.104.13:27017/recommender",      //此处要换成自己的URI也即替换127.16.104.13,端口默认27017,recommender是我的数据库"mongo.db" -> "recommender","es.httpHosts" -> "linux:9200",       "es.transportHosts" -> "linux:9300","es.index" -> "recommender","es.cluster.name" -> "es-cluster")// 需要创建一个SparkConf配置val sparkConf = new SparkConf().setAppName("DataLoader").setMaster(config.get("spark.cores").get)// 创建一个SparkSessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._// 将Movie、Rating、Tag数据集加载进来val movieRDD = spark.sparkContext.textFile(MOVIE_DATA_PATH)//将MovieRDD装换为DataFrameval movieDF = movieRDD.map(item =>{val attr = item.split("\\^")Movie(attr(0).toInt,attr(1).trim,attr(2).trim,attr(3).trim,attr(4).trim,attr(5).trim,attr(6).trim,attr(7).trim,attr(8).trim,attr(9).trim)}).toDF()val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)//将ratingRDD转换为DataFrameval ratingDF = ratingRDD.map(item => {val attr = item.split(",")Rating(attr(0).toInt,attr(1).toInt,attr(2).toDouble,attr(3).toInt)}).toDF()val tagRDD = spark.sparkContext.textFile(TAG_DATA_PATH)//将tagRDD装换为DataFrameval tagDF = tagRDD.map(item => {val attr = item.split(",")Tag(attr(0).toInt,attr(1).toInt,attr(2).trim,attr(3).toInt)}).toDF()implicit val mongoConfig = MongoConfig(config.get("mongo.uri").get,config.get("mongo.db").get)// 需要将数据保存到MongoDB中storeDataInMongoDB(movieDF, ratingDF, tagDF)// 关闭Sparkspark.stop()}// 将数据保存到MongoDB中的方法def storeDataInMongoDB(movieDF: DataFrame, ratingDF:DataFrame, tagDF:DataFrame)(implicit mongoConfig: MongoConfig): Unit = {//新建一个到MongoDB的连接val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))//如果MongoDB中有对应的数据库,那么应该删除mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).dropCollection()mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).dropCollection()mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).dropCollection()//将当前数据写入到MongoDBmovieDF.write.option("uri",mongoConfig.uri).option("collection",MONGODB_MOVIE_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()ratingDF.write.option("uri",mongoConfig.uri).option("collection",MONGODB_RATING_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()tagDF.write.option("uri",mongoConfig.uri).option("collection",MONGODB_TAG_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()//对数据表建索引mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).createIndex(MongoDBObject("mid" -> 1))mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("mid" -> 1))mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("uid" -> 1))mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("mid" -> 1))//关闭MongoDB的连接mongoClient.close()}}

studio 3t里连接MongoDB数据库能够可视化看到:

使用scala将数据写入linux上的MongoDB数据库相关推荐

  1. du -sh 如何找到最大的文件夹_小白必看!手把手教你如何在linux上安装redis数据库...

    首先我们要清楚redis是什么? redis是一种非关系型数据库,它与MySQL的这种关系型数据库不同,MySQL是将数据存储在磁盘中,而redis是储存在内存中.一般很多公司都是使用MySQL+re ...

  2. Linux上连接sybase数据库

    在Linux上连接sybase数据库  用命令isql isql命令的使用  简单版本          isql -U aaa -P aaa           -U 用户名 -P表示密码 这样访问 ...

  3. 如何在linux上安装sqlite数据库

    如何在linux上安装sqlite数据库 一.下载 二.解压 三.配置(configure) 四.编译和安装 五.执行sqlite3程序 六.测试代码 一.下载 首先要先下载sqlite3源码包 链接 ...

  4. studio 3t连接linux上的MongoDB

    一.linux上的步骤 用本地wmware上的linux测试,先在linux上启动MongoDB并测试其功能: 留意这句:db.createUser({user:"admin",p ...

  5. oracle12541 linux,PLSQL连接Linux上的oracle数据库出现,ORA-12541 TNS 无监听程序

    PLSQL连接Linux上的oracle数据库出现,ORA-12541 TNS 无监听程序 外部的PLSQL无法连接Linux上的oracle数据库,出现ORA-12541 TNS 无监听程序错误.待 ...

  6. Linux上安装MongoDB

    MongoDB在Windows上的安装过程整体上来说并不难,网上的资料也比较多,这里我就不介绍了,我主要说下如何在Linux环境下安装MongoDB. 环境: CentOS 7 MongoDB 3.4 ...

  7. (翻译) MongoDB(10) 在 Red Hat 企业版或者 Centos Linux 上安装MongoDB社区版

    概述 使用这个教程在 Red Hat 企业版 Linux 或者 CentOS6/7 Linux 使用 .rpm 软件包安装 MongoDB 社区版. 虽然一些发行版包含自己的 MongoDB 软件包, ...

  8. DStream输出之使用foreachRDD()将数据写入诸如Mysql的外部数据库中

    前言 输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上).与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作 ...

  9. 从sqlserver中数据写入mysql_从SQL server数据库导入Mysql数据库的体验

    起原:网海拾贝  ,因任务需要,要将寄存在sqlserver数据库中的数据全部导入到mysql数据库中,在网上集合关连资料,找到两种体例,而今分袂谈谈对他 起原:网海拾贝 因任务需要,要将寄存在sql ...

最新文章

  1. python3 Parallel Python
  2. SpringMVC访问静态资源的三种方式
  3. 编程之美-程序理解和时间分析整理
  4. 【知识图谱】如何构建知识体系:知识图谱搭建的第一步
  5. 使用Oracle UTL_FILE包操作文件
  6. Oracle JDBC中的PreparedStatement占位符过多
  7. u盘分为windows和linux启动,【电脑软件】Ventoy 官方版,一个U盘,同时拥有启动win+linux+Ubuntu...
  8. makefile 基础(转)
  9. 梯度消失和梯度爆炸_梯度消失梯度爆炸-Gradient Clip
  10. C常用的字符串函数实现
  11. OpenGL超级宝典 绘制第一个三角形
  12. CUDA编程技术汇总
  13. 卡巴斯基激活码 卡巴斯基2010激活码下载
  14. 通信网络定级备案怎么做?工信部信息系统定级备案流程介绍
  15. 10大顶级运营商转型案例剖析
  16. Leetcode799-香槟塔
  17. Visual Paradigm 下载安装及使用
  18. 无法将类中的构造器应用到给定类型
  19. 公司组织管理与权限管理的设计原则
  20. 猿编程python分为几个阶段_各个阶段的python学习路线?

热门文章

  1. 你理解常见如阿里,和友商大数据平台的技术体系差异以及发展趋势和技术瓶颈,在存储和计算两个方面进行概述
  2. java微信二维码第三方后台登陆实现 ( 一 )
  3. 个人免费可访问网页制作【GitHub】及其二维码制作(需要有网页源码)——论前端的浪漫
  4. 浅尝辄止MongoDB:基础
  5. 【原创】畅言实现单点登录的设计流程和技术细节(1/2)
  6. hrm项目-day02
  7. 在2B和2C之间,还有一个2H
  8. 云仓是如何运行的?如何一件代发的?
  9. python中dice常见问题_【Pytorch】 Dice系数与Dice Loss损失函数实现
  10. Premiere Pro mac 22.6版本更新功能