I recently looked into time-series forecasting. Most of the resources online are implemented in Python, which covers the majority of use cases, but Python has one drawback here: it runs on a single machine, so it may struggle once the data volume gets large. That situation is rare, but it does happen, so I searched for Spark-based material and, to my surprise, found some. Running the computation on a Spark cluster gives a clear speed-up.

First of all, many thanks to the original blogger; it was only after studying his code that I could properly understand how to use spark-timeseries.

Below are my improvements to that code. The main changes are making the time-format handling generic, so that most time types are supported, and allowing the ARIMA model's p, d, q parameters to be specified by the user.

TimeFormatUtils.java

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.regex.Pattern;

public class TimeFormatUtils {

    /**
     * Detect the date/time format of a time string.
     *
     * @param timeStr the raw time string
     * @return the matching SimpleDateFormat pattern, or null if none matches
     */
    public static String getDateType(String timeStr) {
        HashMap<String, String> dateRegFormat = new HashMap<String, String>();
        dateRegFormat.put("^\\d{4}\\D+\\d{1,2}\\D+\\d{1,2}\\D+\\d{1,2}\\D+\\d{1,2}\\D+\\d{1,2}\\D*$", "yyyy-MM-dd HH:mm:ss"); // 2014年3月12日 13时5分34秒, 2014-03-12 12:05:34, 2014/3/12 12:5:34
        dateRegFormat.put("^\\d{4}\\D+\\d{2}\\D+\\d{2}\\D+\\d{2}\\D+\\d{2}$", "yyyy-MM-dd HH:mm"); // 2014-03-12 12:05
        dateRegFormat.put("^\\d{4}\\D+\\d{2}\\D+\\d{2}\\D+\\d{2}$", "yyyy-MM-dd HH");              // 2014-03-12 12
        dateRegFormat.put("^\\d{4}\\D+\\d{2}\\D+\\d{2}$", "yyyy-MM-dd");                           // 2014-03-12
        dateRegFormat.put("^\\d{4}\\D+\\d{2}$", "yyyy-MM");                                        // 2014-03
        dateRegFormat.put("^\\d{4}$", "yyyy");                                                     // 2014
        dateRegFormat.put("^\\d{14}$", "yyyyMMddHHmmss");                                          // 20140312120534
        dateRegFormat.put("^\\d{12}$", "yyyyMMddHHmm");                                            // 201403121205
        dateRegFormat.put("^\\d{10}$", "yyyyMMddHH");                                              // 2014031212
        dateRegFormat.put("^\\d{8}$", "yyyyMMdd");                                                 // 20140312
        dateRegFormat.put("^\\d{6}$", "yyyyMM");                                                   // 201403
        try {
            for (String key : dateRegFormat.keySet()) {
                if (Pattern.compile(key).matcher(timeStr).matches()) {
                    if (timeStr.contains("/"))
                        return dateRegFormat.get(key).replaceAll("-", "/");
                    else
                        return dateRegFormat.get(key);
                }
            }
        } catch (Exception e) {
            System.err.println("-----------------invalid date format: " + timeStr);
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Normalize a time string to "yyyy-MM-dd HH:mm:ss".
     *
     * @param time   the raw time string
     * @param format a SimpleDateFormat able to parse the raw string
     * @return the normalized time string, or null if parsing fails
     */
    public static String fromatData(String time, SimpleDateFormat format) {
        try {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return formatter.format(format.parse(time));
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return null;
    }
}
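
As a quick sanity check, here is a small Scala sketch (the demo object and sample values are purely illustrative) showing what the two helpers return for inputs taken from the format table above; the results follow directly from the regex-to-pattern mapping.

import java.text.SimpleDateFormat

// Illustrative only: exercises TimeFormatUtils on inputs from the comment examples above.
object TimeFormatUtilsDemo {
  def main(args: Array[String]): Unit = {
    println(TimeFormatUtils.getDateType("2014-03-12 12:05:34")) // yyyy-MM-dd HH:mm:ss
    println(TimeFormatUtils.getDateType("201403"))              // yyyyMM
    println(TimeFormatUtils.getDateType("2014/03/12"))          // yyyy/MM/dd ("-" swapped for "/" because the input contains "/")

    // Normalize a monthly value to the full "yyyy-MM-dd HH:mm:ss" form used by getTimeSpan below.
    val sdf = new SimpleDateFormat(TimeFormatUtils.getDateType("201403"))
    println(TimeFormatUtils.fromatData("201403", sdf))          // 2014-03-01 00:00:00
  }
}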

TimeSeriesTrain.scala

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.{ZoneId, ZonedDateTime}

import com.cloudera.sparkts._
import com.sendi.TimeSeries.Util.TimeFormatUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Builds the time-series models.
 */

object TimeSeriesTrain {

  /**
   * Entry point that wires the whole pipeline together.
   */
  def timeSeries(args: Array[String]) {
    args.foreach(println)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    /**
     * 1. Initialize the Spark environment
     */
    val sparkSession = SparkSession.builder
      .master("local[4]").appName("SparkTest")
      .enableHiveSupport() // enable Hive support
      .getOrCreate()

    /**
     * 2. Initialize the parameters
     */
    // database.table name in Hive
    val databaseTableName = args(0)
    // the input column names must be "time" and "data"
    val hiveColumnName = List(args(1).toString.split(","): _*)
    // start and end time
    val startTime = args(2)
    val endTime = args(3)
    // detect the time format
    val sdf = new SimpleDateFormat(TimeFormatUtils.getDateType(startTime))
    // time-span type and step size
    val timeSpanType = args(4)
    val timeSpan = args(5).toInt
    // number of values to forecast
    val predictedN = args(6).toInt
    // output table name
    val outputTableName = args(7)
    var listPDQ: List[String] = List("")
    var period = 0
    var holtWintersModelType = ""
    // choose the model ("arima" or "holtwinters")
    val modelName = args(8)
    // assign the model-specific parameters
    if (modelName.equals("arima")) {
      listPDQ = List(args(9).toString.split(","): _*)
    } else {
      // seasonal period (e.g. 12 or 4)
      period = args(9).toInt
      // HoltWinters model type: additive or multiplicative
      holtWintersModelType = args(10)
    }

    /**
     * 3. Read the source data and convert it into an RDD of the form {time key data}
     */
    val timeDataKeyDf = readHiveData(sparkSession, databaseTableName, hiveColumnName)
    val zonedDateDataDf = timeChangeToDate(sparkSession, timeDataKeyDf, hiveColumnName, startTime, sdf)

    /**
     * 4. Create the DateTimeIndex covering the data: start date + end date + frequency.
     *    The date format must match the format of the "time" column in the database.
     */
    val dtIndex = getTimeSpan(startTime, endTime, timeSpanType, timeSpan, sdf)

    /**
     * 5. Create the training data
     */
    val trainTsrdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, zonedDateDataDf,
      hiveColumnName(0), hiveColumnName(0) + "Key", hiveColumnName(1))
    trainTsrdd.cache()
    // fill missing values
    val filledTrainTsrdd = trainTsrdd.fill("linear")

    /**
     * 6. Build the model and train it on the training data
     */
    val timeSeriesKeyModel = new TimeSeriesKeyModel(predictedN, outputTableName)
    var forecastValue: RDD[(String, Vector)] = sparkSession.sparkContext.parallelize(Seq(("", Vectors.dense(1))))
    // choose the model
    modelName match {
      case "arima" => {
        // build and train the ARIMA model
        val (forecast, coefficients) = timeSeriesKeyModel.arimaModelTrainKey(filledTrainTsrdd, listPDQ)
        // save the ARIMA evaluation metrics
        forecastValue = forecast
        timeSeriesKeyModel.arimaModelKeyEvaluationSave(sparkSession, coefficients, forecast)
      }
      case "holtwinters" => {
        // build and train the HoltWinters (seasonal) model
        val (forecast, sse) = timeSeriesKeyModel.holtWintersModelTrainKey(filledTrainTsrdd, period, holtWintersModelType)
        // save the HoltWinters evaluation metrics
        forecastValue = forecast
        timeSeriesKeyModel.holtWintersModelKeyEvaluationSave(sparkSession, sse, forecast)
      }
      case _ => throw new UnsupportedOperationException("Currently only supports 'arima' and 'holtwinters'")
    }

    /**
     * 7. Merge the actual and forecast values, attach dates to form a DataFrame (Date, Data), and save it
     */
    timeSeriesKeyModel.actualForcastDateKeySaveInHive(sparkSession, filledTrainTsrdd, forecastValue, predictedN, startTime,
      endTime, timeSpanType, timeSpan, sdf, hiveColumnName)
  }

  /**
   * Read the data from Hive, clean it, and return the columns: time, data, key.
   *
   * @param sparkSession
   * @param databaseTableName
   * @param hiveColumnName
   */
  def readHiveData(sparkSession: SparkSession, databaseTableName: String, hiveColumnName: List[String]): DataFrame = {
    // read the data from Hive; the where clause drops the header row whose time value equals the column name itself
    var hiveDataDf = sparkSession.sql("select * from " + databaseTableName + " where " + hiveColumnName(0) + " !='" + hiveColumnName(0) + "'")
      .select(hiveColumnName.head, hiveColumnName.tail: _*)
    // drop empty values
    hiveDataDf = hiveDataDf.filter(hiveColumnName(1) + " != ''")
    // add a new column named hiveColumnName(0) + "Key" whose value is a constant 0
    // timeDataKeyDf columns: time, data, timeKey
    val timeDataKeyDf = hiveDataDf.withColumn(hiveColumnName(0) + "Key", hiveDataDf(hiveColumnName(1)) * 0.toString)
      .select(hiveColumnName(0), hiveColumnName(1), hiveColumnName(0) + "Key")
    timeDataKeyDf
  }

  /**
   * Convert the "time" column into a fixed timestamp type
   * (ZonedDateTime, e.g. 2007-12-03T10:15:30+01:00 Europe/Paris).
   *
   * @param sparkSession
   * @param timeDataKeyDf
   * @param hiveColumnName
   * @param startTime
   * @param sdf
   * @return
   */
  def timeChangeToDate(sparkSession: SparkSession, timeDataKeyDf: DataFrame, hiveColumnName: List[String], startTime: String,
                       sdf: SimpleDateFormat): DataFrame = {
    val rowRDD: RDD[Row] = timeDataKeyDf.rdd.map { row =>
      row match {
        case Row(time, data, key) => {
          val date = sdf.parse(time.toString)
          val timestamp = new Timestamp(date.getTime)
          Row(timestamp, key.toString, data.toString.toDouble)
        }
      }
    }
    // build the schema and turn the RDD into a DataFrame
    val field = Seq(
      StructField(hiveColumnName(0), TimestampType, true),
      StructField(hiveColumnName(0) + "Key", StringType, true),
      StructField(hiveColumnName(1), DoubleType, true))
    val schema = StructType(field)
    val zonedDateDataDf = sparkSession.createDataFrame(rowRDD, schema)
    zonedDateDataDf
  }

  /**
   * Build the DateTimeIndex from the time interval and frequency.
   *
   * @param startTime
   * @param endTime
   * @param timeSpanType
   * @param timeSpan
   * @param sdf
   */
  def getTimeSpan(startTime: String, endTime: String, timeSpanType: String, timeSpan: Int, sdf: SimpleDateFormat): UniformDateTimeIndex = {
    val start = TimeFormatUtils.fromatData(startTime, sdf)
    val end = TimeFormatUtils.fromatData(endTime, sdf)
    val zone = ZoneId.systemDefault()
    val frequency = timeSpanType match {
      case "year" => new YearFrequency(timeSpan)
      case "month" => new MonthFrequency(timeSpan)
      case "day" => new DayFrequency(timeSpan)
      case "hour" => new HourFrequency(timeSpan)
      case "minute" => new MinuteFrequency(timeSpan)
    }
    val dtIndex: UniformDateTimeIndex = DateTimeIndex.uniformFromInterval(
      ZonedDateTime.of(start.substring(0, 4).toInt, start.substring(5, 7).toInt, start.substring(8, 10).toInt,
        start.substring(11, 13).toInt, start.substring(14, 16).toInt, 0, 0, zone),
      ZonedDateTime.of(end.substring(0, 4).toInt, end.substring(5, 7).toInt, end.substring(8, 10).toInt,
        end.substring(11, 13).toInt, end.substring(14, 16).toInt, 0, 0, zone),
      frequency)
    dtIndex
  }
}
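
To make the argument order concrete, here is a hedged example of how the driver might be invoked for each model. The table name, column names and time range are made-up placeholders (a real run also needs a reachable Hive metastore); the positions simply mirror the args(0) to args(10) parsing in timeSeries above.

// Hypothetical invocations; "time_series.passenger_data" and the other values are placeholders.
object TimeSeriesTrainDemo {
  def main(args: Array[String]): Unit = {
    // ARIMA: args(9) carries the custom "p,d,q" parameters
    // (passing fewer than three values falls back to ARIMA.autoFit).
    TimeSeriesTrain.timeSeries(Array(
      "time_series.passenger_data",    // args(0): Hive database.table
      "time,data",                     // args(1): column names, must be time,data
      "201401", "201612",              // args(2)/args(3): start and end time (format auto-detected)
      "month", "1",                    // args(4)/args(5): time-span type and step
      "6",                             // args(6): forecast the next 6 values
      "time_series.passenger_output",  // args(7): output table
      "arima",                         // args(8): model name
      "1,0,1"))                        // args(9): p,d,q

    // HoltWinters: args(9) is the seasonal period, args(10) the model type.
    TimeSeriesTrain.timeSeries(Array(
      "time_series.passenger_data", "time,data", "201401", "201612",
      "month", "1", "6", "time_series.passenger_output",
      "holtwinters", "12", "additive"))
  }
}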

TimeSeriesKeyModel.scala

import java.text.SimpleDateFormat
import java.util.Calendar

import com.cloudera.sparkts.TimeSeriesRDD
import com.cloudera.sparkts.models.{ARIMA, HoltWinters}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import scala.collection.mutable.ArrayBuffer

/**
 * Time-series models (the input data carries an extra key column).
 * Created by llq on 2017/5/3.
 */
class TimeSeriesKeyModel {
  // number of values to forecast
  private var predictedN = 1
  // output table name
  private var outputTableName = "time_series.timeseries_output"

  def this(predictedN: Int, outputTableName: String) {
    this()
    this.predictedN = predictedN
    this.outputTableName = outputTableName
  }

  /**
   * Train the ARIMA model (the input data carries an extra key column).
   *
   * @param trainTsrdd
   * @return
   */
  def arimaModelTrainKey(trainTsrdd: TimeSeriesRDD[String], listPDQ: List[String]): (RDD[(String, Vector)], RDD[(String, (String, (String, String, String), String, String))]) = {
    /** parameter setup **/
    val predictedN = this.predictedN

    /** build the ARIMA model **/
    // build and train the ARIMA model; the resulting RDD has the form (key, ArimaModel, Vector)
    val arimaAndVectorRdd = trainTsrdd.map { line =>
      line match {
        case (key, denseVector) => {
          if (listPDQ.size >= 3) {
            (key, ARIMA.fitModel(listPDQ(0).toInt, listPDQ(1).toInt, listPDQ(2).toInt, denseVector), denseVector)
          } else {
            (key, ARIMA.autoFit(denseVector), denseVector)
          }
        }
      }
    }

    /** model outputs: the fitted p, d, q with their coefficients, the CSS log-likelihood and the AIC **/
    val coefficients = arimaAndVectorRdd.map { line =>
      line match {
        case (key, arimaModel, denseVector) => {
          (key, (arimaModel.coefficients.mkString(","),
            (arimaModel.p.toString,
              arimaModel.d.toString,
              arimaModel.q.toString),
            arimaModel.logLikelihoodCSS(denseVector).toString,
            arimaModel.approxAIC(denseVector).toString))
        }
      }
    }
    coefficients.collect().map {
      _ match {
        case (key, (coefficients, (p, d, q), logLikelihood, aic)) =>
          println(key + " coefficients:" + coefficients + "=>" + "(p=" + p + ",d=" + d + ",q=" + q + ")")
      }
    }

    /** forecast the next N values **/
    val forecast = arimaAndVectorRdd.map { row =>
      row match {
        case (key, arimaModel, denseVector) => {
          (key, arimaModel.forecast(denseVector, predictedN))
        }
      }
    }
    // keep only the forecast values (the last predictedN entries)
    val forecastValue = forecast.map {
      _ match {
        case (key, value) => {
          val partArray = value.toArray.mkString(",").split(",")
          var forecastArrayBuffer = new ArrayBuffer[Double]()
          var i = partArray.length - predictedN
          while (i < partArray.length) {
            forecastArrayBuffer += partArray(i).toDouble
            i = i + 1
          }
          (key, Vectors.dense(forecastArrayBuffer.toArray))
        }
      }
    }
    println("Arima forecast of next " + predictedN + " observations:")
    forecastValue.foreach(println)
    (forecastValue, coefficients)
  }

  /**
   * Save the ARIMA evaluation metrics:
   * coefficients, (p, d, q), logLikelihoodCSS, AIC, mean, variance, standard deviation, max, min, range, count.
   *
   * @param sparkSession
   * @param coefficients
   * @param forecastValue
   */
  def arimaModelKeyEvaluationSave(sparkSession: SparkSession, coefficients: RDD[(String, (String, (String, String, String), String, String))], forecastValue: RDD[(String, Vector)]): Unit = {
    /** transpose the forecast vectors **/
    val forecastRdd = forecastValue.map {
      _ match {
        case (key, forecast) => forecast.toArray
      }
    }
    // Split the matrix into one number per line.
    val byColumnAndRow = forecastRdd.zipWithIndex.flatMap {
      case (row, rowIndex) => row.zipWithIndex.map {
        case (number, columnIndex) => columnIndex -> (rowIndex, number)
      }
    }
    // Build up the transposed matrix. Group and sort by column index first.
    val byColumn = byColumnAndRow.groupByKey.sortByKey().values
    // Then sort by row index.
    val transposed = byColumn.map {
      indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
    }
    val summary = Statistics.colStats(transposed.map(value => Vectors.dense(value(0))))

    /** compute mean, variance, standard deviation, max, min, range and count of the forecast values, and join them with the model metrics **/
    val evaluation = coefficients.join(forecastValue.map {
      _ match {
        case (key, forecast) => {
          (key, (summary.mean.toArray(0).toString,
            summary.variance.toArray(0).toString,
            math.sqrt(summary.variance.toArray(0)).toString,
            summary.max.toArray(0).toString,
            summary.min.toArray(0).toString,
            (summary.max.toArray(0) - summary.min.toArray(0)).toString,
            summary.count.toString))
        }
      }
    })
    val evaluationRddRow = evaluation.map {
      _ match {
        case (key, ((coefficients, pdq, logLikelihoodCSS, aic), (mean, variance, standardDeviation, max, min, range, count))) => {
          Row(coefficients, pdq.toString, logLikelihoodCSS, aic, mean, variance, standardDeviation, max, min, range, count)
        }
      }
    }
    // build the evaluation DataFrame
    val schemaString = "coefficients,pdq,logLikelihoodCSS,aic,mean,variance,standardDeviation,max,min,range,count"
    val schema = StructType(schemaString.split(",").map(fileName => StructField(fileName, StringType, true)))
    val evaluationDf = sparkSession.createDataFrame(evaluationRddRow, schema)
    println("Evaluation in Arima:")
    evaluationDf.show()

    /**
     * save the evaluation table to Hive
     */
    evaluationDf.write.mode(SaveMode.Overwrite).saveAsTable(outputTableName + "_arima_evaluation")
  }

  /**
   * Train the HoltWinters model (the input data carries an extra key column).
   *
   * @param trainTsrdd
   * @param period
   * @param holtWintersModelType
   * @return
   */
  def holtWintersModelTrainKey(trainTsrdd: TimeSeriesRDD[String], period: Int, holtWintersModelType: String): (RDD[(String, Vector)], RDD[(String, Double)]) = {
    /** parameter setup **/
    // number of values to forecast
    val predictedN = this.predictedN

    /** build the HoltWinters model **/
    // build and train the HoltWinters model; the resulting RDD has the form (key, HoltWintersModel, Vector)
    val holtWintersAndVectorRdd = trainTsrdd.map { line =>
      line match {
        case (key, denseVector) =>
          (key, HoltWinters.fitModel(denseVector, period, holtWintersModelType), denseVector)
      }
    }

    /** forecast the next N values **/
    // build a vector with N slots that is passed to the HoltWinters forecast method
    val predictedArrayBuffer = new ArrayBuffer[Double]()
    var i = 0
    while (i < predictedN) {
      predictedArrayBuffer += i
      i = i + 1
    }
    val predictedVectors = Vectors.dense(predictedArrayBuffer.toArray)
    // forecast
    val forecast = holtWintersAndVectorRdd.map { row =>
      row match {
        case (key, holtWintersModel, denseVector) => {
          (key, holtWintersModel.forecast(denseVector, predictedVectors))
        }
      }
    }
    println("HoltWinters forecast of next " + predictedN + " observations:")
    forecast.foreach(println)

    /** HoltWinters evaluation metric: SSE **/
    val sse = holtWintersAndVectorRdd.map { row =>
      row match {
        case (key, holtWintersModel, denseVector) => {
          (key, holtWintersModel.sse(denseVector))
        }
      }
    }
    (forecast, sse)
  }

  /**
   * Save the HoltWinters evaluation metrics:
   * sse, mean, variance, standard deviation, max, min, range, count.
   *
   * @param sparkSession
   * @param sse
   * @param forecastValue
   */
  def holtWintersModelKeyEvaluationSave(sparkSession: SparkSession, sse: RDD[(String, Double)], forecastValue: RDD[(String, Vector)]): Unit = {
    /** transpose the forecast vectors **/
    val forecastRdd = forecastValue.map {
      _ match {
        case (key, forecast) => forecast.toArray
      }
    }
    // Split the matrix into one number per line.
    val byColumnAndRow = forecastRdd.zipWithIndex.flatMap {
      case (row, rowIndex) => row.zipWithIndex.map {
        case (number, columnIndex) => columnIndex -> (rowIndex, number)
      }
    }
    // Build up the transposed matrix. Group and sort by column index first.
    val byColumn = byColumnAndRow.groupByKey.sortByKey().values
    // Then sort by row index.
    val transposed = byColumn.map {
      indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
    }
    val summary = Statistics.colStats(transposed.map(value => Vectors.dense(value(0))))

    /** compute mean, variance, standard deviation, max, min, range and count of the forecast values, and join them with the model metrics **/
    val evaluation = sse.join(forecastValue.map {
      _ match {
        case (key, forecast) => {
          (key, (summary.mean.toArray(0).toString,
            summary.variance.toArray(0).toString,
            math.sqrt(summary.variance.toArray(0)).toString,
            summary.max.toArray(0).toString,
            summary.min.toArray(0).toString,
            (summary.max.toArray(0) - summary.min.toArray(0)).toString,
            summary.count.toString))
        }
      }
    })
    val evaluationRddRow = evaluation.map {
      _ match {
        case (key, (sse, (mean, variance, standardDeviation, max, min, range, count))) => {
          Row(sse.toString, mean, variance, standardDeviation, max, min, range, count)
        }
      }
    }
    // build the evaluation DataFrame
    val schemaString = "sse,mean,variance,standardDeviation,max,min,range,count"
    val schema = StructType(schemaString.split(",").map(fileName => StructField(fileName, StringType, true)))
    val evaluationDf = sparkSession.createDataFrame(evaluationRddRow, schema)
    println("Evaluation in HoltWinters:")
    evaluationDf.show()

    /**
     * save the evaluation table to Hive
     */
    evaluationDf.write.mode(SaveMode.Overwrite).saveAsTable(outputTableName + "_holtwinters_evaluation")
  }

  /**
   * Save the (date, data) rows into Hive.
   *
   * @param sparkSession
   * @param dateDataRdd
   * @param hiveColumnName
   */
  private def keySaveInHive(sparkSession: SparkSession, dateDataRdd: RDD[Row], hiveColumnName: List[String]): Unit = {
    // turn dateDataRdd into a DataFrame
    val schemaString = hiveColumnName(0) + " " + hiveColumnName(1)
    val schema = StructType(schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, true)))
    val dateDataDf = sparkSession.createDataFrame(dateDataRdd, schema)
    // save the DataFrame into Hive
    dateDataDf.write.mode(SaveMode.Overwrite).saveAsTable(outputTableName)
  }

  /**
   * Merge the actual and forecast values, attach dates to form a DataFrame (Date, Data), and save it.
   *
   * @param sparkSession
   * @param trainTsrdd
   * @param forecastValue
   * @param predictedN
   * @param startTime
   * @param endTime
   * @param timeSpanType
   * @param timeSpan
   * @param sdf
   * @param hiveColumnName
   */
  def actualForcastDateKeySaveInHive(sparkSession: SparkSession, trainTsrdd: TimeSeriesRDD[String], forecastValue: RDD[(String, Vector)],
                                     predictedN: Int, startTime: String, endTime: String, timeSpanType: String, timeSpan: Int,
                                     sdf: SimpleDateFormat, hiveColumnName: List[String]): Unit = {
    // append the forecast values after the actual values
    val actualAndForcastRdd = trainTsrdd.map {
      _ match {
        case (key, actualValue) => (key, actualValue.toArray.mkString(","))
      }
    }.join(forecastValue.map {
      _ match {
        case (key, forecastValue) => (key, forecastValue.toArray.mkString(","))
      }
    })
    // generate the dates from the start of training to the end of the forecast, as an RDD
    val dateArray = productStartDatePredictDate(predictedN, timeSpanType, timeSpan, sdf, startTime, endTime)
    val dateRdd = sparkSession.sparkContext.parallelize(dateArray.toArray.mkString(",").split(",").map(date => (date)))
    // zip dates and values into RDD[Row], one series per key
    val actualAndForcastArray = actualAndForcastRdd.collect()
    for (i <- 0 until actualAndForcastArray.length) {
      val dateDataRdd = actualAndForcastArray(i) match {
        case (key, value) => {
          val actualAndForcast = sparkSession.sparkContext.parallelize(value.toString().split(",")
            .map(data => {
              data.replaceAll("\\(", "").replaceAll("\\)", "")
            }))
          dateRdd.zip(actualAndForcast).map {
            _ match {
              case (date, data) => Row(date, data)
            }
          }
        }
      }
      // save the result
      if (dateDataRdd.collect()(0).toString() != "[1]") {
        keySaveInHive(sparkSession, dateDataRdd, hiveColumnName)
      }
    }
  }

  /**
   * Generate the dates in batch, covering the span from the start of the training data to the end of the forecast.
   *
   * @param predictedN
   * @param timeSpanType
   * @param timeSpan
   * @param format
   * @param startTime
   * @param endTime
   * @return
   */
  def productStartDatePredictDate(predictedN: Int, timeSpanType: String, timeSpan: Int,
                                  format: SimpleDateFormat, startTime: String, endTime: String): ArrayBuffer[String] = {
    // dates run from the start time to the end of the forecast horizon
    val cal1 = Calendar.getInstance()
    cal1.setTime(format.parse(startTime))
    val cal2 = Calendar.getInstance()
    cal2.setTime(format.parse(endTime))

    /**
     * compute the number of steps between start and end, plus the forecast horizon
     */
    var field = 1
    var diff: Long = 0
    timeSpanType match {
      case "year" => {
        field = Calendar.YEAR
        diff = (cal2.getTime.getYear() - cal1.getTime.getYear()) / timeSpan + predictedN
      }
      case "month" => {
        field = Calendar.MONTH
        diff = ((cal2.getTime.getYear() - cal1.getTime.getYear()) * 12 + (cal2.getTime.getMonth() - cal1.getTime.getMonth())) / timeSpan + predictedN
      }
      case "day" => {
        field = Calendar.DATE
        diff = (cal2.getTimeInMillis - cal1.getTimeInMillis) / (1000 * 60 * 60 * 24) / timeSpan + predictedN
      }
      case "hour" => {
        field = Calendar.HOUR
        diff = (cal2.getTimeInMillis - cal1.getTimeInMillis) / (1000 * 60 * 60) / timeSpan + predictedN
      }
      case "minute" => {
        field = Calendar.MINUTE
        diff = (cal2.getTimeInMillis - cal1.getTimeInMillis) / (1000 * 60) / timeSpan + predictedN
      }
    }
    var iDiff = 0L
    var dateArrayBuffer = new ArrayBuffer[String]()
    while (iDiff <= diff) {
      // record the date
      dateArrayBuffer += format.format(cal1.getTime)
      cal1.add(field, timeSpan)
      iDiff = iDiff + 1
    }
    dateArrayBuffer
  }
}
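
For reference, here is a minimal sketch of the date bookkeeping used when writing results back to Hive; the model instance, table name and the yyyyMM range are made-up illustration values. The generated list covers every step of the training range plus predictedN extra steps for the forecast.

import java.text.SimpleDateFormat

// Illustrative only: three observed months plus predictedN = 2 forecast months.
object ProductDateDemo {
  def main(args: Array[String]): Unit = {
    val model = new TimeSeriesKeyModel(2, "time_series.demo_output")
    val sdf = new SimpleDateFormat("yyyyMM")
    // Expected output: ArrayBuffer(201601, 201602, 201603, 201604, 201605)
    println(model.productStartDatePredictDate(2, "month", 1, sdf, "201601", "201603"))
  }
}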
