Java class name: com.alibaba.alink.operator.stream.timeseries.ProphetPredictStreamOp
Python class name: ProphetPredictStreamOp
Given a Prophet model (produced by ProphetTrainBatchOp), this operator performs time series forecasting with Prophet.
Reference documentation: https://www.yuque.com/pinshu/alink_guide/xbp5ky
Prophet is a time series forecasting algorithm open-sourced by Facebook; GitHub repository: https://github.com/facebook/prophet.
Prophet is suited to data with clear underlying regularities, for example series with pronounced seasonal patterns.
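The typical pipeline, shown in full in the examples below, is: train a model with ProphetTrainBatchOp on batch data, aggregate the streaming input into an MTable column with OverCountWindowStreamOp, and link the stream through ProphetPredictStreamOp. The following is a condensed sketch of that pattern, not a complete program; `trainSource` and `streamSource` stand in for batch and stream sources whose columns are `ds1` (timestamp) and `y1` (double), as in the full examples.

```java
// Condensed sketch of the usage pattern; see the complete, runnable examples below.
// trainSource / streamSource are assumed to provide columns ds1 (timestamp) and y1 (double).
ProphetTrainBatchOp model = new ProphetTrainBatchOp()
	.setTimeCol("ds1")
	.setValueCol("y1");
trainSource.link(model);

OverCountWindowStreamOp window = new OverCountWindowStreamOp()
	.setTimeCol("ds1")
	.setPrecedingRows(4)
	.setClause("mtable_agg_preceding(ds1,y1) as tensor");   // aggregate recent rows into an MTable

ProphetPredictStreamOp predictor = new ProphetPredictStreamOp(model)
	.setValueCol("tensor")        // MTable column holding the recent history
	.setPredictNum(1)             // forecast one step ahead
	.setPredictionCol("pred");

streamSource.link(window).link(predictor).print();
StreamOperator.execute();
```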
Name | Display Name | Description | Type | Required? | Value Range | Default |
---|---|---|---|---|---|---|
predictionCol | Prediction result column name | Name of the prediction result column | String | ✓ | | |
valueCol | Value column (MTable) | Value column, of type MTable | String | ✓ | Selected column type must be [M_TABLE, STRING] | |
cap | cap | cap | Double | | | null |
changePointPriorScale | changepoint_prior_scale | changepoint_prior_scale | Double | | | 0.05 |
changePointRange | change_point_range | change_point_range | Double | | | 0.8 |
changePoints | changepoints | changepoints | String | | | null |
dailySeasonality | daily_seasonality | daily_seasonality | String | | | "auto" |
floor | floor | floor | Double | | | null |
growth | growth | growth | String | | "LINEAR", "LOGISTIC", "FLAT" | "LINEAR" |
holidays | Holidays | Holidays, in the format playoff:2008-01-13,2009-01-03 superbowl:2010-02-07,2014-02-02 | String | | | null |
holidaysPriorScale | holidays_prior_scale | holidays_prior_scale | Double | | | 10.0 |
includeHistory | include_history | include_history | Boolean | | | false |
intervalWidth | interval_width | interval_width | Double | | | 0.8 |
mcmcSamples | mcmc_samples | mcmc_samples | Integer | | | 0 |
modelFilePath | Model file path | File path of the model | String | | | null |
nChangePoint | n_change_point | n_change_point | Integer | | | 25 |
predictNum | Number of forecast steps | Number of values to predict | Integer | | | 1 |
predictionDetailCol | Prediction detail column name | Name of the prediction detail column | String | | | |
reservedCols | Reserved column names | Input columns passed through to the output | String[] | | | null |
seasonalityMode | seasonality_mode | seasonality_mode | String | | "MULTIPLICATIVE", "ADDITIVE" | "ADDITIVE" |
seasonalityPriorScale | seasonality_prior_scale | seasonality_prior_scale | Double | | | 10.0 |
stanInit | Initial values | Initial values | String | | | null |
uncertaintySamples | Number of uncertainty samples | Number of samples used to compute the uncertainty metrics; set to 0 to skip computing them | Integer | | | 1000 |
weeklySeasonality | weekly_seasonality | weekly_seasonality | String | | | "auto" |
yearlySeasonality | yearly_seasonality | yearly_seasonality | String | | | "auto" |
numThreads | Number of threads | Number of threads used by this component | Integer | | | 1 |
modelStreamFilePath | Model stream file path | File path of the model stream | String | | | null |
modelStreamScanInterval | Model stream scan interval | Interval, in seconds, at which the model path is scanned | Integer | | | 10 |
modelStreamStartTime | Model stream start time | Start time of the model stream. By default, reading starts from the current time. Uses the format yyyy-mm-dd hh:mm:ss.fffffffff; see Timestamp.valueOf(String s) | String | | | null |
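The Prophet hyperparameters listed above map to setters on the operator following Alink's usual `setXxx` naming. A hedged sketch of tuning the prediction operator follows; it assumes `model` is a ProphetTrainBatchOp already linked to training data (as in the examples below), and the specific values are illustrative only.

```java
// Illustrative parameter settings on ProphetPredictStreamOp; values are examples, not recommendations.
// `model` is assumed to be a ProphetTrainBatchOp that has already been linked to training data.
ProphetPredictStreamOp predictor = new ProphetPredictStreamOp(model)
	.setValueCol("tensor")
	.setPredictionCol("pred")
	.setPredictNum(4)                 // forecast 4 steps instead of the default 1
	.setGrowth("LINEAR")              // one of "LINEAR", "LOGISTIC", "FLAT"
	.setSeasonalityMode("ADDITIVE")   // or "MULTIPLICATIVE"
	.setIntervalWidth(0.8)            // width of the uncertainty interval
	.setUncertaintySamples(0)         // 0 disables the uncertainty metrics
	.setHolidays("playoff:2008-01-13,2009-01-03 superbowl:2010-02-07,2014-02-02")
	.setNumThreads(2);
```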
```python
from pyalink.alink import *
import datetime
import pandas as pd

useLocalEnv(1)

data = pd.DataFrame([
    [1, datetime.datetime.fromtimestamp(1000), 10.0],
    [1, datetime.datetime.fromtimestamp(2000), 11.0],
    [1, datetime.datetime.fromtimestamp(3000), 12.0],
    [1, datetime.datetime.fromtimestamp(4000), 13.0],
    [1, datetime.datetime.fromtimestamp(5000), 14.0],
    [1, datetime.datetime.fromtimestamp(6000), 15.0],
    [1, datetime.datetime.fromtimestamp(7000), 16.0],
    [1, datetime.datetime.fromtimestamp(8000), 17.0],
    [1, datetime.datetime.fromtimestamp(9000), 18.0],
    [1, datetime.datetime.fromtimestamp(10000), 19.0]
])

streamSource = dataframeToOperator(data, schemaStr='id int, ds1 timestamp, y1 double', op_type='stream')

# Aggregate the preceding rows of each record into an MTable column.
over = OverCountWindowStreamOp()\
    .setTimeCol("ds1")\
    .setPrecedingRows(4)\
    .setClause("mtable_agg_preceding(ds1,y1) as tensor")

# Forecast the next value from each MTable window.
streamPred = ProphetStreamOp()\
    .setValueCol("tensor")\
    .setPredictNum(1)\
    .setPredictionCol("pred")\
    .setPredictionDetailCol("pred_detail")

# Look up the value at time ds1 in the predicted series "pred".
valueOp = LookupVectorInTimeSeriesStreamOp()\
    .setTimeSeriesCol("pred")\
    .setTimeCol("ds1")\
    .setReservedCols(["ds1", "tensor", "pred"])\
    .setOutputCol("y_hat")

streamSource\
    .link(over)\
    .link(streamPred)\
    .link(valueOp)\
    .print()

StreamOperator.execute()
```
```java
package com.alibaba.alink.operator.stream.timeseries;

import org.apache.flink.types.Row;

import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
import com.alibaba.alink.operator.batch.timeseries.ProphetTrainBatchOp;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.feature.OverCountWindowStreamOp;
import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
import org.junit.Test;

import java.sql.Timestamp;
import java.util.Arrays;

public class ProphetPredictStreamOpTest {

	@Test
	public void testModel() throws Exception {
		Row[] rowsData = new Row[] {
			Row.of("1", new Timestamp(117, 11, 1, 0, 0, 0, 0), "9.59076113897809 9.59076113897809"),
			Row.of("1", new Timestamp(117, 11, 2, 0, 0, 0, 0), "8.51959031601596 8.51959031601596"),
			Row.of("2", new Timestamp(117, 11, 3, 0, 0, 0, 0), "9.59076113897809 8.51959031601596"),
			Row.of("1", new Timestamp(117, 11, 4, 0, 0, 0, 0), "8.18367658262066 8.51959031601596"),
			Row.of("2", new Timestamp(117, 11, 5, 0, 0, 0, 0), "8.51959031601596 8.51959031601596"),
			Row.of("1", new Timestamp(117, 11, 6, 0, 0, 0, 0), "8.07246736935477 8.51959031601596"),
			Row.of("2", new Timestamp(117, 11, 7, 0, 0, 0, 0), "8.18367658262066 8.51959031601596"),
			Row.of("2", new Timestamp(117, 11, 8, 0, 0, 0, 0), "8.18367658262066 8.51959031601596"),
			Row.of("2", new Timestamp(117, 11, 9, 0, 0, 0, 0), "8.18367658262066 8.51959031601596"),
			Row.of("2", new Timestamp(117, 11, 10, 0, 0, 0, 0), "8.18367658262066 8.51959031601596"),
			Row.of("2", new Timestamp(117, 11, 11, 0, 0, 0, 0), "8.18367658262066 8.51959031601596"),
			Row.of("2", new Timestamp(117, 11, 12, 0, 0, 0, 0), "8.18367658262066 8.51959031601596"),
			Row.of("2", new Timestamp(117, 11, 13, 0, 0, 0, 0), "8.18367658262066 8.51959031601596"),
		};
		String[] colNames = new String[] {"id", "ds1", "y1"};

		// Train the batch model.
		MemSourceBatchOp source = new MemSourceBatchOp(Arrays.asList(rowsData), colNames);
		ProphetTrainBatchOp model = new ProphetTrainBatchOp()
			.setTimeCol("ds1")
			.setValueCol("y1");
		source.link(model);

		MemSourceStreamOp streamSource = new MemSourceStreamOp(Arrays.asList(rowsData), colNames);

		OverCountWindowStreamOp over = new OverCountWindowStreamOp()
			.setTimeCol("ds1")
			.setPrecedingRows(4)
			.setClause("mtable_agg_preceding(ds1,y1) as tensor");

		ProphetPredictStreamOp streamPred = new ProphetPredictStreamOp(model)
			.setValueCol("tensor")
			.setPredictNum(1)
			.setPredictionCol("pred")
			.setPredictionDetailCol("pred_detail");

		LookupVectorInTimeSeriesStreamOp valueOp = new LookupVectorInTimeSeriesStreamOp()
			.setTimeSeriesCol("pred")
			.setTimeCol("ds1")
			.setReservedCols("ds1", "tensor", "pred")
			.setOutputCol("y_hat");

		streamSource
			.link(over)
			.link(streamPred)
			.link(valueOp)
			.print();

		StreamOperator.execute();
	}
}
```
id | data | predict |
---|---|---|
1 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002","1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009","1970-01-01 08:00:00.01"],"val":[10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014","1970-01-01 08:00:00.015","1970-01-01 08:00:00.016","1970-01-01 08:00:00.017","1970-01-01 08:00:00.018","1970-01-01 08:00:00.019","1970-01-01 08:00:00.02","1970-01-01 08:00:00.021","1970-01-01 08:00:00.022"],"val":[20.0,21.0,22.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0]},"schema":"ts TIMESTAMP,val DOUBLE"} |