该文档涉及的组件

向量聚合 (VectorAssemblerStreamOp)

Java 类名:com.alibaba.alink.operator.stream.dataproc.vector.VectorAssemblerStreamOp

Python 类名:VectorAssemblerStreamOp

功能介绍

特征拼接组件。支持table中的多个 vector 列和数值列合并成一个vector 列。

参数说明

名称 中文名称 描述 类型 是否必须? 取值范围 默认值
outputCol 输出结果列列名 输出结果列列名,必选 String
selectedCols 选择的列名 计算列对应的列名列表 String[] 所选列类型为 [DENSE_VECTOR, SPARSE_VECTOR, STRING, VECTOR]
handleInvalidMethod 处理无效值的方法 处理无效值的方法,可取 error, skip String “ERROR”, “SKIP” “ERROR”
reservedCols 算法保留列名 算法保留列 String[] null
numThreads 组件多线程线程个数 组件多线程线程个数 Integer 1

代码示例

Python 代码

from pyalink.alink import *

import pandas as pd

useLocalEnv(1)

df = pd.DataFrame([
    [2, 1, 1],
    [3, 2, 1],
    [4, 3, 2],
    [2, 4, 1],
    [2, 2, 1],
    [4, 3, 2],
    [1, 2, 1],
    [5, 3, 3]
])

data = StreamOperator.fromDataframe(df, schemaStr="f0 int, f1 int, f2 int")

colnames = ["f0","f1","f2"]
VectorAssemblerStreamOp().setSelectedCols(colnames)\
.setOutputCol("out").linkFrom(data).print()
StreamOperator.execute()

Java 代码

import org.apache.flink.types.Row;

import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.dataproc.vector.VectorAssemblerStreamOp;
import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

public class VectorAssemblerStreamOpTest {
	@Test
	public void testVectorAssemblerStreamOp() throws Exception {
		List <Row> df = Arrays.asList(
			Row.of(2, 1, 1),
			Row.of(3, 2, 1),
			Row.of(4, 3, 2),
			Row.of(2, 4, 1),
			Row.of(2, 2, 1),
			Row.of(4, 3, 2),
			Row.of(1, 2, 1),
			Row.of(5, 3, 3)
		);
		StreamOperator <?> data = new MemSourceStreamOp(df, "f0 int, f1 int, f2 int");
		new VectorAssemblerStreamOp().setSelectedCols("f0", "f1", "f2")
			.setOutputCol("out").linkFrom(data).print();
		StreamOperator.execute();
	}
}

运行结果

f0 f1 f2 out
2 1 1 2.0,1.0,1.0
3 2 1 3.0,2.0,1.0
4 3 2 4.0,3.0,2.0
2 4 1 2.0,4.0,1.0
2 2 1 2.0,2.0,1.0
4 3 2 4.0,3.0,2.0
1 2 1 1.0,2.0,1.0
5 3 3 5.0,3.0,3.0