Java 类名:com.alibaba.alink.operator.stream.sink.TFRecordDatasetSinkStreamOp
Python 类名:TFRecordDatasetSinkStreamOp
写出 TFRecordDataset 文件(TFRecordDataset 的介绍可以参考 TensorFlow 文档:https://www.tensorflow.org/tutorials/load_data/tfrecord )。
需要指定文件路径 filePath,默认为单并行度写出单个文件。
如果希望并行的写出文件,那么需要设置参数numFiles,得到的是一个包含多个 TFRecordDataset 文件的目录。
TFRecord 中, Feature 允许的数据类型仅有 float, int64, bytes。
其中数据类型为 bytes时,存储的数据实际上为 ByteString 的列表;为 float, int64 时存储的数据为 float 或 int64 的列表。
数据写出时会根据在 Alink 中的类型进行转换, 其他类型请先使用类型转换组件进行转换:
名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 |
---|---|---|---|---|---|---|
filePath | 文件路径 | 文件路径 | String | ✓ | ||
numFiles | 文件数目 | 文件数目 | Integer | 1 | ||
overwriteSink | 是否覆写已有数据 | 是否覆写已有数据 | Boolean | false |
schemaStr = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string" source = CsvSourceStreamOp() \ .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv") \ .setSchemaStr(schemaStr) sink = TFRecordDatasetSinkStreamOp() \ .setFilePath("/tmp/iris.tfrecord") \ .setOverwriteSink(True) \ .linkFrom(source) StreamOperator.execute()
import com.alibaba.alink.operator.stream.StreamOperator; import com.alibaba.alink.operator.stream.sink.TFRecordDatasetSinkStreamOp; import com.alibaba.alink.operator.stream.source.CsvSourceStreamOp; import org.junit.Test; public class TFRecordDatasetSinkStreamOpTest { @Test public void testTFRecordDatasetSinkStreamOp() throws Exception { String schemaStr = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"; StreamOperator <?> source = new CsvSourceStreamOp() .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv") .setSchemaStr(schemaStr); StreamOperator <?> sink = new TFRecordDatasetSinkStreamOp() .setFilePath("/tmp/iris.tfrecord") .setOverwriteSink(true) .linkFrom(source); StreamOperator.execute(); } }