Java 类名:com.alibaba.alink.operator.batch.sink.AkSinkBatchOp
Python 类名:AkSinkBatchOp
将一个批式数据,以Ak文件格式写出到文件系统。Ak文件格式是Alink 自定义的一种文件格式,能够将数据的Schema保留输出的文件格式。
名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 |
---|---|---|---|---|---|---|
filePath | 文件路径 | 文件路径 | String | ✓ | ||
numFiles | 文件数目 | 文件数目 | Integer | 1 | ||
overwriteSink | 是否覆写已有数据 | 是否覆写已有数据 | Boolean | false | ||
partitionCols | 分区列 | 创建分区使用的列名 | String[] | null |
** 以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!**
df = pd.DataFrame([ [2, 1, 1], [3, 2, 1], [4, 3, 2], [2, 4, 1], [2, 2, 1], [4, 3, 2], [1, 2, 1], [5, 3, 3]]) batchData = BatchOperator.fromDataframe(df, schemaStr='f0 int, f1 int, label int') filePath = "/tmp/test_alink_file_sink"; # write file to local disk batchData.link(AkSinkBatchOp()\ .setFilePath(FilePath(filePath))\ .setOverwriteSink(True)\ .setNumFiles(1)) BatchOperator.execute()
import org.apache.flink.types.Row; import com.alibaba.alink.common.io.filesystem.FilePath; import com.alibaba.alink.common.io.filesystem.HadoopFileSystem; import com.alibaba.alink.operator.batch.BatchOperator; import com.alibaba.alink.operator.batch.sink.AkSinkBatchOp; import com.alibaba.alink.operator.batch.source.MemSourceBatchOp; import org.junit.Test; import java.util.Arrays; import java.util.List; public class AkSinkBatchOpTest { @Test public void testAkSinkBatchOp() throws Exception { List <Row> df = Arrays.asList( Row.of(2, 1, 1), Row.of(3, 2, 1), Row.of(4, 3, 2), Row.of(2, 4, 1), Row.of(2, 2, 1), Row.of(4, 3, 2), Row.of(1, 2, 1) ); BatchOperator <?> batchData = new MemSourceBatchOp(df, "f0 int, f1 int, label int"); String filePath = "/tmp/test_alink_file_sink"; batchData.link(new AkSinkBatchOp() .setFilePath(new FilePath(filePath)) .setOverwriteSink(true) .setNumFiles(1)); String hdfsFilePath = "alink_fs_test/test_alink_file_sink"; HadoopFileSystem fs = new HadoopFileSystem("2.8.3", "hdfs://10.101.201.169:9000"); batchData.link(new AkSinkBatchOp() .setFilePath(new FilePath(hdfsFilePath, fs)) .setOverwriteSink(true) .setNumFiles(1)); BatchOperator.execute(); } }