This chapter includes the following sections:

3.1 Introduction to File Systems
3.1.1 Local File System
3.1.2 Hadoop File System
3.1.3 Alibaba Cloud OSS File System
3.2 Reading and Exporting Data Files
3.2.1 CSV Format
3.2.2 TSV, LibSVM, and Text Formats
3.2.3 AK Format
For the full discussion, please refer to the printed book 《Alink权威指南:基于Flink的机器学习实例入门(Java)》; below is the example code for this chapter.
package com.alibaba.alink;

import com.alibaba.alink.common.AlinkGlobalConfiguration;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.HadoopFileSystem;
import com.alibaba.alink.common.io.filesystem.LocalFileSystem;
import com.alibaba.alink.common.io.filesystem.OssFileSystem;
import com.alibaba.alink.common.io.plugin.PluginDownloader;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.dataproc.vector.VectorNormalizeBatchOp;
import com.alibaba.alink.operator.batch.sink.AkSinkBatchOp;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.*;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.sink.AkSinkStreamOp;
import com.alibaba.alink.operator.stream.sink.CsvSinkStreamOp;
import com.alibaba.alink.operator.stream.sink.Export2FileSinkStreamOp;
import com.alibaba.alink.operator.stream.source.AkSourceStreamOp;
import com.alibaba.alink.operator.stream.source.CsvSourceStreamOp;
import com.alibaba.alink.operator.stream.source.ParquetSourceStreamOp;
import com.alibaba.alink.operator.stream.source.TsvSourceStreamOp;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.functions.ScalarFunction;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Date;
import java.util.List;

/**
 * Example code for Chapter 3: file systems and reading/exporting data files.
 */
public class Chap03 {

    static final String LOCAL_DIR = Utils.ROOT_DIR + "filesys" + File.separator;

    static final String HADOOP_VERSION = "2.8.3";
    static final String OSS_VERSION = "3.4.1";

    // Replace the "*" placeholders with your own OSS and HDFS settings before running.
    static final String OSS_END_POINT = "*";
    static final String OSS_BUCKET_NAME = "*";
    static final String OSS_ACCESS_ID = "*";
    static final String OSS_ACCESS_KEY = "*";
    static final String OSS_PREFIX_URI = "oss://" + OSS_BUCKET_NAME + "/";

    static final String HDFS_URI = "hdfs://10.*.*.*:9000/";

    static final String IRIS_HTTP_URL
        = "http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data";
    static final String IRIS_SCHEMA_STR
        = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";

    static final String ALINK_PLUGIN_DIR = "/Users/yangxu/Downloads/alink_plugin";

    public static void main(String[] args) throws Exception {
        System.out.print("Flink version: ");
        System.out.println(AlinkGlobalConfiguration.getFlinkVersion());

        System.out.print("Default Plugin Dir: ");
        System.out.println(new File(AlinkGlobalConfiguration.getPluginDir()).getCanonicalPath());

        AlinkGlobalConfiguration.setPluginDir(ALINK_PLUGIN_DIR);
        System.out.print("Current Plugin Dir: ");
        System.out.println(new File(AlinkGlobalConfiguration.getPluginDir()).getCanonicalPath());

        // List all available plugins and their versions, then download them.
        PluginDownloader downloader = AlinkGlobalConfiguration.getPluginDownloader();

        List<String> pluginNames = downloader.listAvailablePlugins();
        for (String pluginName : pluginNames) {
            List<String> versions = downloader.listAvailablePluginVersions(pluginName);
            System.out.println(pluginName + " => " + ArrayUtils.toString(versions));
        }

        downloader.downloadPlugin("mysql", "5.1.27");

        downloader.downloadAll();

        BatchOperator.setParallelism(1);

        c_1_1();
        c_1_2_1();
        c_1_2_2();
        c_1_3_1();
        c_1_3_2();
        c_2_1_1();
        c_2_1_2();
        c_2_2();
        c_2_3_1();
        c_2_4();
        c_2_5_1();
        c_2_5_2();
        c_2_5_3();
        c_2_6();
    }

    // 3.1.1 Local file system: list a directory, write a file and read it back.
    static void c_1_1() throws Exception {
        LocalFileSystem local = new LocalFileSystem();

        System.out.println(local.getHomeDirectory());
        System.out.println(local.getKind());

        if (!local.exists(LOCAL_DIR)) {
            local.mkdirs(LOCAL_DIR);
        }

        for (FileStatus status : local.listStatus(LOCAL_DIR)) {
            System.out.println(status.getPath().toUri()
                + " \t" + status.getLen()
                + " \t" + new Date(status.getModificationTime())
            );
        }

        String path = LOCAL_DIR + "hello.txt";

        OutputStream outputStream = local.create(path, WriteMode.OVERWRITE);
        outputStream.write("Hello Alink!".getBytes());
        outputStream.close();

        FileStatus status = local.getFileStatus(path);
        System.out.println(status);
        System.out.println(status.getLen());
        System.out.println(new Date(status.getModificationTime()));

        InputStream inputStream = local.open(path);
        String readString = IOUtils.toString(inputStream);
        System.out.println(readString);
    }

    // 3.1.2 Hadoop file system: write a file to HDFS and read it back.
    static void c_1_2_1() throws Exception {
        HadoopFileSystem hdfs = new HadoopFileSystem(HADOOP_VERSION, HDFS_URI);

        final String hdfsDir = HDFS_URI + "user/yangxu/alink/data/temp/";

        System.out.println(hdfs.getKind());

        if (!hdfs.exists(hdfsDir)) {
            hdfs.mkdirs(hdfsDir);
        }

        String path = hdfsDir + "hello.txt";

        if (hdfs.exists(path)) {
            hdfs.delete(path, true);
        }

        OutputStream outputStream = hdfs.create(path, WriteMode.NO_OVERWRITE);
        outputStream.write("Hello Alink!".getBytes());
        outputStream.close();

        InputStream inputStream = hdfs.open(path);
        String readString = IOUtils.toString(inputStream);
        System.out.println(readString);
    }

    // 3.1.2 Copy files between HDFS and the local file system.
    static void c_1_2_2() throws Exception {
        LocalFileSystem local = new LocalFileSystem();
        HadoopFileSystem hdfs = new HadoopFileSystem(HADOOP_VERSION, HDFS_URI);

        copy(
            hdfs.open(HDFS_URI + "user/yangxu/alink/data/temp/hello.txt"),
            local.create(LOCAL_DIR + "hello_1.txt", WriteMode.OVERWRITE)
        );

        copy(
            local.open(LOCAL_DIR + "hello_1.txt"),
            hdfs.create(HDFS_URI + "user/yangxu/alink/data/temp/hello_2.txt", WriteMode.OVERWRITE)
        );

        for (FileStatus status : hdfs.listStatus(HDFS_URI + "user/yangxu/alink/data/temp/")) {
            System.out.println(status.getPath().toUri()
                + " \t" + status.getLen()
                + " \t" + new Date(status.getModificationTime())
            );
        }
    }

    // 3.1.3 Alibaba Cloud OSS file system: write a file to OSS and read it back.
    static void c_1_3_1() throws Exception {
        OssFileSystem oss = new OssFileSystem(
            OSS_VERSION, OSS_END_POINT, OSS_BUCKET_NAME, OSS_ACCESS_ID, OSS_ACCESS_KEY
        );

        System.out.println(oss.getKind());

        final String ossDir = OSS_PREFIX_URI + "alink/data/temp/";

        if (!oss.exists(new Path(ossDir))) {
            oss.mkdirs(new Path(ossDir));
        }

        String path = ossDir + "hello.txt";

        OutputStream outputStream = oss.create(path, WriteMode.OVERWRITE);
        outputStream.write("Hello Alink!".getBytes());
        outputStream.close();

        InputStream inputStream = oss.open(path);
        String readString = IOUtils.toString(inputStream);
        System.out.println(readString);
    }

    // 3.1.3 Copy files between OSS and the local file system.
    static void c_1_3_2() throws Exception {
        LocalFileSystem local = new LocalFileSystem();
        OssFileSystem oss = new OssFileSystem(
            OSS_VERSION, OSS_END_POINT, OSS_BUCKET_NAME, OSS_ACCESS_ID, OSS_ACCESS_KEY
        );

        copy(
            oss.open(OSS_PREFIX_URI + "alink/data/temp/hello.txt"),
            local.create(LOCAL_DIR + "hello_1.txt", WriteMode.OVERWRITE)
        );

        copy(
            local.open(LOCAL_DIR + "hello_1.txt"),
            oss.create(OSS_PREFIX_URI + "alink/data/temp/hello_2.txt", WriteMode.OVERWRITE)
        );

        for (FileStatus status : oss.listStatus(new Path(OSS_PREFIX_URI + "alink/data/temp/"))) {
            System.out.println(status.getPath().toUri()
                + " \t" + status.getLen()
                + " \t" + new Date(status.getModificationTime())
            );
        }
    }

    // Copy all bytes from an input stream to an output stream, then close both.
    static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[1024 * 1024];
        int len = in.read(buffer);
        while (len != -1) {
            out.write(buffer, 0, len);
            len = in.read(buffer);
        }
        in.close();
        out.close();
    }

    // 3.2.1 CSV format: read CSV data from a local file and from an HTTP URL.
    static void c_2_1_1() throws Exception {
        CsvSourceBatchOp source_local = new CsvSourceBatchOp()
            .setFilePath(LOCAL_DIR + "iris.data")
            .setSchemaStr("sepal_length double, sepal_width double, "
                + "petal_length double, petal_width double, category string");

        source_local.firstN(5).print();

        CsvSourceBatchOp source_url = new CsvSourceBatchOp()
            .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
                + "/iris/iris.data")
            .setSchemaStr("sepal_length double, sepal_width double, "
                + "petal_length double, petal_width double, category string");

        source_url.firstN(5).print();

        CsvSourceStreamOp source_stream = new CsvSourceStreamOp()
            .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
                + "/iris/iris.data")
            .setSchemaStr("sepal_length double, sepal_width double, "
                + "petal_length double, petal_width double, category string");

        source_stream.filter("sepal_length < 4.5").print();
        StreamOperator.execute();

        CsvSourceBatchOp wine_url = new CsvSourceBatchOp()
            .setFilePath(LOCAL_DIR + "winequality-white.csv")
            .setSchemaStr("fixedAcidity double,volatileAcidity double,citricAcid double,"
                + "residualSugar double, chlorides double,freeSulfurDioxide double,"
                + "totalSulfurDioxide double,density double, pH double,"
                + "sulphates double,alcohol double,quality double")
            .setFieldDelimiter(";")
            .setIgnoreFirstLine(true);

        wine_url.firstN(5).print();
    }

    // 3.2.1 Write and read CSV files on the local, HDFS and OSS file systems.
    static void c_2_1_2() throws Exception {
        HadoopFileSystem hdfs = new HadoopFileSystem(HADOOP_VERSION, HDFS_URI);
        OssFileSystem oss = new OssFileSystem(
            OSS_VERSION, OSS_END_POINT, OSS_BUCKET_NAME, OSS_ACCESS_ID, OSS_ACCESS_KEY
        );

        FilePath[] filePaths = new FilePath[]{
            new FilePath(LOCAL_DIR + "iris.csv"),
            new FilePath(HDFS_URI + "user/yangxu/alink/data/temp/iris.csv", hdfs),
            new FilePath(OSS_PREFIX_URI + "alink/data/temp/iris.csv", oss)
        };

        for (FilePath filePath : filePaths) {
            new CsvSourceBatchOp()
                .setFilePath(IRIS_HTTP_URL)
                .setSchemaStr(IRIS_SCHEMA_STR)
                .link(
                    new CsvSinkBatchOp()
                        .setFilePath(filePath)
                        .setOverwriteSink(true)
                );
            BatchOperator.execute();

            System.out.println(
                new CsvSourceBatchOp()
                    .setFilePath(filePath)
                    .setSchemaStr(IRIS_SCHEMA_STR)
                    .count()
            );
        }

        for (FilePath filePath : filePaths) {
            new CsvSourceStreamOp()
                .setFilePath(IRIS_HTTP_URL)
                .setSchemaStr(IRIS_SCHEMA_STR)
                .link(
                    new CsvSinkStreamOp()
                        .setFilePath(filePath)
                        .setOverwriteSink(true)
                );
            StreamOperator.execute();

            new CsvSourceStreamOp()
                .setFilePath(filePath)
                .setSchemaStr(IRIS_SCHEMA_STR)
                .filter("sepal_length < 4.5")
                .print();
            StreamOperator.execute();
        }
    }

    // 3.2.2 TSV, Text and LibSVM formats.
    static void c_2_2() throws Exception {
        new TsvSourceBatchOp()
            .setFilePath(LOCAL_DIR + "u.data")
            .setSchemaStr("user_id long, item_id long, rating float, ts long")
            .firstN(5)
            .print();

        new TextSourceBatchOp()
            .setFilePath(LOCAL_DIR + "iris.scale")
            .firstN(5)
            .print();

        new LibSvmSourceBatchOp()
            .setFilePath(LOCAL_DIR + "iris.scale")
            .firstN(5)
            .lazyPrint(5, "< read by LibSvmSourceBatchOp >")
            .link(
                new VectorNormalizeBatchOp()
                    .setSelectedCol("features")
            )
            .print();
    }

    // 3.2.3 AK format: write and read AK files on the local, HDFS and OSS file systems.
    static void c_2_3_1() throws Exception {
        HadoopFileSystem hdfs = new HadoopFileSystem(HADOOP_VERSION, HDFS_URI);
        OssFileSystem oss = new OssFileSystem(
            OSS_VERSION, OSS_END_POINT, OSS_BUCKET_NAME, OSS_ACCESS_ID, OSS_ACCESS_KEY
        );

        FilePath[] filePaths = new FilePath[]{
            new FilePath(LOCAL_DIR + "iris.ak"),
            new FilePath(HDFS_URI + "user/yangxu/alink/data/temp/iris.ak", hdfs),
            new FilePath(OSS_PREFIX_URI + "alink/data/temp/iris.ak", oss)
        };

        for (FilePath filePath : filePaths) {
            new CsvSourceBatchOp()
                .setFilePath(IRIS_HTTP_URL)
                .setSchemaStr(IRIS_SCHEMA_STR)
                .link(
                    new AkSinkBatchOp()
                        .setFilePath(filePath)
                        .setOverwriteSink(true)
                );
            BatchOperator.execute();

            System.out.println(
                new AkSourceBatchOp()
                    .setFilePath(filePath)
                    .count()
            );
        }

        for (FilePath filePath : filePaths) {
            new CsvSourceStreamOp()
                .setFilePath(IRIS_HTTP_URL)
                .setSchemaStr(IRIS_SCHEMA_STR)
                .link(
                    new AkSinkStreamOp()
                        .setFilePath(filePath)
                        .setOverwriteSink(true)
                );
            StreamOperator.execute();

            new AkSourceStreamOp()
                .setFilePath(filePath)
                .filter("sepal_length < 4.5")
                .print();
            StreamOperator.execute();
        }
    }

    // Read Parquet data in batch and stream mode.
    static void c_2_4() throws Exception {
        new ParquetSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.parquet")
            .lazyPrintStatistics()
            .print();

        new ParquetSourceStreamOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.parquet")
            .print();
        StreamOperator.execute();
    }

    // Export stream data to files windowed by local time, and save the ratings as an AK file.
    static void c_2_5_1() throws Exception {
        StreamOperator<?> source = new TsvSourceStreamOp()
            .setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")
            .setSchemaStr("user_id long, item_id long, rating float, ts long")
            .udf("ts", "ts", new FromUnixTimestamp());

        source.link(
            new Export2FileSinkStreamOp()
                .setFilePath(LOCAL_DIR + "with_local_time")
                .setWindowTime(5)
                .setOverwriteSink(true)
        );

        source.link(
            new AkSinkStreamOp()
                .setFilePath(LOCAL_DIR + "ratings.ak")
                .setOverwriteSink(true)
        );

        StreamOperator.execute();

        new AkSourceBatchOp()
            .setFilePath(LOCAL_DIR + "with_local_time")
            .lazyPrintStatistics("Statistics for data in the folder 'with_local_time' : ");
        BatchOperator.execute();
    }

    // Export stream data to files windowed by the time column "ts".
    static void c_2_5_2() throws Exception {
        new AkSourceBatchOp()
            .setFilePath(LOCAL_DIR + "ratings.ak")
            .orderBy("ts", 1000000)
            .lazyPrintStatistics("Statistics for data in the file 'ratings.ak' : ")
            .link(
                new AkSinkBatchOp()
                    .setFilePath(LOCAL_DIR + "ratings_ordered.ak")
                    .setOverwriteSink(true)
            );
        BatchOperator.execute();

        new AkSourceStreamOp()
            .setFilePath(LOCAL_DIR + "ratings_ordered.ak")
            .link(
                new Export2FileSinkStreamOp()
                    .setFilePath(LOCAL_DIR + "with_ts_time")
                    .setTimeCol("ts")
                    .setWindowTime(3600 * 24)
                    .setOverwriteSink(true)
            );
        StreamOperator.execute();

        new AkSourceBatchOp()
            .setFilePath(LOCAL_DIR + "with_ts_time")
            .lazyPrintStatistics("Statistics for data in the folder 'with_ts_time' : ");

        new AkSourceBatchOp()
            .setFilePath(LOCAL_DIR + "with_ts_time" + File.separator + "199709210000000")
            .print();
    }

    // Export stream data to partitioned folders in the form year=yyyy/month=MM/day=dd.
    static void c_2_5_3() throws Exception {
        new AkSourceStreamOp()
            .setFilePath(LOCAL_DIR + "ratings_ordered.ak")
            .link(
                new Export2FileSinkStreamOp()
                    .setFilePath(LOCAL_DIR + "data_with_partitions")
                    .setTimeCol("ts")
                    .setWindowTime(3600 * 24)
                    .setPartitionsFormat("year=yyyy/month=MM/day=dd")
                    .setOverwriteSink(true)
            );
        StreamOperator.execute();
    }

    // Read partitioned data with partition filters.
    static void c_2_6() throws Exception {
        new AkSourceBatchOp()
            .setFilePath(LOCAL_DIR + "data_with_partitions")
            .lazyPrintStatistics("Statistics for data in the folder 'data_with_partitions' : ");

        new AkSourceBatchOp()
            .setFilePath(LOCAL_DIR + "data_with_partitions")
            .setPartitions("year='1997'")
            .lazyPrintStatistics("Statistics for data of year=1997 : ");

        new AkSourceBatchOp()
            .setFilePath(LOCAL_DIR + "data_with_partitions")
            .setPartitions("year='1997' AND month>='10'")
            .lazyPrintStatistics("Statistics for data of year 1997's last 3 months : ");

        BatchOperator.execute();

        new AkSourceBatchOp()
            .setFilePath(LOCAL_DIR + "data_with_partitions")
            .setPartitions("day LIKE '3_'")
            .lazyPrint(10, ">>> day LIKE '3_'");

        new AkSourceBatchOp()
            .setFilePath(LOCAL_DIR + "data_with_partitions")
            .setPartitions("day LIKE '%3%'")
            .print(10, ">>> day LIKE '%3%'");

        new AkSourceStreamOp()
            .setFilePath(LOCAL_DIR + "data_with_partitions")
            .setPartitions("day IN('01', '02')")
            .sample(0.001)
            .print();
        StreamOperator.execute();
    }

    // UDF converting a Unix timestamp in seconds to a SQL Timestamp.
    public static class FromUnixTimestamp extends ScalarFunction {
        public java.sql.Timestamp eval(Long ts) {
            return new java.sql.Timestamp(ts * 1000);
        }
    }
}