在实际应用中,我们常需要将某个时间间隔(1小时,5分钟,10秒等间隔)的数据输出到一个文件,该数据文件随后可以被放入批式处理、增量训练等流程。
Alink提供了Export2FileSinkStreamOp组件,可以按指定的时间间隔,将数据保存到本地或远程的文件系统,每个数据文件都是AK格式。下面先介绍组件的参数:
下面会以是否指定TimeCol参数,构建两个实例,帮助读者了解组件的功能。
我们以MovieLens的一个评分数据为例,关于该数据的介绍,请参见24.3节,这里不再重复。使用TsvSourceStreamOp组件,可以直接从网络地址读取数据,具体代码如下:
source = TsvSourceStreamOp()\ .setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")\ .setSchemaStr("user_id long, item_id long, rating float, ts long")\ .link( UDFStreamOp()\ .setFunc(from_unix_timestamp)\ .setSelectedCols(["ts"])\ .setOutputCol("ts") )
其中用户自定义函数FromUnixTimestamp的定义如下,该函数用于将原始数据中的long型表示时间的数据转换为Timestamp格式。
import datetime @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.TIMESTAMP(3)) def from_unix_timestamp(ts): return datetime.datetime.fromtimestamp(ts)
然后,将流式source连接Export2FileSinkStreamOp组件,并设置参数,具体代码如下,这里没有设置TimeCol参数,意味着组件会按本地时间进行数据保存,并按WindowTime参数的设置,每隔5秒保存一次文件。
source.link( Export2FileSinkStreamOp()\ .setFilePath(LOCAL_DIR + "with_local_time")\ .setWindowTime(5)\ .setOverwriteSink(True) )
我们还连接了一个流式的Sink组件,用于准备后面的实验,具体代码如下。
source.link( AkSinkStreamOp()\ .setFilePath(LOCAL_DIR + "ratings.ak")\ .setOverwriteSink(True) )
执行流式任务,我们看到新建了名称为"with_local_time"的文件夹,具体内容详见下面的截图。其中包含了5个文件,都是AK格式,文件名是以间隔区间的结束时间点命名的,譬如:“202204211524050”对应时间:2022年4月21日15:24:05.0
最后,再做个简单的验证,统计一下"with_local_time"文件夹下的数据,详细代码如下:
AkSourceBatchOp()\ .setFilePath(LOCAL_DIR + "with_local_time")\ .lazyPrintStatistics("Statistics for data in the folder 'with_local_time' : ") BatchOperator.execute()
运行结果如下,共100000条数据,与原始数据集的条数相同。
Statistics for data in the folder 'with_local_time' : Summary: |colName| count|missing| sum| mean| variance|min| max| |-------|------|-------|--------|--------|-----------|---|----| |user_id|100000| 0|46248475|462.4848| 71083.249| 1| 943| |item_id|100000| 0|42553013|425.5301|109427.5525| 1|1682| | rating|100000| 0| 352986| 3.5299| 1.2671| 1| 5| | ts|100000| 0| NaN| NaN| NaN|NaN| NaN|
在实验前,先做一个数据准备工作,ratings.ak中的各条数据,从其ts时间列来看,几乎是随机分布在数据集中,我们相对其进行排序,并存入ratings_ordered.ak文件。具体代码如下:
AkSourceBatchOp()\ .setFilePath(LOCAL_DIR + "ratings.ak")\ .orderBy("ts", 1000000)\ .lazyPrintStatistics("Statistics for data in the file 'ratings.ak' : ")\ .link( AkSinkBatchOp()\ .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\ .setOverwriteSink(True) ) BatchOperator.execute()
以流的方式读入准备好的数据,然后连接Export2FileSinkStreamOp组件,设置时间列为 ts 列,并设置时间间隔为3600X24秒,即1天。具体代码如下:
AkSourceStreamOp()\ .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\ .link( Export2FileSinkStreamOp()\ .setFilePath(LOCAL_DIR + "with_ts_time")\ .setTimeCol("ts")\ .setWindowTime(3600 * 24)\ .setOverwriteSink(True) ) StreamOperator.execute()
访问"with_local_time"文件夹,查看运行结果,具体内容详见下面的截图,每个文件保存了一天的数据。
再读取数据,进行深入验证,一方面读取整个文件夹的数据,进行统计;另一方面,选择一个数据文件,打印输出,查看起时间列的信息。详细代码如下:
AkSourceBatchOp()\ .setFilePath(LOCAL_DIR + "with_ts_time")\ .lazyPrintStatistics("Statistics for data in the folder 'with_ts_time' : ") AkSourceBatchOp()\ .setFilePath(LOCAL_DIR + "with_ts_time" + os.sep + "199709210000000")\ .print()
运行结果如下,"with_local_time"文件夹下包含100000条数据,而且各项统计指标也与 ratings.ak 的也相同;
Statistics for data in the folder 'with_ts_time' : Summary: |colName| count|missing| sum| mean| variance|min| max| |-------|------|-------|--------|--------|-----------|---|----| |user_id|100000| 0|46248475|462.4848| 71083.249| 1| 943| |item_id|100000| 0|42553013|425.5301|109427.5525| 1|1682| | rating|100000| 0| 352986| 3.5299| 1.2671| 1| 5| | ts|100000| 0| NaN| NaN| NaN|NaN| NaN|
文件199709210000000中的各数据显示如下图所示,都为1997-09-20的,也符合预期。
在文件系统上存储长期的数据,经常将时间作为分区名称,通过选择分区名称,选择部分数据进行操作。Export2FileSinkStreamOp组件可以通过指定参数PartitionsFormat来定义输出的分区格式。如下面代码所示,"year=yyyy/month=MM/day=dd"表明输出三级文件夹,第一级文件夹的名称以“year=”开头,后面是4位的年份信息;第二级文件夹的名称以“month=”开头,后面是2位的月份信息,譬如:1月表示为“01”,12月表示为“12”;第三级文件夹的名称以“day=”开头,后面是2位的天信息,譬如:5号表示为“05”,31号表示为“31”。
AkSourceStreamOp()\ .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\ .link( Export2FileSinkStreamOp()\ .setFilePath(LOCAL_DIR + "data_with_partitions")\ .setTimeCol("ts")\ .setWindowTime(3600 * 24)\ .setPartitionsFormat("year=yyyy/month=MM/day=dd")\ .setOverwriteSink(True) ) StreamOperator.execute()
运行结束后,我们可以在文件夹看到如下结构: