A couple of quick notes:
I am a software engineering student who uses this blog to record what I learn, and I hope it also helps others who are studying the same material.
Life brings all kinds of difficulties and setbacks; avoiding them solves nothing. The only way is to meet life's challenges with optimism.
Youth fades quickly and learning comes hard; not an inch of time should be taken lightly.
My favorite saying: finish today's work today.


I have only just started working with crawlers, so please bear with any shortcomings, and feel free to point them out.


Series Contents

A Real-World Requirement: From Python Crawlers to Spark Data Preprocessing [1]
A Real-World Requirement: From Python Crawlers to Spark Data Preprocessing [2]
A Real-World Requirement: From Python Crawlers to Spark Data Preprocessing [3]
A Real-World Requirement: From Python Crawlers to Spark Data Preprocessing [4]
A Real-World Requirement: From Python Crawlers to Spark Data Preprocessing [5]


Table of Contents

  • Series Contents
  • Preface
  • Data
  • Code
    • The pom.xml file
    • Create a case class
    • Reading and processing the data
      • Spark plugs
      • Engine oil
      • Tires
      • Brake pads
      • Additives
      • OEM parts
  • Summary

Preface

This installment uses Spark to process the data collected by the crawlers in the earlier installments.


Note: the main body of the article follows; the example code below is provided for reference.

Data




Columns that the several source tables have in common are kept as individual columns in the target table, while the columns that differ from table to table are packed into a single column as JSON (an illustrative record follows the case class below).

Code

The pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>SparkConnectMysql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Repository locations: aliyun, cloudera, and jboss, in that order -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>${spark.version}</version></dependency> -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- CDH alternatives, kept for reference:
             hadoop-client 2.6.0-mr1-cdh5.14.0, hbase-client/hbase-server 1.2.0-cdh5.14.0 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- Shade plugin: build a runnable fat jar; mainClass is left empty here -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Create a case class

// One unified schema for all six product tables: shared columns get named fields,
// and the table-specific columns travel as a JSON string in `description`
case class TRulet(
    id: Int,             // placeholder, always written as 0
    govid: String,       // skuid
    shortname: String,   // name
    brand: String,
    govurl: String,      // url
    price: String,
    name: String,        // commodity_Name
    picurl: String,      // image
    description: String  // JSON string of the table-specific fields
)
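To make the scheme concrete, here is a hypothetical example of one consolidated record (every value below is invented purely for illustration): the shared columns fill the named fields, and the table-specific attributes travel inside description as JSON.

// Hypothetical record; all values are made up for illustration only
val example = TRulet(
  id = 0,                                    // placeholder, assigned by the target table
  govid = "100012345678",                    // skuid
  shortname = "NGK",                         // name
  brand = "NGK",
  govurl = "https://item.jd.com/100012345678.html",
  price = "45.00",
  name = "NGK iridium spark plug (example)", // commodity_Name
  picurl = "https://img.example.com/spark-plug.jpg",
  // table-specific fields packed as JSON
  description = "{\"sales\":\"2000+\",\"material\":\"iridium\",\"types\":\"single electrode\",\"ArticleNumbera\":\"ILZKR7B11\",\"GrossWeight\":\"0.1kg\"}"
)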

Reading and processing the data

Spark plugs

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlHHS {

  // Quote a table-specific value for the JSON payload; the literal string "NULL" becomes ""
  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  // Shared columns: map the literal string "NULL" to an empty quoted string, keep other values as-is
  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")

    import spark.implicits._
    // Java <-> Scala collection conversions
    import scala.collection.JavaConverters._
    // Spark SQL built-in functions (needed for col below)
    import org.apache.spark.sql.functions._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_hhs_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    // "type" is a Scala keyword, so it is selected with col(...) instead of the ' syntax
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'price, 'url,
      'commodity_Name, 'image, 'sales, 'material, col("type"), 'ArticleNumbera, 'GrossWeight)

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared columns
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Table-specific columns, quoted for the JSON payload
      val sales = setdata_args(row.getAs[Any]("sales").toString)
      val material = setdata_args(row.getAs[Any]("material").toString)
      val types = setdata_args(row.getAs[Any]("type").toString)
      val ArticleNumbera = setdata_args(row.getAs[Any]("ArticleNumbera").toString)
      val GrossWeight = setdata_args(row.getAs[Any]("GrossWeight").toString)
      // Pack the table-specific columns into one JSON string for the description field
      val all_Josn = "{\"sales\":" + sales + ",\"material\":" + material +
        ",\"types\":" + types + ",\"ArticleNumbera\":" + ArticleNumbera +
        ",\"GrossWeight\":" + GrossWeight + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = all_Josn)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
    result_Json.show()
  }
}
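One caveat about the hand-rolled JSON above: plain string concatenation never escapes quotes or backslashes inside field values, so a stray " in a product attribute would corrupt the description column. Spark (2.1+) ships the built-in to_json and struct functions, which produce correctly escaped JSON; the following is only a sketch of that alternative for the spark-plug table, not the approach actually used in this series:

// Sketch: build description with Spark's own JSON support instead of string concatenation.
// to_json escapes special characters and makes the per-row helper functions unnecessary.
import org.apache.spark.sql.functions.{col, struct, to_json}

val withJson = ALLDF.select(
  col("skuid").as("govid"),
  col("name").as("shortname"),
  col("brand"),
  col("url").as("govurl"),
  col("price"),
  col("commodity_Name").as("name"),
  col("image").as("picurl"),
  // table-specific columns packed into one JSON column
  to_json(struct(col("sales"), col("material"), col("type").as("types"),
    col("ArticleNumbera"), col("GrossWeight"))).as("description")
)

Since the crawler stored the literal string "NULL" rather than SQL NULL, those values would still need normalizing before (or after) the to_json step.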

Engine oil

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlJY {

  // Same helpers as above: quote table-specific values, normalize the literal "NULL"
  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.functions._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_mobil_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    // "type" is a Scala keyword, so it is selected with col(...)
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'price, col("type"), 'url,
      'originplace, 'netweight, 'commodity_Name, 'image, 'viscosity, 'volume)

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared columns
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Engine-oil-specific columns
      val types = setdata_args(row.getAs[Any]("type").toString)
      val originplace = setdata_args(row.getAs[Any]("originplace").toString)
      val netweight = setdata_args(row.getAs[Any]("netweight").toString)
      val viscosity = setdata_args(row.getAs[Any]("viscosity").toString)
      val volume = setdata_args(row.getAs[Any]("volume").toString)
      val all_Josn = "{\"types\":" + types + ",\"originplace\":" + originplace +
        ",\"netweight\":" + netweight + ",\"viscosity\":" + viscosity +
        ",\"volume\":" + volume + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = all_Josn)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}

Tires

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlLT {

  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    // Note: setting spark.driver.memory here has no effect in local mode; see the note below
    val spark = SparkSession.builder().master("local[*]").appName("Mysql")
      .config("spark.driver.memory", "6g").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.functions._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_lt_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    // "type" is a Scala keyword, so it is selected with col(...)
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'url, 'price,
      'commodity_Name, 'image, 'netweight, 'originplace, 'size, 'width, 'number,
      'performance, 'Flattening, 'characteristics, col("type"))

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared columns
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Tire-specific columns
      val netweight = setdata_args(row.getAs[Any]("netweight").toString)
      val originplace = setdata_args(row.getAs[Any]("originplace").toString)
      val size = setdata_args(row.getAs[Any]("size").toString)
      val width = setdata_args(row.getAs[Any]("width").toString)
      val number = setdata_args(row.getAs[Any]("number").toString)
      val performance = setdata_args(row.getAs[Any]("performance").toString)
      val Flattening = setdata_args(row.getAs[Any]("Flattening").toString)
      val characteristics = setdata_args(row.getAs[Any]("characteristics").toString)
      val types = setdata_args(row.getAs[Any]("type").toString)
      val all_Josn = "{\"netweight\":" + netweight + ",\"originplace\":" + originplace +
        ",\"size\":" + size + ",\"width\":" + width + ",\"number\":" + number +
        ",\"performance\":" + performance + ",\"Flattening\":" + Flattening +
        ",\"characteristics\":" + characteristics + ",\"types\":" + types + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = all_Josn)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}
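A side note on this job's .config("spark.driver.memory", "6g"): in local mode the driver JVM is already running by the time the builder executes, so the setting is silently ignored. The heap has to be sized before launch; a sketch of the usual alternatives:

// spark.driver.memory cannot be raised from inside an already-running driver JVM.
// From an IDE:      add the VM option -Xmx6g to the run configuration.
// Via spark-submit: pass --driver-memory 6g on the command line instead.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Mysql")
  .getOrCreate()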

Brake pads

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlSCP {

  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.functions._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_scp_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    // "package" and "type" are Scala keywords, so they are selected with col(...)
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'price, 'url,
      'commodity_Name, 'image, 'ArticleNumber, 'boiling, col("package"), 'GrossWeight,
      'CommodityOrigin, 'process, 'Installation, col("type"), 'texture)

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared columns
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Brake-pad-specific columns
      val ArticleNumber = setdata_args(row.getAs[Any]("ArticleNumber").toString)
      val boiling = setdata_args(row.getAs[Any]("boiling").toString)
      val packages = setdata_args(row.getAs[Any]("package").toString)
      val GrossWeight = setdata_args(row.getAs[Any]("GrossWeight").toString)
      val CommodityOrigin = setdata_args(row.getAs[Any]("CommodityOrigin").toString)
      val process = setdata_args(row.getAs[Any]("process").toString)
      val Installation = setdata_args(row.getAs[Any]("Installation").toString)
      val types = setdata_args(row.getAs[Any]("type").toString)
      val texture = setdata_args(row.getAs[Any]("texture").toString)
      val all_Josn = "{\"ArticleNumber\":" + ArticleNumber + ",\"boiling\":" + boiling +
        ",\"packages\":" + packages + ",\"GrossWeight\":" + GrossWeight +
        ",\"CommodityOrigin\":" + CommodityOrigin + ",\"process\":" + process +
        ",\"Installation\":" + Installation + ",\"type\":" + types +
        ",\"texture\":" + texture + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = all_Josn)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}

Additives

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlTJJ {

  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.functions._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_tjj_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'price, 'url,
      'commodity_Name, 'image, 'Additivetype, 'TypesOfAdditives, 'NetContent,
      'ArticleNumber, 'GrossWeight, 'CommodityOrigin)

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared columns
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Additive-specific columns
      val Additivetype = setdata_args(row.getAs[Any]("Additivetype").toString)
      val TypesOfAdditives = setdata_args(row.getAs[Any]("TypesOfAdditives").toString)
      val NetContent = setdata_args(row.getAs[Any]("NetContent").toString)
      val ArticleNumber = setdata_args(row.getAs[Any]("ArticleNumber").toString)
      val GrossWeight = setdata_args(row.getAs[Any]("GrossWeight").toString)
      val CommodityOrigin = setdata_args(row.getAs[Any]("CommodityOrigin").toString)
      val all_Josn = "{\"Additivetype\":" + Additivetype + ",\"TypesOfAdditives\":" + TypesOfAdditives +
        ",\"NetContent\":" + NetContent + ",\"ArticleNumber\":" + ArticleNumber +
        ",\"GrossWeight\":" + GrossWeight + ",\"CommodityOrigin\":" + CommodityOrigin + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = all_Josn)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}

OEM parts

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlYCJ {

  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.functions._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_ycj_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    // "package" is a Scala keyword, so it is selected with col(...)
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'url, 'price,
      'commodity_Name, 'image, 'freezing, 'originplace, 'netweight, 'category,
      col("package"), 'boiling, 'sales, 'installation, 'transmission)

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared columns
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // OEM-part-specific columns
      val freezing = setdata_args(row.getAs[Any]("freezing").toString)
      val originplace = setdata_args(row.getAs[Any]("originplace").toString)
      val netweight = setdata_args(row.getAs[Any]("netweight").toString)
      val category = setdata_args(row.getAs[Any]("category").toString)
      val packages = setdata_args(row.getAs[Any]("package").toString)
      val boiling = setdata_args(row.getAs[Any]("boiling").toString)
      val sales = setdata_args(row.getAs[Any]("sales").toString)
      val installation = setdata_args(row.getAs[Any]("installation").toString)
      val transmission = setdata_args(row.getAs[Any]("transmission").toString)
      val all_Josn = "{\"freezing\":" + freezing + ",\"originplace\":" + originplace +
        ",\"netweight\":" + netweight + ",\"category\":" + category +
        ",\"packages\":" + packages + ",\"boiling\":" + boiling +
        ",\"sales\":" + sales + ",\"installation\":" + installation +
        ",\"transmission\":" + transmission + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = all_Josn)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    // result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}
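The JDBC write in this last job is commented out, presumably left over from testing. Re-enabling it mirrors the other five jobs; a small sketch, adding the optional batchsize JDBC option (a standard Spark JDBC writer option, default 1000) as a tuning knob for large inserts:

// Append adds rows to xxuan_car_jd_test_product without touching existing data.
// "batchsize" controls how many rows go into each batched INSERT statement.
result_Json.write
  .mode(SaveMode.Append)
  .option("batchsize", "1000")
  .jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)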

Summary

That wraps it up. Thank you for reading; if there is anything I can help with, feel free to message me.
