Java 类名:com.alibaba.alink.operator.stream.source.TsvSourceStreamOp
Python 类名:TsvSourceStreamOp
按行读取以tab为分隔符的Tsv文件。
名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 |
---|---|---|---|---|---|---|
filePath | 文件路径 | 文件路径 | String | ✓ | ||
schemaStr | Schema | Schema。格式为“colname coltype[, colname2 coltype2[, …]]”,例如“f0 string, f1 bigint, f2 double” | String | ✓ | ||
ignoreFirstLine | 是否忽略第一行数据 | 是否忽略第一行数据 | Boolean | | | false |
partitions | 分区名 | 1)单级、单个分区示例:ds=20190729;2)多级分区之间用“/”分隔,例如:ds=20190729/dt=12;3)多个分区之间用“,”分隔,例如:ds=20190729,ds=20190730 | String | | | null |
skipBlankLine | 是否忽略空行 | 是否忽略空行 | Boolean | | | true |
# Example: write a small in-memory table to a TSV file with TsvSinkStreamOp,
# then read the file back with TsvSourceStreamOp and print the rows.
from pyalink.alink import *

import pandas as pd

useLocalEnv(1)

# Six sample rows; the column names and types are supplied by schemaStr below.
df = pd.DataFrame([
    ["0L", "1L", 0.6],
    ["2L", "2L", 0.8],
    ["2L", "4L", 0.6],
    ["3L", "1L", 0.6],
    ["3L", "2L", 0.3],
    ["3L", "4L", 0.4]
])

source = StreamOperator.fromDataframe(df, schemaStr='uid string, iid string, label double')

filepath = "/tmp/abc.tsv"

# First job: sink the rows to the TSV file, overwriting any existing file.
tsvSink = TsvSinkStreamOp()\
    .setFilePath(filepath)\
    .setOverwriteSink(True)

source.link(tsvSink)

StreamOperator.execute()

# Second job: read the file back, declaring a single string column "f".
tsvSource = TsvSourceStreamOp().setFilePath(filepath).setSchemaStr("f string")
tsvSource.print()

StreamOperator.execute()
import org.apache.flink.types.Row; import com.alibaba.alink.operator.stream.StreamOperator; import com.alibaba.alink.operator.stream.sink.TsvSinkStreamOp; import com.alibaba.alink.operator.stream.source.MemSourceStreamOp; import com.alibaba.alink.operator.stream.source.TsvSourceStreamOp; import org.junit.Test; import java.util.Arrays; import java.util.List; public class TsvSourceStreamOpTest { @Test public void testTsvSourceStreamOp() throws Exception { List <Row> df = Arrays.asList( Row.of("0L", "1L", 0.6), Row.of("2L", "2L", 0.8), Row.of("2L", "4L", 0.6), Row.of("3L", "1L", 0.6), Row.of("3L", "2L", 0.3), Row.of("3L", "4L", 0.4) ); StreamOperator <?> source = new MemSourceStreamOp(df, "uid string, iid string, label double"); String filepath = "/tmp/abc.tsv"; StreamOperator <?> tsvSink = new TsvSinkStreamOp() .setFilePath(filepath) .setOverwriteSink(true); source.link(tsvSink); StreamOperator.execute(); StreamOperator <?> tsvSource = new TsvSourceStreamOp().setFilePath(filepath).setSchemaStr("f string"); tsvSource.print(); StreamOperator.execute(); } }
f |
---|
3L |
0L |
3L |
3L |
2L |
2L |