实际应用中常碰到这种场景,如下图所示,使用了流式预测组件,但过了一段时间,离线训练出新的模型,希望更新模型,但不想重启流式预测应用。Alink对于这个场景的解决方案就是,流式预测组件使用模型流,才通过相应组件或其它方法,将离线训练好的模型导入到模型流中。
Alink提供了将批式模型写出到模型流的组件:AppendModelStreamFileSinkBatchOp,可以将模型按照给定的时间戳,插入模型流。其参数说明如下:
名称 | 描述 | 是否必须? | 默认值 |
filePath | 文件路径 | ✓ | |
modelTime | 模型时间戳。默认当前时间。 使用yyyy-mm-dd hh:mm:ss.fffffffff格式,详见Timestamp.valueOf(String s) | null | |
numFiles | 文件数目 | 1 | |
numKeepModel | 实时写出模型的数目上限 | 2147483647 |
本节会多次用到数据http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-small.csv,需要事先下载到本地,本地文件路径为DATA_DIR + "avazu-small.csv"。
我们使用批式训练组件LogisticRegressionTrainBatchOp进行训练,但是每次只使用部分训练数据,第一次使用十分之一,每次增加十分之一,直到第十次使用全量训练数据。每次训练后将结果传给组件AppendModelStreamFileSinkBatchOp,写出到模型流文件夹(路径为DATA_DIR + FTRL_MODEL_STREAM_DIR),如下面代码所示:
AlinkGlobalConfiguration.setPrintProcessInfo(true); for (int i = 1; i <= 10; i++) { new CsvSourceBatchOp() .setFilePath(DATA_DIR + "avazu-small.csv") .setSchemaStr(SCHEMA_STRING) .sample(0.1 * i) .link( new LogisticRegressionTrainBatchOp() .setFeatureCols(NUMERICAL_COL_NAMES) .setLabelCol(LABEL_COL_NAME) ) .link( new AppendModelStreamFileSinkBatchOp() .setFilePath(DATA_DIR + FTRL_MODEL_STREAM_DIR) .setNumKeepModel(10) ); BatchOperator.execute(); System.out.println("\nTrain " + String.valueOf(i) + " models.\n"); Thread.sleep(2000); }
这里开头使用AlinkGlobalConfiguration.setPrintProcessInfo方法,设置打印输出中间处理过程的信息,便于我们观察模型训练的过程;中间输出完成训练的模型个数;也使用Thread.sleep方法控制训练的间隔。
运行结束后,打开模型流文件夹(路径为DATA_DIR + FTRL_MODEL_STREAM_DIR),显示如图所示,十个模型文件夹,分别以写入时的时间戳字符串命名。