时间序列算法在实际问题中使用的越来越多,Alink提供了多种时间序列算法,既可以用在批数据上,也可以用在流式数据上。可以使单变量,也可以是多元变量。支持算法如下图,算法在解决销量预测等实际问题中使用的越来序算法在解决销量预测等实际问题中使用的越
# ts_string是String类型,数据形式是'2021-12-03 00:00:00' sourceOp.select('TO_TIMESTAMP(ts_string) as ts, id, val')
sourceOp.select('CURRENT_TIMESTAMP as ts, id, val')
GroupByBatchOp() .setGroupByPredicate("id") .setSelectClause("id, mtable_agg(ts, val) as data")
无模型,以Arima为例,
ArimaBatchOp() .setValueCol("data") .setOrder([1, 2, 1]) .setPredictNum(12) .setPredictionCol("predict")
有模型,以DeepAr为例,需要先训练模型,再用模型进行预测
deepARTrainBatchOp = DeepARTrainBatchOp()\ .setTimeCol("ts")\ .setSelectedCol("series")\ .setNumEpochs(10)\ .setWindow(2)\ .setStride(1)\ .linkFrom(batch_source) deepARPredictBatchOp = DeepARPredictBatchOp(deepARTrainBatchOp)\ .setPredictNum(2)\ .setValueCol("data")\ .setPredictionCol("pred")
FlattenMTableBatchOp() .setReservedCols(["id", "predict"]) .setSelectedCol("predict") .setSchemaStr("ts timestamp, val double")
或者使用查询, 将ts列时间对应的结果查询出来
LookupValueInTimeSeriesBatchOp() .setTimeCol("ts") .setTimeSeriesCol("predict") .setOutputCol("out") .setReservedCols(["id","ts"])
# ts_string是String类型,数据形式是'2021-12-03 00:00:00' sourceOp.select('TO_TIMESTAMP(ts_string) as ts, id, val')
sourceOp.select('CURRENT_TIMESTAMP as ts, id, val')
over = OverCountWindowStreamOp()\ .setTimeCol("ts")\ .setPrecedingRows(4)\ .setClause("mtable_agg_preceding(ts,val) as mtable_data")
Agg参考文档 http://alinklab.cn/tutorial/appendix_aggregate_function.html
窗口参考文档 https://www.yuque.com/pinshu/alink_guide/dffffm
无模型以Prophet为例,
tsOp = ProphetStreamOp()\ .setValueCol("mtable_data")\ .setPredictNum(1)\ .setPredictionCol("pred")\ .setPredictionDetailCol("pred_detail")
有模型,以DeepAr为例,需要先训练模型,再用模型进行预测
deepARTrainBatchOp = DeepARTrainBatchOp()\ .setTimeCol("ts")\ .setSelectedCol("series")\ .setNumEpochs(10)\ .setWindow(2)\ .setStride(1)\ .linkFrom(batch_source) deepARPredictStreamOp = DeepARPredictBatchOp(deepARTrainStreamOp)\ .setPredictNum(2)\ .setValueCol("data")\ .setPredictionCol("pred")
flatten = FlattenMTableStreamOp()\ .setReservedCols(["id"])\ .setSelectedCol("mt")\ .setSchemaStr('ts TIMESTAMP, val double')
LookupVectorInTimeSeriesStreamOp()\ .setTimeCol("ts") .setTimeSeriesCol("predict") .setOutputCol("out")
使用EvalTimeSeriesBatchOp进行评估,支持的评估指标如下,其中yi是真实值,fi是预测值,y_hat是真实值均值,N是数据条数。
指标 | 名称 | 解释 | |
MSE | 均方误差 | Mean Squared Error | MSE = SSE/N |
MAE | 平均绝对误差 | Mean Absolute Error | MAE = SAE/N |
RMSE | 均方根误差 | Root Mean Squared Error | RMSE = sqrt(MSE) |
SSE | 残差平方和 | Sum of Squares for Error | SSE = sum(yi-fi)^2 |
SST | 离差平方和 | Sum of Squared for Total | SST = sum(yi-y_hat)^2 |
SSR | 回归平方和 | Sum of Squares for Regression | SSR = sum(fi_y_hat)^2 |
MAPE | 平均绝对百分误差 | Mean Absolute Percentage Error | MAPE = sum|(fi-yi)/yi|*100/N |
SMAPE | 对称平均绝对百分误差 | Symmetric Mean Absolute Percentage Error | SMAPE = sum( |(fi-yi)|/(|yi|+|fi|) )*200/N |
ND | 标准方差 | Normalized Deviation | ND = SAE / sum(|yi|) |