Alink教程(Java版)

第3.2.6节 读取分区格式数据


AkSourceBatchOp/StreamOp、CsvSourceBatchOp/StreamOp、ParquetSourceBatchOp/StreamOp等批式或流式数据源组件,都支持选择分区读取数据。


分区目录名格式为"分区名=值",例如: month=06/day=17;month=06/day=18。Alink将遍历目录下的分区名和分区值,构造分区表:

month

day

06

17

06

18

使用SQL语句查找分区,例如:AkSourceBatchOp.setPartitions("day = '17'"),分区选择语法参考《Flink SQL 内置函数》,分区值为String类型。

上节的最后,使用Export2FileSinkStreamOp组件生成了分区数据;本节将以AkSourceBatchOp/StreamOp为例,演示选择分区数据进行读取。

3.2.6.1 按分区读取批式数据


下面的例子中,我们先读取整个根文件夹的数据,随后,通过设置参数Partitions,选择不同的分区。"year='1997'"表示选择年份为1997的数据;使用更复杂的表达式"year='1997' AND month>='10'",表示1997年中月份数大于10的月份,即选择1997年的最后3个月。具体代码如下:

new AkSourceBatchOp()
	.setFilePath(LOCAL_DIR + "data_with_partitions")
	.lazyPrintStatistics("Statistics for data in the folder 'data_with_partitions' : ");

new AkSourceBatchOp()
	.setFilePath(LOCAL_DIR + "data_with_partitions")
	.setPartitions("year='1997'")
	.lazyPrintStatistics("Statistics for data of year=1997 : ");

new AkSourceBatchOp()
	.setFilePath(LOCAL_DIR + "data_with_partitions")
	.setPartitions("year='1997' AND month>='10'")
	.lazyPrintStatistics("Statistics for data of year 1997's last 3 months : ");

BatchOperator.execute();

输入各个数据集的统计结果如下,整个数据集有10000条记录;1997年的数据有52604条记录;1997年后三个月的数据有46493条记录。有的读者可能会有疑问,为什么后三个月的数据占据了绝大多数?因为数据集的记录是从9月份开始的。

Statistics for data in the folder 'data_with_partitions' : 
Summary: 
|colName| count|missing|     sum|    mean|   variance|min| max|
|-------|------|-------|--------|--------|-----------|---|----|
|user_id|100000|      0|46248475|462.4848|  71083.249|  1| 943|
|item_id|100000|      0|42553013|425.5301|109427.5525|  1|1682|
| rating|100000|      0|  352986|  3.5299|     1.2671|  1|   5|
|     ts|100000|      0|     NaN|     NaN|        NaN|NaN| NaN|

Statistics for data of year=1997 : 
Summary: 
|colName|count|missing|     sum|    mean|   variance|min| max|
|-------|-----|-------|--------|--------|-----------|---|----|
|user_id|52604|      0|24228648|460.5857| 73330.1483|  1| 943|
|item_id|52604|      0|21680120|412.1382|106729.2902|  1|1682|
| rating|52604|      0|  187718|  3.5685|     1.2213|  1|   5|
|     ts|52604|      0|     NaN|     NaN|        NaN|NaN| NaN|

Statistics for data of year 1997's last 3 months : 
Summary: 
|colName|count|missing|     sum|    mean|   variance|min| max|
|-------|-----|-------|--------|--------|-----------|---|----|
|user_id|46493|      0|21567932|463.8963|  72816.402|  1| 939|
|item_id|46493|      0|19225958|413.5237|106324.0118|  1|1682|
| rating|46493|      0|  166044|  3.5714|     1.2245|  1|   5|
|     ts|46493|      0|     NaN|     NaN|        NaN|NaN| NaN|


我们还可以使用更多的方式来选择分区,譬如:"day LIKE '3_'"表示第一位为字符“3”,第二位可以为任意的字符;"day LIKE '%3%'"表示字符串中要包含字符“3”。完整示例代码如下:

new AkSourceBatchOp()
	.setFilePath(LOCAL_DIR + "data_with_partitions")
	.setPartitions("day LIKE '3_'")
	.lazyPrint(10, ">>> day LIKE '3_'");

new AkSourceBatchOp()
	.setFilePath(LOCAL_DIR + "data_with_partitions")
	.setPartitions("day LIKE '%3%'")
	.print(10, ">>> day LIKE '%3%'");


运行结果如下,结果数据满足我们的预期。

>>> day LIKE '3_'
user_id|item_id|rating|ts 
-------|-------|------|---
51|496|4.0000|1997-12-31 00:17:35.0
51|182|3.0000|1997-12-31 00:19:50.0
443|313|4.0000|1997-12-31 01:56:04.0
443|948|1.0000|1997-12-31 02:00:44.0
149|333|1.0000|1997-12-31 04:09:51.0
149|310|2.0000|1997-12-31 04:11:29.0
149|325|2.0000|1997-12-31 04:13:54.0
705|97|3.0000|1997-12-31 05:52:45.0
705|193|3.0000|1997-12-31 05:55:03.0
185|939|3.0000|1997-12-31 07:24:09.0
>>> day LIKE '%3%'
user_id|item_id|rating|ts 
-------|-------|------|---
322|751|2.0000|1998-02-13 04:00:11.0
322|655|5.0000|1998-02-13 04:05:46.0
322|508|4.0000|1998-02-13 04:07:53.0
322|318|4.0000|1998-02-13 04:11:20.0
322|50|5.0000|1998-02-13 04:13:38.0
254|126|3.0000|1998-02-13 13:22:30.0
322|346|3.0000|1998-02-13 04:00:11.0
322|234|4.0000|1998-02-13 04:04:53.0
322|1019|4.0000|1998-02-13 04:07:53.0
322|521|5.0000|1998-02-13 04:10:44.0



3.2.6.2 按分区读取流式数据


对于流式组件,参数Partitions的设置与批式组件相同。这里,我们只举一个简单的例子,使用AkSourceStreamOp组件,选择日期为1号和2号的数据,对应的表达式为:"day IN('01', '02')"。具体代码如下:

new AkSourceStreamOp()
	.setFilePath(LOCAL_DIR + "data_with_partitions")
	.setPartitions("day IN('01', '02')")
	.sample(0.001)
	.print();
StreamOperator.execute();

运行结果如下,显示的数据都为1号或者2号的,也符合预期。

user_id|item_id|rating|ts 
-------|-------|------|---
92|707|4.0000|1997-10-01 04:59:22.0
605|223|5.0000|1997-12-02 06:24:59.0
305|192|2.0000|1998-02-01 16:54:35.0
100|1237|3.0000|1998-04-01 04:20:30.0
215|168|5.0000|1998-04-01 21:07:04.0
37|568|3.0000|1997-12-01 02:52:22.0
548|539|2.0000|1998-04-01 15:20:57.0
643|117|3.0000|1998-04-01 23:50:23.0
146|294|1.0000|1998-04-02 03:07:48.0
782|343|2.0000|1998-04-02 14:33:41.0