scala模板写入es_Spark——scala 实用小方法
这一阵刚刚接触scala,主要也是用在spark上~完全小白一个,看着Scala感觉与python很像,想着可能比较容易上手,结果……真是需要处理一个就得查一个啊,用python或Java很容易写出来的代码,用scala得查半天,晕死……为了方便记忆,现在将这一阵用到的Scala记录一下,也给同样是小白的你一些参考,大神请自行飘过~~~
很多人都说scala要比Java方便的多,可能我熟悉之后也会这么认为吧,但是目前真是……突然想到了二哈伸舌头歪头的图片,哈哈~
1. 时间戳、字符串互转
A. 时间戳转时间字符串
import java.text.SimpleDateFormat
def tranTimeToString(tm:Long) :String={val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val tim = fm.format(new Date(tm))tim // 返回值 tim}
=>println(tranTimeToString(1502036122000L))
=>2017-08-07 00:15:22
B. 字符串转时间戳,时间字符串格式:yyyy-MM-dd HH:mm:ss
import java.util.Locale
import java.text.SimpleDateFormat
def strToTranTime(tm: String): Long = {val loc = new Locale("en")val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",loc)val dt2 = fm.parse(tm) // 转成Date类型dt2.getTime()}
=>println(strToTranTime("2019-12-18 15:30:12"))
=>1576654212000
C. 获取传入时间戳的天数差
import java.util.Calendar
def caculate2Days(t1: Long, t2: Long): Int = {val calendar = Calendar.getInstancecalendar.setTimeInMillis(t2)val t2Day = calendar.get(Calendar.DAY_OF_YEAR)calendar.setTimeInMillis(t1)val t1Day = calendar.get(Calendar.DAY_OF_YEAR)var days = t2Day - t1Dayif (days < 0) {caculate2Days(t2, t1)}days}
=>println(caculate2Days(1576664164000l, 1576684800000l))
=>1 (相差1天)
2. 队列Queue
import scala.collection.mutable.Queue
A. 队列初始化N个0
def queueInit(N: Int): Queue[Int] = {var list = List.fill(N)(0) // 先初始化N个0的listvar queue = Queue[Int]() // 定义队列queue ++= list // 将list转成队列queue}
=>println(queueInit(5))
=>Queue(0, 0, 0, 0, 0)
B. 队列增加元素,保持队列总长度不变
def queueAdd(queue: Queue[Int], value: Int): Queue[Int] ={queue.dequeue() // 删掉第一个元素queue += value // 增加一个元素queue}
=>var q = queueInit(5)
=>println(q)
=>Queue(0, 0, 0, 0, 0)
=>q = queueAdd(q, 1)
=>println(q)
=>Queue(0, 0, 0, 0, 1)
C. 队列求和
def queueSum(list: Queue[Int]): Int = {var sum = 0for(i <- list){sum += i}sum}
=> println(queueSum(q))
=>1
D. 队列转字符串
def queueToString(list: Queue[Int]): String = {val b = list.mkString(",")b}
=>println(queueToString(q))
=>0,0,0,0,1
3. int列表、Byte列表互转
A. int列表转Byte列表
注:转成Array[Byte],主要是因为写入hbase的时候只能Array[Byte]类型写入
def listToBytes(list: List[Int]): Array[Byte] = {val b = list.mkString(",") // 转成字符串val c = b.getBytes() // 转Array[Byte]c}
=>val a= List(0,0,1)
=>val c = listToBytes(a)
=>println(c)
=>[B@22a67b4
B. Byte列表转int列表
def bytesToList(c: Array[Byte]): List[Int] = {var e : List[Int] = List() // 初始化listif(c != null && !c.isEmpty){val d = Bytes.toString(c) // Array[Byte]转成字符串e = d.split(",").map(_.toInt).toList // 转成int列表}e}
=>println(bytesToList(c))
=>List(0, 0, 1)
4. 字符串的md5值
def hashMD5(content: String): String = {val md5 = MessageDigest.getInstance("MD5")val encoded = md5.digest((content).getBytes)encoded.map("%02x".format(_)).mkString}
=>println(hashMD5("9001244008859"))
=>fa5b6a9ab0a0a6aae6e0151a88596b4d
5. 数据写入ES
注:一定要先配置ES索引
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}
def dataToEs():Unit = {val conf = new SparkConf().setAppName("dataToEs").setMaster("local")
// conf.set("es.index.auto.create", "true") // 自动建_idconf.set("cluster.name", "es")conf.set("es.nodes", "xxx") // hostconf.set("es.port", "xxx") // portconf.set("es.net.http.auth.user", "xxx") // userconf.set("es.net.http.auth.pass", "xxx") // passwordconf.set("es.index.read.missing.as.empty","true")conf.set("es.nodes.wan.only","true")conf.set("es.write.operation","upsert") // 有->更新,没有->添加val sc = new SparkContext(conf)val ssc = new StreamingContext(sc, new Duration(10 * 1000))val appId = "2"val orgId = "3"val subscribeType = "4"val numbers = Map("appId" -> appId, "orgId" -> orgId, "subscribeType" -> subscribeType)ssc.sparkContext.makeRDD( // 或 sc.makeRDD(Seq(numbers) // 多个时用逗号,Seq(numbers, airports)).saveToEs("$index/$type", Map("es.mapping.id" -> "orgId")) // 填入ES的index和type}
saveToEs的这种方式在本机可以调用通,但是上到集群,又有分区什么的时候,就不一定好用了,我就是遇到了这种问题,最后无奈使用Java代码完成的导入ES。
6. 字符串分隔成list
def test(): Unit = {var page = "aa/submitinfo1033/bb/cc" var elems = List[String]()var idsList = page.split("/") // 分隔var id = ""for(j <- idsList.indices) { // for遍历breakable{if(idsList(j).equals("bb")) break else{ // 跳过“bb”id += idsList(j) + "="}}}println(id)
}
=>test()
=>aa=submitinfo1033=cc=
scala中没有continue关键字,所以利用break实现continue的功能。
用scala的时候还遇到了很多问题,目前也还在遇到 ~以后再慢慢整理吧~先整理这些,希望对你有些帮助~~~
有问题或建议请留意哦~
scala模板写入es_Spark——scala 实用小方法相关推荐
- string转int的方法_Spark——scala 实用小方法
这一阵刚刚接触scala,主要也是用在spark上~完全小白一个,看着Scala感觉与python很像,想着可能比较容易上手,结果--真是需要处理一个就得查一个啊,用python或Java很容易写出来 ...
- 怎么把图片转成word文档?会议实用小方法分享
当您需要在Word文档中插入图片时,有时候您可能会遇到需要将图片转换为Word文档的情况.下面是一些小技巧,可以帮助您快速将图片转换为Word文档. 使用截图工具 如果您需要将电脑屏幕上的图片转换为W ...
- python实用小方法
一.创建随机字母 import random# 创建随机字母 def make_code(n):res = ''for i in range(n):num = str(random.randint(1 ...
- u盘坏了数据可以恢复吗?实用小方法
u盘作为一个电子设备,经常出现在我们的生活与工作中.小巧便于携带.适合随身携带.我们可以把它放到口袋里,钥匙链上等都可以只要放得下就行.但随着使用的频繁也容易出现问题,比如u盘中毒.u盘损坏.u盘格式 ...
- 微信帐号检测的实用小方法
微信账号检测在不懂的人看来根本不懂是什么意思,甚至可能会联想到检测自己的微信账号,可是微信账号检测真的是这个意思吗?在微商和推广人群看来,微信账号检测就意味着用户源,微信账号检测是一种以手机号.QQ号 ...
- Linux 实用小脚本7(各种方法添加用户)
Linux 实用小脚本7(各种方法添加批量用户,用户存在就提示,不存在就添加) 前言: shell脚本的主要作用就是提升运维效率,用户对运维工作来说,通常是环境 ...
- 通过opencv标记图片以及写入Excel小方法
通过opencv标记图片以及写入Excel小方法 通过opencv根据坐标绘制图片框,然后保存图片 将结果读入并且写入Excel中进行保存 Python strip() 方法用于移除字符串头尾指定的字 ...
- zblog小程序导航主题模板,简单好用小程序商城主题设置方法
小程序现在非常的火爆,大家的手游内存是有限的,无需下载的小程序就能为大家提供和app相同的服务,非常的受欢迎.不过由于小程序数量太多,怎么才能顺利找到想要找到的小程序呢?小程序导航站就应运而生,如何做 ...
- PS实用小技巧:把png批量转换成jpg的方法
内容提要:本文的PS实用小技巧:把png批量转换成jpg的方法,不仅适用于png批量转换jpg,还适用于PSD等格式批量转换成jpg格式.对PS感兴趣的朋友可加 ps学习交流群:142574315 今 ...
最新文章
- python2 json大数据_大数据技术之python 操作json
- Fabric 链码Chaincode 的安装、初始化、调用、升级
- 2015年北京国电通面试题
- 地摊经济和夜经济-国情讲坛·周荣江:城市生态谋定治理转型
- CUDA并行算法系列之FFT快速卷积
- python安装各种插件
- archlinux详细安装步骤_最新Centos的liunx安装宝塔的详细步骤
- 转的:SQL执行提高效率的多种方法
- 黑马程序员之SQL server基础学习笔记(三)
- 汉化)称号插件.php,[管理|信息][UD]NameTags——基于权限的称号插件,兼容计分板,GUI显示[1.7.10-1.12.2]...
- Myeclipse学习总结(7)——Eclipse插件之Maven配置及问题解析
- ffmpeg 分辨率 压缩_ffmpeg 视频压缩
- linux系统安全加固
- PSP 个人软件过程
- 三角矩阵的逆矩阵怎么求_「线性代数」求可逆矩阵P,使得相似矩阵对角化
- php获取海康的视频流,全平台RTMP组件EasyRTMP如何通过海康SDK获取视频流推送到RTMP流媒体服务器...
- elastic APM 深入测试 二 基于spring cloud微服务框架的分布式追踪
- Java基础笔试练习(八)
- 280049_CAN 模块
- 新辰:浅谈那些被挑毛病的90后创业者 到底谁错了?