Alink教程(Java版)
该文档涉及的组件

第5.3.4节 Flink与Alink的数据转换

在实际应用中,可能遇到这样的场景:在Flink任务中用户想要在现有的Flink流式任务中,嵌入Alink模型进行预测,这就涉及到Flink与Alink的数据转换问题。本节将通过一个示例,展示各种转换操作。

Alink MLEnvironment

在本节的转换中,大家一定要使用Alink MLEnvironment,并通过它获取相应的StreamExecutionEnvironment和StreamTableEnvironment。代码如下:

MLEnvironment mlEnv = MLEnvironmentFactory.getDefault();
StreamExecutionEnvironment env = mlEnv.getStreamExecutionEnvironment();
StreamTableEnvironment tenv = mlEnv.getStreamTableEnvironment();



构造一个Flink DataStream数据

示例代码如下,使用Map类型构造数据。

DataStreamSource<Map<String, Object>> inputDataStreamMap = env.addSource(
	new SourceFunction<Map <String, Object>>() {
		@Override
		public void run(SourceContext <Map <String, Object>> out) throws Exception {
			Map <String, Object> item = new HashMap<>();
			item.put("name", "a");
			item.put("val", 110);
			out.collect(item);

			Map <String, Object> item1 = new HashMap <>();
			item1.put("name", "b");
			item1.put("val", 111);
			out.collect(item1);

			Map <String, Object> item2 = new HashMap <>();
			item2.put("name", "c");
			item2.put("val", 113);
			out.collect(item2);
		}

		@Override
		public void cancel() {}
	});
inputDataStreamMap.print();

输出信息如下:

10> {val=113, name=c}
8> {val=110, name=a}
9> {val=111, name=b}



Flink DataStream<T> 转 Flink DataStream<Row>

使用Flink DataStream<T>,T可以是任意泛型,但向Flink Table或Alink StreamOperator转换时,只能为Flink DataStream<Row>。其转换方法很直接,写个MapFunction就可以,示例代码如下所示:

DataStream<Row> inputDataStreamRow = inputDataStreamMap.map(new MapFunction<Map <String, Object>, Row>() {
	@Override
	public Row map(Map <String, Object> value) throws Exception {
		return Row.of(value.get("name"), value.get("val"));
	}
});
inputDataStreamRow.print();

输出信息如下:

1> b,111
2> c,113
12> a,110



Flink DataStream<Row> 转 Flink Table

需要使用Alink提供的工具函数DataStreamConversionUtil.toTable(),各参数的意义比较明显,不再详细解释。具体代码如下:

Table inputTable = DataStreamConversionUtil.toTable(mlEnv, inputDataStreamRow, new String[] {"name", "val"},
	new TypeInformation<?>[] {AlinkTypes.STRING, AlinkTypes.INT});
inputTable.printSchema();


打印Schema信息如下:

root
 |-- name: STRING
 |-- val: INT



Flink Table 转 Alink StreamOperator

使用组件TableSourceStreamOp,可以实现Flink Table 到 Alink StreamOperator的转换。代码如下:

TableSourceStreamOp inputStreamOp = new TableSourceStreamOp(inputTable);


基于Alink StreamOperator,我们可以应用所有Alink算法组件,简单示例如下,对val列进行加1的操作,并增加一列,具体代码如下:

StreamOperator<?> outputStreamOp = inputStreamOp
	.select("name, val + 1 AS val, 'output' AS type");

outputStreamOp.print();

输出结果为:

name|val|type
----|---|----
a|111|output
b|112|output
c|114|output



Alink StreamOperator 转 Flink Table

Alink StreamOperator中有getOutputTable方法,可直接转换,代码如下:

Table outputTable = outputStreamOp.getOutputTable();
outputTable.printSchema();

打印Schema信息如下:

root
 |-- name: STRING
 |-- val: INT
 |-- type: STRING