在实际应用中,可能遇到这样的场景:在Flink任务中用户想要在现有的Flink流式任务中,嵌入Alink模型进行预测,这就涉及到Flink与Alink的数据转换问题。本节将通过一个示例,展示各种转换操作。
在本节的转换中,大家一定要使用Alink MLEnvironment,并通过它获取相应的StreamExecutionEnvironment和StreamTableEnvironment。代码如下:
MLEnvironment mlEnv = MLEnvironmentFactory.getDefault(); StreamExecutionEnvironment env = mlEnv.getStreamExecutionEnvironment(); StreamTableEnvironment tenv = mlEnv.getStreamTableEnvironment();
示例代码如下,使用Map类型构造数据。
DataStreamSource<Map<String, Object>> inputDataStreamMap = env.addSource( new SourceFunction<Map <String, Object>>() { @Override public void run(SourceContext <Map <String, Object>> out) throws Exception { Map <String, Object> item = new HashMap<>(); item.put("name", "a"); item.put("val", 110); out.collect(item); Map <String, Object> item1 = new HashMap <>(); item1.put("name", "b"); item1.put("val", 111); out.collect(item1); Map <String, Object> item2 = new HashMap <>(); item2.put("name", "c"); item2.put("val", 113); out.collect(item2); } @Override public void cancel() {} }); inputDataStreamMap.print();
输出信息如下:
10> {val=113, name=c} 8> {val=110, name=a} 9> {val=111, name=b}
使用Flink DataStream<T>,T可以是任意泛型,但向Flink Table或Alink StreamOperator转换时,只能为Flink DataStream<Row>。其转换方法很直接,写个MapFunction就可以,示例代码如下所示:
DataStream<Row> inputDataStreamRow = inputDataStreamMap.map(new MapFunction<Map <String, Object>, Row>() { @Override public Row map(Map <String, Object> value) throws Exception { return Row.of(value.get("name"), value.get("val")); } }); inputDataStreamRow.print();
输出信息如下:
1> b,111 2> c,113 12> a,110
需要使用Alink提供的工具函数DataStreamConversionUtil.toTable(),各参数的意义比较明显,不再详细解释。具体代码如下:
Table inputTable = DataStreamConversionUtil.toTable(mlEnv, inputDataStreamRow, new String[] {"name", "val"}, new TypeInformation<?>[] {AlinkTypes.STRING, AlinkTypes.INT}); inputTable.printSchema();
打印Schema信息如下:
root |-- name: STRING |-- val: INT
使用组件TableSourceStreamOp,可以实现Flink Table 到 Alink StreamOperator的转换。代码如下:
TableSourceStreamOp inputStreamOp = new TableSourceStreamOp(inputTable);
基于Alink StreamOperator,我们可以应用所有Alink算法组件,简单示例如下,对val列进行加1的操作,并增加一列,具体代码如下:
StreamOperator<?> outputStreamOp = inputStreamOp .select("name, val + 1 AS val, 'output' AS type"); outputStreamOp.print();
输出结果为:
name|val|type ----|---|---- a|111|output b|112|output c|114|output
Alink StreamOperator中有getOutputTable方法,可直接转换,代码如下:
Table outputTable = outputStreamOp.getOutputTable(); outputTable.printSchema();
打印Schema信息如下:
root |-- name: STRING |-- val: INT |-- type: STRING