1. After exiting spark-shell abnormally (instead of leaving via quit) and then running the spark-shell command again, this warning appears:

19/04/11 13:42:32 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

Checking with jps shows the old SparkSubmit process is still alive, and port 4040 is still occupied by it.
Solution: https://blog.csdn.net/wawa8899/article/details/81016029

When spark-shell starts, it launches a Spark Web UI. Because spark-shell does not set an appName, the upper-right corner of the Web UI shows "Spark shell application UI" (defined in $SPARK_HOME/bin/spark-shell in the source). If an AppName is specified, that name is shown instead.
The Spark Web UI uses port 4040 by default. If 4040 is occupied, it automatically tries the next port, 4041; if 4041 is also occupied, it keeps incrementing, and so on.
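
If the leftover process cannot be killed right away, another option is to pin the Web UI to a known free port instead of relying on the automatic 4040 → 4041 probing. A minimal sketch, assuming a port such as 4050 happens to be free (the port number is only an illustration):

import org.apache.spark.sql.SparkSession

// Minimal sketch: pin the Spark Web UI to an explicitly chosen port via the
// standard spark.ui.port setting. 4050 is an arbitrary example port.
object PinUiPort {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pin-ui-port-example")
      .master("local[*]")
      .config("spark.ui.port", "4050")
      .getOrCreate()

    // uiWebUrl should now report port 4050
    println(spark.sparkContext.uiWebUrl.getOrElse("UI disabled"))
    spark.stop()
  }
}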

2.
for (elem <- rdd.collect()) {println(elem.getClass.getSimpleName);print(elem);println("---------")}
The elem values come back as type String (here rdd is a MapPartitionsRDD).

scala> val list01=List(1,2,3,4)
list01: List[Int] = List(1, 2, 3, 4)
scala> val rdd1 = sc.parallelize(list01)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:26
scala> rdd1.collect()
res2: Array[Int] = Array(1, 2, 3, 4)

If the collection passed in is not a List or an Array (i.e. not a Seq), it does not compile:

scala> sc.parallelize(set01).collect()
<console>:27: error: type mismatch;
found : scala.collection.immutable.Set[Int]
required: Seq[?]
sc.parallelize(set01).collect()
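
Since sc.parallelize expects a Seq, a Set has to be converted first. A minimal sketch (the set01 value here is just an illustration):

import org.apache.spark.sql.SparkSession

// Minimal sketch: parallelize a Set by converting it to a Seq first.
object ParallelizeSetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("parallelize-set").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    val set01 = Set(1, 2, 3, 4)
    // sc.parallelize(set01)               // does not compile: a Set is not a Seq
    val rdd = sc.parallelize(set01.toSeq)  // convert to a Seq before parallelizing
    println(rdd.collect().mkString(", "))

    spark.stop()
  }
}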

3. When creating a SparkSession instance:
val spark: SparkSession = SparkSession.builder().appName("node01").master("master").enableHiveSupport().getOrCreate()
this fails with:

Exception in thread "main" java.lang.IllegalArgumentException: Unable to instantiate SparkSession with Hive support because Hive classes are not found.

After removing enableHiveSupport(), the error becomes:

Exception in thread "main" org.apache.spark.SparkException: Could not parse Master URL: 'node01'

After changing the master parameter to local, the error becomes:

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
com.jenny.spark.SparkStreamingKafkaReceiver$.main(SparkStreamingKafkaReceiver.scala:17)
com.jenny.spark.SparkStreamingKafkaReceiver.main(SparkStreamingKafkaReceiver.scala)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2472)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2468)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2468)
at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2557)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:85)
at com.jenny.spark.SparkStreamingKafkaReceiver$.main(SparkStreamingKafkaReceiver.scala:24)
at com.jenny.spark.SparkStreamingKafkaReceiver.main(SparkStreamingKafkaReceiver.scala)
19/04/12 15:10:31 INFO spark.SparkContext: Invoking stop() from shutdown hook
19/04/12 15:10:31 INFO server.AbstractConnector: Stopped Spark@6ca0256d{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
19/04/12 15:10:31 INFO ui.SparkUI: Stopped Spark web UI at http://172.18.94.121:4040
19/04/12 15:10:31 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/04/12 15:10:31 INFO memory.MemoryStore: MemoryStore cleared
19/04/12 15:10:31 INFO storage.BlockManager: BlockManager stopped
19/04/12 15:10:31 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
19/04/12 15:10:31 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/04/12 15:10:31 INFO spark.SparkContext: Successfully stopped SparkContext
19/04/12 15:10:31 INFO util.ShutdownHookManager: Shutdown hook called
19/04/12 15:10:31 INFO util.ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-fcc174bd-0722-4b43-9852-522997a84140
Process finished with exit code 1
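
The trace shows where the two contexts come from: the SparkSession already created a SparkContext at SparkStreamingKafkaReceiver.scala:17, and the code then constructed another SparkContext at line 24. A minimal sketch of reusing a single context for both SQL and streaming, assuming a streaming job like the ones in this post (the localhost:9999 source is only a placeholder output operation):

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch: build the SparkSession first, then reuse its SparkContext
// for the StreamingContext instead of constructing a second SparkContext,
// which triggers "Only one SparkContext may be running in this JVM".
object SingleContextExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("single-context-example")
      .master("local[2]") // local master for a quick test
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // a trivial source/sink so the context has an output operation;
    // replace with the real Kafka stream in practice
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}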

4. The Spark job was built against the Kafka 0.8 integration, but the CDH cluster runs Kafka 1.0, so running the jar on the cluster failed with:

spark Exception in thread "main" java.lang.NoSuchMethodError: kafka.api.TopicMetadata.errorCode

Switching to the Kafka 0.10 integration dependency (compatible with Kafka 1.0) fixed it.
Differences between using the 0.8 and the 0.10/1.0 Kafka integration:
POM dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!--
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
-->

package com.jenny.spark

import cn.just.spark.domain.Test02
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, ConsumerStrategy, LocationStrategies, LocationStrategy}
//import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext

object SparkStreamingKafkaDirect {
  def main(args: Array[String]): Unit = {
    // using the settings below
    val spark: SparkSession = SparkSession.builder().appName("SparkStreamingKafkaDirect").master("local[2]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    // define the schema
    val schema = StructType(
      Seq(
        StructField("col1", StringType, true)
        , StructField("col2", StringType, true)
        , StructField("col3", StringType, true)
        , StructField("col4", StringType, true)
        , StructField("update_time", StringType, true)
      )
    )
    // 1. create the StreamingContext
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(10))
    // 2. prepare the Kafka parameters
    // val kafkaparams = Map("metadata.broker.list" -> "node01:9092", "group.id" -> "spark_direct") // Kafka 0.8
    // Kafka 1.0
    val kafkaparams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "spark_direct",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    val topics = Set("from_flume01")

    // 3. read the data from Kafka
    // val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaparams, topics) // Kafka 0.8
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaparams))
    // val data: DStream[String] = dstream.map(_._2) // Kafka 0.8
    val data: DStream[String] = dstream.map(_.value)

    data.foreachRDD(rdd =>
      {
        val rrdd: RDD[Row] = rdd.map(x => x.split(",")).map(x => Row(x(0), x(1), x(2), x(3), x(4)))
        val tRdd: RDD[Test02] = rdd.map(x => x.split(",")).map(x => Test02(x(0), x(1), x(2), x(3), x(4)))
        import spark.implicits._
        val tDF: DataFrame = tRdd.toDF()
        // write the data into Hive
        // tDF.write.format("textfile").mode(saveMode = "Append").insertInto("test.test01")
        println("++++++++++++++++")
        tDF.show
      }
    )
    println("--------------")
    // 4. start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}

5. Setting the master in the Spark code to standalone mode:

val spark: SparkSession = SparkSession.builder().appName("SparkStreamingKafkaDirect").master("spark://<IP address>:7337").getOrCreate()

This fails with:

19/04/26 16:09:10 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://<IP address>:7337...
19/04/26 16:09:10 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master <IP address>:7337
org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: readerIndex(5) + length(864567) exceeds writerIndex(178): UnpooledUnsafeDirectByteBuf(ridx: 5, widx: 178, cap: 178/178)at org.spark_project.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1401)at org.spark_project.io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1388)at org.spark_project.io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:870)at org.spark_project.io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:878)at org.apache.spark.network.protocol.Encoders$Strings.decode(Encoders.java:42)at org.apache.spark.network.shuffle.protocol.UploadBlock.decode(UploadBlock.java:110)at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:65)at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:157)at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:105)at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
……
19/04/26 16:09:31 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/04/26 16:09:32 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystemat scala.Predef$.require(Predef.scala:224)at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)at scala.Option.getOrElse(Option.scala:121)at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)at com.jenny.spark.SparkStreamingKafkaDirect$.main(SparkStreamingKafkaDirect.scala:28)at com.jenny.spark.SparkStreamingKafkaDirect.main(SparkStreamingKafkaDirect.scala)
19/04/26 16:09:32 INFO SparkContext: SparkContext already stopped.
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystemat scala.Predef$.require(Predef.scala:224)at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)at scala.Option.getOrElse(Option.scala:121)at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)at com.jenny.spark.SparkStreamingKafkaDirect$.main(SparkStreamingKafkaDirect.scala:28)at com.jenny.spark.SparkStreamingKafkaDirect.main(SparkStreamingKafkaDirect.scala)
19/04/26 16:09:32 INFO SparkContext: Successfully stopped SparkContext
19/04/26 16:09:32 INFO ShutdownHookManager: Shutdown hook called
19/04/26 16:09:32 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-d6b03962-4d1a-4bc9-b93b-5a0550f84240

(1) After changing it to local mode, the error went away.
(2) If the code does not set the master at all and it is instead set at spark-submit time:
spark-submit \
--class com.jenny.spark.SparkStreamingKafkaDirect \
--master spark://node01:7077 \
--executor-memory 1g \
--total-executor-cores 2
it fails with:

19/04/26 17:07:12 INFO client.StandaloneAppClient$ClientEndpoint: Connecting to master spark://node01:7077...
19/04/26 17:07:12 WARN client.StandaloneAppClient$ClientEndpoint: Failed to connect to master node01:7077
org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to connect to node01/<IP address>:7077at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)... 4 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: node01/<IP address>:7077at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323)at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)... 1 more
Caused by: java.net.ConnectException: Connection refused

Conclusion: set the master to local in the code and pass the standalone master at spark-submit time, and it runs.

Reference: https://blog.csdn.net/huonan_123/article/details/84282843 — the different deployment modes only differ in the master passed at submit time, while the Spark code itself keeps local.
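
A side note, not from the original post: 7337 is the default port of Spark's external shuffle service, while a standalone master normally listens on 7077, so the first failure may simply be the client talking to the wrong endpoint. A common alternative to hard-coding any master is to leave .master() out of the code and let spark-submit decide; a minimal sketch (the fallback to local[*] is only for IDE runs):

import org.apache.spark.sql.SparkSession

// Minimal sketch: only set a master when spark-submit did not already provide
// one (spark-submit passes --master to the driver as the spark.master property).
// Submitted with --master spark://node01:7077 or --master yarn, that value wins;
// started directly from the IDE, the job falls back to local[*].
object MasterFromSubmit {
  def main(args: Array[String]): Unit = {
    val builder = SparkSession.builder().appName("master-from-submit")
    val spark =
      if (sys.props.contains("spark.master")) builder.getOrCreate()
      else builder.master("local[*]").getOrCreate()

    println("running with master: " + spark.sparkContext.master)
    spark.stop()
  }
}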

6. Submitted in yarn-cluster mode; after running for a while without receiving any data, this error appeared:

19/04/26 18:39:10 WARN conf.HiveConf: HiveConf of name hive.strict.checks.cartesian.product does not exist
19/04/26 18:40:30 ERROR yarn.ApplicationMaster: Uncaught exception:
java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:454)at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:296)at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:223)at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:223)at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:223)at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:802)at java.security.AccessController.doPrivileged(Native Method)at javax.security.auth.Subject.doAs(Subject.java:422)at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1726)at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:801)at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:222)at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:835)at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)For more detailed output, check the application tracking page: http://node01:8088/cluster/app/application_1556244595224_0003 Then click on links to logs of each attempt.
. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.users.root
start time: 1556275020864
final status: FAILED
tracking URL: http://node01:8088/cluster/app/application_1556244595224_0003
user: root
Exception in thread "main" org.apache.spark.SparkException: Application application_1556244595224_0003 finished with failed statusat org.apache.spark.deploy.yarn.Client.run(Client.scala:1127)at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1173)at org.apache.spark.deploy.yarn.Client.main(Client.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/04/26 18:40:32 INFO util.ShutdownHookManager: Shutdown hook called
19/04/26 18:40:32 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-c628fa05-a49f-41b2-98bb-522749e1ffa8

7. Connecting to HBase from Spark fails with:

19/05/28 16:41:42 WARN zookeeper.ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection timed out: no further informationat sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
19/05/28 16:41:43 INFO zookeeper.ClientCnxn: Opening socket connection to server <IP address>/<IP address>:2181. Will not attempt to authenticate using SASL (unknown error)

The pom file is:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jenny.spark.hbase</groupId>
    <artifactId>connect_hbase03</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <kafka.version>1.0.0</kafka.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.6.0-cdh5.9.0</hadoop.version>
        <hbase.version>1.2.0-cdh5.9.0</hbase.version>
    </properties>

    <!-- add the cloudera repository -->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Kafka dependency -->
        <!--
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        -->
        <!-- Hadoop dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- HBase dependencies -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- Spark Streaming dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.5</version>
        </dependency>
        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.6.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <!-- get all project dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <!-- MainClass in manifest makes an executable jar -->
                    <archive>
                        <manifest>
                            <mainClass>com.jenny.spark.hbase.TestHBase02</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <!-- bind to the packaging phase -->
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!--<arg>-make:transitive</arg>-->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
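
The post does not record the final fix for this one. A ZooKeeper connection timeout like the one above is often caused by the client not knowing the real quorum (falling back to a wrong or unreachable address), so setting it explicitly is a reasonable first check. A minimal sketch, not from the original post; the quorum hosts "node01,node02,node03" are placeholders:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory

// Minimal sketch: point the HBase client at the ZooKeeper quorum explicitly
// before opening a connection. Host names and port are placeholders.
object HBaseConnectSketch {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01,node02,node03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val connection = ConnectionFactory.createConnection(conf)
    println(connection.getAdmin.listTableNames().mkString(", "))
    connection.close()
  }
}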

8. Spark SQL connecting to a MySQL database
pom.xml:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

Code:

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object MysqlUtils01 {
  def main(args: Array[String]): Unit = {
    val jdbcURL = "…………"
    val userName = "……"
    val passWord = "……"
    val prop = new Properties()
    prop.put("user", userName)
    prop.put("password", passWord)
    val conf = new SparkConf().setMaster("local[*]").setAppName("from-to-mysql")
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)
    val sql_df = sqlContext.read.jdbc(jdbcURL, "dim_user_dep2", prop)
    sql_df.show
  }
}

This throws:

java.lang.RuntimeException: Multiple sources found for jdbc
(org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider,
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource), please
specify the fully qualified class name.

Changing the DataFrame-loading code to the following makes it run normally:

val sql_df = sqlContext.read
  .format("org.apache.spark.sql.execution.datasources.jdbc.DefaultSource")
  .options(Map(
    "url" -> jdbcURL,
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "dim_user_dep2",
    "user" -> "sx_etl",
    "password" -> "dsuukXTMvf"))
  .load()
sql_df.show
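
"Multiple sources found for jdbc" usually means two different spark-sql builds ended up on the classpath (the jdbc source registered once as DefaultSource and once as JdbcRelationProvider), so aligning the spark-sql dependency versions is an alternative fix. With a clean classpath, the short "jdbc" format works; a minimal sketch reusing the same sqlContext, URL and credentials as above:

// Minimal sketch (alternative to spelling out the full provider class name):
// the plain "jdbc" format resolves correctly once only one spark-sql version
// is on the classpath.
val sql_df2 = sqlContext.read
  .format("jdbc")
  .option("url", jdbcURL)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "dim_user_dep2")
  .option("user", "sx_etl")
  .option("password", "dsuukXTMvf")
  .load()
sql_df2.show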

9. Connecting to Redis from Spark, error 1): ConnectException: Connection refused: connect
Solution (see https://www.cnblogs.com/shamo89/p/6681256.html):
In redis.conf, comment out bind 127.0.0.1.
Turn off the firewall (or open the required port; details not covered here).

Error 2): JedisDataException: DENIED Redis is running in protected mode because protected mode is enabled, no bind address was specified, no authentication password is requested to clients.

Solution:
Edit redis.conf and change protected-mode yes under the NETWORK section to protected-mode no, then restart the service.

10. Submitting in yarn mode fails with:
System memory 452460544 must be at least 471859200
Raising executor-memory from 450M to 512M solved the problem.
spark-submit \
--class com.jenny.spark.sx.Analysis \
--master yarn \
--deploy-mode cluster \
--executor-memory 512M \
--driver-cores 3 \
bigdata_sx03_master-1.0-SNAPSHOT-jar-with-dependencies.jar

11. Spark Streaming fails after setting a checkpoint on the StreamingContext:
ssc.checkpoint("F:\IdeaProjects\spark\sx\bigdata_sx03_master\src\main\resources\checkpoint")

Exception in thread "pool-18-thread-2" java.lang.NullPointerExceptionat java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)at org.apache.hadoop.util.Shell.runCommand(Shell.java:505)at org.apache.hadoop.util.Shell.run(Shell.java:478)at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)at org.apache.hadoop.util.Shell.execCommand(Shell.java:831)at org.apache.hadoop.util.Shell.execCommand(Shell.java:814)at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:664)at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:506)at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:462)at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:428)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:920)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:901)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:237)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

Solution: write the checkpoint to an HDFS directory and run the program on the CDH cluster instead of from IDEA on Windows; then it works normally.
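
A minimal sketch of the HDFS variant, using the same ssc as above (the namenode address and path are placeholders):

// Minimal sketch: checkpoint to HDFS instead of a local Windows path.
// "hdfs://node01:8020/user/spark/checkpoint" is a placeholder location.
ssc.checkpoint("hdfs://node01:8020/user/spark/checkpoint")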

12. Spark code warning:

Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.

Solution:
This warning appears when reading tables with a large number of fields. It can be changed in a configuration file or directly inside the program.

As shown in the code below, the setting is adjusted via .config("spark.debug.maxToStringFields", "100"):

SparkSession.builder().master("local[4]").appName("report").config("spark.debug.maxToStringFields", "100").getOrCreate();

13. Spark Streaming integrated with Kafka, writing results to Redis: after about 5 minutes of running, the streaming job blocked.

dstream.foreachRDD(rdd =>
  if (!rdd.isEmpty()) {
    // get the offsets
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (offsetInfo <- offsetRanges) {
      println("fromOffset:" + offsetInfo.fromOffset)
      println("untilOffset:" + offsetInfo.untilOffset)
      println("topic:" + offsetInfo.topic)
      println("partition:" + offsetInfo.partition)
    }
    // process the data
    val map_rdd = rdd.map(_.value())
    val today = new SimpleDateFormat("yyyy-MM-dd").format(new Date)
    // 1. load the latest dimension table data ETL_Dm.dim_user_dep2
    val dimensionPeriodDF = DimensionsLoader.getDimension(sparkSession)
    dimensionPeriodDF.createOrReplaceTempView("dimensionPeriodDF")
    // 2. analyze the order and sub-order tables, put each user_id's total payment and other info into Redis
    DF2Redis.orders2Redis(sparkSession, map_rdd, today)
    // commit the offsets
    dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })

The Redis connection is obtained from a Jedis connection pool via pool.getResource.

Suggestions found online for this situation:

1) For the direct approach, limit how many records each Kafka partition reads per batch via spark.streaming.kafka.maxRatePerPartition. — I changed it; no effect.

2) When setting the SparkContext master, use local[*] rather than local or local[1], because one thread is taken by the driver and none is left to process data. — I changed it; no effect.

3) First, reduce spark.streaming.kafka.consumer.poll.ms to under 3000 ms so a retry happens after a 3-second timeout; second, raise spark.task.maxFailures from the default 4 to 10. Reportedly, after these two changes most batches read messages quickly once they reconnect and run successfully. — I changed them; no effect.

4) The manual offset commit itself going wrong. — I changed it; no effect.
Switching back to Kafka's automatic offset commit (commenting out the manual commit but keeping the statements that read the offsets): even when nothing runs inside the DStream because it is stuck and no content appears, the offsets are still committed automatically and the consumed offset keeps moving with Kafka; CURRENT-OFFSET updates in real time.
Switching back to manual commit (commenting out the statement that initializes offsetRanges as empty, reading the offsets inside the DStream): the data keeps flowing and Spark Streaming does consume, but the committed CURRENT-OFFSET lags behind, as if nothing were consumed. Once Spark Streaming blocks, the program consumes nothing and no offset values are obtained inside the DStream any more.

5) Change the Spark Streaming batch interval. — I changed it; no effect.

6) Increase the Redis maximum number of connections from 20 to 100: the job then hung after 21 minutes instead of 5, which confirmed a Redis connection problem.
The real cause was that the Jedis connections were never closed after use. Adding jedis.close() after each use fixed it and the blocking stopped; see the sketch after this list.
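
A minimal sketch of returning the connection to the pool, assuming a JedisPool like the one described above (the pool host/port and the key are placeholders):

import redis.clients.jedis.{Jedis, JedisPool}

// Minimal sketch: always return the Jedis connection to the pool, even when
// the write fails; otherwise the pool runs out of connections and the
// streaming job eventually blocks, as described above.
object RedisWriteSketch {
  val pool = new JedisPool("node01", 6379) // placeholder host/port

  def writeResult(key: String, value: String): Unit = {
    val jedis: Jedis = pool.getResource
    try {
      jedis.set(key, value)
    } finally {
      jedis.close() // for a pooled Jedis, close() returns the connection to the pool
    }
  }
}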

14. While Spark Streaming was stopped, a lot of data piled up in Kafka; restarting Spark Streaming then failed with:

sparkstreaming kafka Failed to get records for after polling for 512

Solution:


new SparkConf().setAppName("XX").setMaster("local").set("spark.streaming.kafka.consumer.poll.ms", "30000");

Reference: https://blog.csdn.net/lmb09122508/article/details/80522252

15. Writing a DataFrame result to HDFS from Spark:
dimensionPeriodDF.write.mode("append").orc("hdfs://……")
Writing ORC requires Hive support; without it the write fails with:

Exception in thread "main" org.apache.spark.sql.AnalysisException: The ORC data source must be used with Hive support enabled;```14、spark 写:// 获取偏移量(由kafka保管offset)offsetRanges = rdd.repartition(1).asInstanceOf[HasOffsetRanges].offsetRanges
报错:

java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges
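
The cast only works on the RDD that comes straight out of the Kafka direct stream; any transformation such as repartition returns a MapPartitionsRDD, which no longer carries offset information. A minimal sketch of the usual ordering, assuming `stream` is a direct stream like the one built in section 4 (variable names are placeholders):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

// Minimal sketch: read the offset ranges from the original Kafka RDD before any
// repartition; a repartitioned RDD does not implement HasOffsetRanges.
object OffsetsBeforeRepartition {
  def processWithOffsets(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    stream.foreachRDD { rdd =>
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      val repartitioned = rdd.repartition(1) // safe to reshape after offsets are captured
      // ... process `repartitioned` here ...

      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
  }
}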

