在实际应用中,可能遇到这样的场景:在Flink任务中用户想要在现有的Flink流式任务中,嵌入Alink模型进行预测,这就涉及到Flink与Alink的数据转换问题。本节将通过一个示例,展示各种转换操作。
在本节的转换中,大家一定要使用Alink MLEnvironment,并通过它获取相应的StreamExecutionEnvironment和StreamTableEnvironment。代码如下:
MLEnvironment mlEnv = MLEnvironmentFactory.getDefault(); StreamExecutionEnvironment env = mlEnv.getStreamExecutionEnvironment(); StreamTableEnvironment tenv = mlEnv.getStreamTableEnvironment();
示例代码如下,使用Map类型构造数据。
DataStreamSource<Map<String, Object>> inputDataStreamMap = env.addSource(
new SourceFunction<Map <String, Object>>() {
@Override
public void run(SourceContext <Map <String, Object>> out) throws Exception {
Map <String, Object> item = new HashMap<>();
item.put("name", "a");
item.put("val", 110);
out.collect(item);
Map <String, Object> item1 = new HashMap <>();
item1.put("name", "b");
item1.put("val", 111);
out.collect(item1);
Map <String, Object> item2 = new HashMap <>();
item2.put("name", "c");
item2.put("val", 113);
out.collect(item2);
}
@Override
public void cancel() {}
});
inputDataStreamMap.print();输出信息如下:
10> {val=113, name=c}
8> {val=110, name=a}
9> {val=111, name=b}使用Flink DataStream<T>,T可以是任意泛型,但向Flink Table或Alink StreamOperator转换时,只能为Flink DataStream<Row>。其转换方法很直接,写个MapFunction就可以,示例代码如下所示:
DataStream<Row> inputDataStreamRow = inputDataStreamMap.map(new MapFunction<Map <String, Object>, Row>() {
@Override
public Row map(Map <String, Object> value) throws Exception {
return Row.of(value.get("name"), value.get("val"));
}
});
inputDataStreamRow.print();输出信息如下:
1> b,111 2> c,113 12> a,110
需要使用Alink提供的工具函数DataStreamConversionUtil.toTable(),各参数的意义比较明显,不再详细解释。具体代码如下:
Table inputTable = DataStreamConversionUtil.toTable(mlEnv, inputDataStreamRow, new String[] {"name", "val"},
new TypeInformation<?>[] {AlinkTypes.STRING, AlinkTypes.INT});
inputTable.printSchema();打印Schema信息如下:
root |-- name: STRING |-- val: INT
使用组件TableSourceStreamOp,可以实现Flink Table 到 Alink StreamOperator的转换。代码如下:
TableSourceStreamOp inputStreamOp = new TableSourceStreamOp(inputTable);
基于Alink StreamOperator,我们可以应用所有Alink算法组件,简单示例如下,对val列进行加1的操作,并增加一列,具体代码如下:
StreamOperator<?> outputStreamOp = inputStreamOp
.select("name, val + 1 AS val, 'output' AS type");
outputStreamOp.print();输出结果为:
name|val|type ----|---|---- a|111|output b|112|output c|114|output
Alink StreamOperator中有getOutputTable方法,可直接转换,代码如下:
Table outputTable = outputStreamOp.getOutputTable(); outputTable.printSchema();
打印Schema信息如下:
root |-- name: STRING |-- val: INT |-- type: STRING