Alink教程(Java版)

第3.2.5节 定时输出流式数据


在实际应用中,我们常需要将某个时间间隔(1小时,5分钟,10秒等间隔)的数据输出到一个文件,该数据文件随后可以被放入批式处理、增量训练等流程。

Alink提供了Export2FileSinkStreamOp组件,可以按指定的时间间隔,将数据保存到本地或远程的文件系统,每个数据文件都是AK格式。下面先介绍组件的参数:

    • 参数WindowTime,设置间隔时间,以秒为单位,是必填参数。
    • 参数FilePath,保存数据的文件夹名称,是必填参数
    • 参数TimeCol,为时间列名称,是可选参数。如果没有设置该参数,会以流式任务运行节点的本地时间,计算间隔、输出数据文件。
    • 参数OverwriteSink,为布尔型参数,默认值为false,即当发现参数“FilePath”指定的文件夹存在时会报错;如果设置为true,则会将文件夹中的文件全部删除,再写入新的数据文件。

下面会以是否指定TimeCol参数,构建两个实例,帮助读者了解组件的功能。

3.2.5.1 按本地时间定时输出流式数据


我们以MovieLens的一个评分数据为例,关于该数据的介绍,请参见24.3节,这里不再重复。使用TsvSourceStreamOp组件,可以直接从网络地址读取数据,具体代码如下:

StreamOperator <?> source = new TsvSourceStreamOp()
	.setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")
	.setSchemaStr("user_id long, item_id long, rating float, ts long")
	.udf("ts", "ts", new FromUnixTimestamp());

其中用户自定义函数FromUnixTimestamp的定义如下,该函数用于将原始数据中的long型表示时间的数据转换为Timestamp格式。

public static class FromUnixTimestamp extends ScalarFunction {

	public java.sql.Timestamp eval(Long ts) {
		return new java.sql.Timestamp(ts * 1000);
	}

}


然后,将流式source连接Export2FileSinkStreamOp组件,并设置参数,具体代码如下,这里没有设置TimeCol参数,意味着组件会按本地时间进行数据保存,并按WindowTime参数的设置,每隔5秒保存一次文件。

source.link(
	new Export2FileSinkStreamOp()
		.setFilePath(LOCAL_DIR + "with_local_time")
		.setWindowTime(5)
		.setOverwriteSink(true)
);


我们还连接了一个流式的Sink组件,用于准备后面的实验,具体代码如下。

source.link(
	new AkSinkStreamOp()
		.setFilePath(LOCAL_DIR + "ratings.ak")
		.setOverwriteSink(true)
);


执行流式任务,我们看到新建了名称为"with_local_time"的文件夹,具体内容详见下面的截图。其中包含了5个文件,都是AK格式,文件名是以间隔区间的结束时间点命名的,譬如:“202204211524050”对应时间:2022年4月21日15:24:05.0


最后,再做个简单的验证,统计一下"with_local_time"文件夹下的数据,详细代码如下:

new AkSourceBatchOp()
	.setFilePath(LOCAL_DIR + "with_local_time")
	.lazyPrintStatistics("Statistics for data in the folder 'with_local_time' : ");
BatchOperator.execute();


运行结果如下,共100000条数据,与原始数据集的条数相同。

Statistics for data in the folder 'with_local_time' : 
Summary: 
|colName| count|missing|     sum|    mean|   variance|min| max|
|-------|------|-------|--------|--------|-----------|---|----|
|user_id|100000|      0|46248475|462.4848|  71083.249|  1| 943|
|item_id|100000|      0|42553013|425.5301|109427.5525|  1|1682|
| rating|100000|      0|  352986|  3.5299|     1.2671|  1|   5|
|     ts|100000|      0|     NaN|     NaN|        NaN|NaN| NaN|



3.2.5.2 按数据的时间列,定时输出流式数据


在实验前,先做一个数据准备工作,ratings.ak中的各条数据,从其ts时间列来看,几乎是随机分布在数据集中,我们相对其进行排序,并存入ratings_ordered.ak文件。具体代码如下:

new AkSourceBatchOp()
    .setFilePath(LOCAL_DIR + "ratings.ak")
    .orderBy("ts", 1000000)
    .lazyPrintStatistics("Statistics for data in the file 'ratings.ak' : ")
    .link(
    new AkSinkBatchOp()
    .setFilePath(LOCAL_DIR + "ratings_ordered.ak")
    .setOverwriteSink(true)
);
BatchOperator.execute();

以流的方式读入准备好的数据,然后连接Export2FileSinkStreamOp组件,设置时间列为 ts 列,并设置时间间隔为3600X24秒,即1天。具体代码如下:

new AkSourceStreamOp()
    .setFilePath(LOCAL_DIR + "ratings_ordered.ak")
    .link(
    new Export2FileSinkStreamOp()
    .setFilePath(LOCAL_DIR + "with_ts_time")
    .setTimeCol("ts")
    .setWindowTime(3600 * 24)
    .setOverwriteSink(true)
);
StreamOperator.execute();


访问"with_local_time"文件夹,查看运行结果,具体内容详见下面的截图,每个文件保存了一天的数据。


再读取数据,进行深入验证,一方面读取整个文件夹的数据,进行统计;另一方面,选择一个数据文件,打印输出,查看起时间列的信息。详细代码如下:

new AkSourceBatchOp()
    .setFilePath(LOCAL_DIR + "with_ts_time")
    .lazyPrintStatistics("Statistics for data in the folder 'with_ts_time' : ");

new AkSourceBatchOp()
    .setFilePath(LOCAL_DIR + "with_ts_time" + File.separator + "199709210000000")
	.print();


运行结果如下,"with_local_time"文件夹下包含100000条数据,而且各项统计指标也与 ratings.ak 的也相同;文件199709210000000中的各数据都为1997-09-20的,也符合预期。

Statistics for data in the folder 'with_ts_time' : 
Summary: 
|colName| count|missing|     sum|    mean|   variance|min| max|
|-------|------|-------|--------|--------|-----------|---|----|
|user_id|100000|      0|46248475|462.4848|  71083.249|  1| 943|
|item_id|100000|      0|42553013|425.5301|109427.5525|  1|1682|
| rating|100000|      0|  352986|  3.5299|     1.2671|  1|   5|
|     ts|100000|      0|     NaN|     NaN|        NaN|NaN| NaN|

user_id|item_id|rating|ts
-------|-------|------|---
259|185|4.0000|1997-09-20 11:06:21.0
259|357|5.0000|1997-09-20 11:18:05.0
712|1040|4.0000|1997-09-20 12:28:02.0
712|553|5.0000|1997-09-20 12:30:50.0
712|174|5.0000|1997-09-20 12:33:15.0
712|462|3.0000|1997-09-20 12:34:45.0
712|365|3.0000|1997-09-20 12:37:14.0
712|393|3.0000|1997-09-20 12:38:40.0
712|404|3.0000|1997-09-20 12:41:07.0
712|95|4.0000|1997-09-20 12:42:32.0
 ......
259|173|4.0000|1997-09-20 11:07:23.0
851|687|2.0000|1997-09-20 12:02:48.0
712|510|2.0000|1997-09-20 12:29:09.0
712|173|5.0000|1997-09-20 12:31:41.0
712|692|5.0000|1997-09-20 12:33:15.0
712|195|3.0000|1997-09-20 12:34:45.0
712|585|4.0000|1997-09-20 12:37:14.0
712|584|4.0000|1997-09-20 12:39:02.0
712|655|5.0000|1997-09-20 12:41:07.0
712|747|3.0000|1997-09-20 12:42:32.0



3.2.5.3 输出带分区的数据

在文件系统上存储长期的数据,经常将时间作为分区名称,通过选择分区名称,选择部分数据进行操作。Export2FileSinkStreamOp组件可以通过指定参数PartitionsFormat来定义输出的分区格式。如下面代码所示,"year=yyyy/month=MM/day=dd"表明输出三级文件夹,第一级文件夹的名称以“year=”开头,后面是4位的年份信息;第二级文件夹的名称以“month=”开头,后面是2位的月份信息,譬如:1月表示为“01”,12月表示为“12”;第三级文件夹的名称以“day=”开头,后面是2位的天信息,譬如:5号表示为“05”,31号表示为“31”。

new AkSourceStreamOp()
	.setFilePath(LOCAL_DIR + "ratings_ordered.ak")
	.link(
		new Export2FileSinkStreamOp()
			.setFilePath(LOCAL_DIR + "data_with_partitions")
			.setTimeCol("ts")
			.setWindowTime(3600 * 24)
			.setPartitionsFormat("year=yyyy/month=MM/day=dd")
			.setOverwriteSink(true)
	);
StreamOperator.execute();


运行结束后,我们可以在文件夹看到如下结构: