spark-streaming连接消费nsq

目的

使用 NSQ作为消息流

使用 spark-streaming 进行消费

对数据进行清洗后,保存到hive仓库中

连接方案

1、编写Spark Streaming Custom Receivers(spark-streaming 自定义接收器),详细见文档

2、使用 nsq 官方提供的Java程序连接包 JavaNSQClient ,详细见文档

详细代码

自定义连接器

ReliableNSQReceiver.scala

import com.github.brainlag.nsq.callbacks.NSQMessageCallback

import com.github.brainlag.nsq.lookup.DefaultNSQLookup

import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}

import org.apache.spark.internal.Logging

import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming.receiver.Receiver

class MessageCallbacks(store_fun:String => Unit) extends NSQMessageCallback with Logging {

def message(message: NSQMessage): Unit ={

val s = new String(message.getMessage())

store_fun(s)

message.finished()

}

}

/* 自定义连接器 */

class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)

extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

var consumer: NSQConsumer = null

def onStart() {

// 启动通过连接接收数据的线程

new Thread("Socket Receiver") {

override def run() { receive() }

}.start()

}

def onStop() {

logInfo("Stopped receiving")

consumer.close

}

/** 接收数据 */

private def receive() {

try {

val lookup = new DefaultNSQLookup

lookup.addLookupAddress(host, port)

consumer = new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))

consumer.start

} catch {

case e: java.net.ConnectException =>

restart("Error connecting to " + host + ":" + port, e)

case t: Throwable =>

restart("Error receiving data", t)

}

}

}

使用连接器

import com.google.gson.JsonParser

import org.apache.spark.SparkConf

import org.apache.spark.internal.Logging

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.spark.streaming.dstream.DStream

import org.apache.spark.streaming.{Seconds, StreamingContext}

/*

* 在定义一个 context 之后,您必须执行以下操作.

* 通过创建输入 DStreams 来定义输入源.

* 通过应用转换和输出操作 DStreams 定义流计算(streaming computations).

* 开始接收输入并且使用 streamingContext.start() 来处理数据.

* 使用 streamingContext.awaitTermination() 等待处理被终止(手动或者由于任何错误).

* 使用 streamingContext.stop() 来手动的停止处理.

*/

object ELKStreaming extends Logging{

def main(args: Array[String]): Unit ={

if (args.length < 4) {

System.err.println("Usage: ELKStreaming ")

System.exit(1)

}

logInfo("start ===========>")

StreamingExamples.setStreamingLogLevels()

val sparkConf = new SparkConf().setAppName("ELKStreaming").setMaster("yarn").set("hive.metastore.uris", "thrift://hadoop15.bigdata.org:9083")

// 创建一个批次间隔为10

val ssc = new StreamingContext(sparkConf, Seconds(args(2).toInt))

// 使用自定义的NSQReceiver

val lines = ssc.receiverStream(new ReliableNSQReceiver(args(0), args(1).toInt, "log", "scalatest"))

val hiveStream: DStream[(String, String)] = lines.map(line => prefix_exit(line))

// 将计算后的数据保存到hive中

hiveStream.foreachRDD(rdd => {

// 利用SparkConf来初始化SparkSession。

val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

// 导入隐式转换来将RDD

import sparkSession.implicits._

// 将RDD转换成DF

val df: DataFrame = rdd.toDF("str", "ymd")

// 取出表中的字段

logInfo("df count ===========>"+ df.count)

df.createOrReplaceTempView("spark_logs")

sparkSession.sql("insert into "+args(3)+" partition (ymd) select str,ymd from spark_logs")

})

ssc.start()

ssc.awaitTermination()

}

def prefix_exit(line:String):(String,String) ={

// 对数据进行清洗计算

val obj = new JsonParser().parse(line).getAsJsonObject

val data_str1 = obj.get("recv_timestamp").toString().split("T|Z|\"")

val data_str2 = data_str1(1).split('-')

val data_str3 = data_str2(1)+"/"+data_str2(2)+"/"+data_str2(0)+" "+data_str1(2)+" [I] "+obj.get("index_type").toString().split("\"")(1)+" "+line

val data_str4 = data_str2(0)+data_str2(1)+data_str2(2)

(data_str3.toString(), data_str4.toString())

}

}

java多线程调用nsq消费_spark-streaming连接消费nsq相关推荐

  1. java多线程调用一个函数_Java 多线程(一)

    1. 多线程使用方法 使用多线程,绝大部分情况都是通过如下两种方式实现的,即继承Thread类或者实现Runnable接口.以下对两种方式分别进行介绍并比较. 1.1 使用Thread类实现多线程 自 ...

  2. java读写德卡数据_Spark Streaming 读取Kafka数据写入ES

    简介: 目前项目中已有多个渠道到Kafka的数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时 ...

  3. java 多线程 调用 dll 出错,JAVA调用DLL异常,请高手寻找异常原因,给出解决方法...

    当前位置:我的异常网» J2SE » JAVA调用DLL异常,请高手寻找异常原因,给出解决方法 JAVA调用DLL异常,请高手寻找异常原因,给出解决方法 www.myexceptions.net  网 ...

  4. druid java直接调用_Spring Boot使用Druid连接池的示例代码

    Druid是Java语言中最好的数据库连接池.Druid相比于其他的数据库连接池,有两大特性: 监控数据库,有利于分析线上数据库问题 更容易扩展,同时也很高效. 今天演示一下Spring Boot集成 ...

  5. 安装智能消费机服务器连接,消费一卡通系统/消费机布线安装方法

    [导读] 一. 消费机工程布线要求: 1. 选用120欧姆的8芯双屏蔽网线(通常采用),线路截面积0.75平方毫米.根据布线环境及长度,再选择不同的屏蔽材质及抗拉强度不同的线材. 2. 所用的两芯线必 ...

  6. 多线程调用生成主键流水号存储过程产生主键冲突问题解决方案

    遇到开发多线程测试插入数据的时候发现主键冲突问题 问题具体描述如下: -------------------------------------------------------------- 调用 ...

  7. java 多线程 压缩_Java 多线程拷贝文件夹并调用tinyPng算法接口压缩图片实现(生产消费变种)...

    线程模型 生产者Provider线程为一,主要进行深搜目录文件:. 消费者Consumer线程多个, 因为RPC服务调用时延较长, 启用多个线程请求服务. 持久化线程Persist 将已经消费的消息存 ...

  8. java多线程异步调用别的系统接口代码_抢先准备,40个 Java 多线程面试题及答案大汇总!...

    ↑↑↑点上方蓝字关注并标⭐「IT技术思维」 一起培养顶尖技术思维 来源:程序员共成长(id:finishbug) 这些多线程的问题,有些来源于各大网站.有些来源于自己的思考.可能有些问题网上有.可能有 ...

  9. Java 多线程启动为什么调用 start() 方法而不是 run() 方法?

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:终于放弃了单调的swagger-ui了,选择了这款神器-knife4j个人原创100W+访问量博客:点击前往,查 ...

最新文章

  1. rar for linux缺少GLIBC_2.7
  2. SVN的使用(服务端与客户端)
  3. 地图画指定区域_零基础学CAD绘制一张桌子为例,使亲们更好地熟悉三维绘图环境...
  4. Visual Studio 2010/2013 UTF8编码调试时显示中文
  5. phpmyadmin忘记mysql密码_忘记phpmyadmin密码怎么重置
  6. Ethernity Chain将发布棒球球星Fernando Tatis Jr.系列NFT
  7. matlab中运用demod解调程序,matlab调制解调源码有代码解释原理分析
  8. 帆软报表插件开发之fine-decision中的AccessProvider扩展
  9. 携程到底有没有杀熟?
  10. python 服务端判断客户端异常断开
  11. Arrays.copyOf
  12. 【机器学习系列】MCMC第一讲:蒙特卡罗方法初认识
  13. 2021年第四届安洵杯WriteUp(转)
  14. wps如何设置分段页眉
  15. 职业等级计算机操作员,计算机操作员是什么职业?
  16. 【组合数学】多项式定理 ( 多项式系数 | 多重集全排列 | 对应放球子模型方案数 | 多项式系数相关恒等式 )
  17. MongoDB的分片集群
  18. 如何修改微信公众号内部网页的头部标签内容
  19. fastdfs连接mysql_使用fastdfs-zyc监控FastDFS文件系统
  20. 传奇玩家申请怪物攻城脚本

热门文章

  1. mysql for update 锁_MySql FOR UPDATE 锁的一点问题……
  2. mysql 水平拆分实例_2021先定个小目标?搞清楚MyCat分片的两种拆分方法和分片规则!(二):水平拆分实例解析和代码实现!...
  3. 石油勘探是属于计算机应用中的,计算机在石油勘探开发中的应用论文
  4. linux堡垒机开源软件,Jumpserver开源堡垒机
  5. servlet与MySQL商品管理系统_servlet和mysql实现宿舍管理系统
  6. 【PIC18单片机学习笔记】一、程序的烧录
  7. Android 代码实现查看SQLite数据库中的表
  8. [win7] 去除将窗口拖到屏幕边缘时“自动最大化”
  9. DataGridView添加一行数据、全选、取消全选、清空数据、删除选中行
  10. 前端学习(3286):Aop