This chapter covers the following sections:
3.1 Introduction to File Systems
3.1.1 Local File System
3.1.2 Hadoop File System
3.1.3 Alibaba Cloud OSS File System
3.2 Reading and Exporting Data Files
3.2.1 CSV Format
3.2.2 TSV, LibSVM, and Text Formats
3.2.3 AK Format
For the full discussion, please refer to the printed book 《Alink权威指南:基于Flink的机器学习实例入门(Java)》 (Alink: The Definitive Guide — Getting Started with Machine Learning Examples Based on Flink, Java edition). What follows is the companion example code for this chapter.
package com.alibaba.alink;
import com.alibaba.alink.common.AlinkGlobalConfiguration;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.HadoopFileSystem;
import com.alibaba.alink.common.io.filesystem.LocalFileSystem;
import com.alibaba.alink.common.io.filesystem.OssFileSystem;
import com.alibaba.alink.common.io.plugin.PluginDownloader;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.dataproc.vector.VectorNormalizeBatchOp;
import com.alibaba.alink.operator.batch.sink.AkSinkBatchOp;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.*;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.sink.AkSinkStreamOp;
import com.alibaba.alink.operator.stream.sink.CsvSinkStreamOp;
import com.alibaba.alink.operator.stream.sink.Export2FileSinkStreamOp;
import com.alibaba.alink.operator.stream.source.AkSourceStreamOp;
import com.alibaba.alink.operator.stream.source.CsvSourceStreamOp;
import com.alibaba.alink.operator.stream.source.ParquetSourceStreamOp;
import com.alibaba.alink.operator.stream.source.TsvSourceStreamOp;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.functions.ScalarFunction;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Date;
import java.util.List;
/**
 * Example code for Chapter 3: working with file systems (local, HDFS, and Alibaba Cloud OSS)
 * and reading/exporting data files (CSV, TSV, LibSVM, Text, AK, and Parquet formats).
 */
public class Chap03 {
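    // Configuration constants: local working directory, plugin versions, OSS/HDFS
    // connection settings (masked with '*'), and the iris dataset URL and schema.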
static final String LOCAL_DIR = Utils.ROOT_DIR + "filesys" + File.separator;
static final String HADOOP_VERSION = "2.8.3";
static final String OSS_VERSION = "3.4.1";
static final String OSS_END_POINT = "*";
static final String OSS_BUCKET_NAME = "*";
static final String OSS_ACCESS_ID = "*";
static final String OSS_ACCESS_KEY = "*";
static final String OSS_PREFIX_URI = "oss://" + OSS_BUCKET_NAME + "/";
static final String HDFS_URI = "hdfs://10.*.*.*:9000/";
static final String IRIS_HTTP_URL =
"http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data";
static final String IRIS_SCHEMA_STR =
"sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
static final String ALINK_PLUGIN_DIR = "/Users/yangxu/Downloads/alink_plugin";
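    // Entry point: print the Flink version and plugin directories, list and download
    // Alink plugins, set the parallelism, then run each demo method in turn.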
public static void main(String[] args) throws Exception {
System.out.print("Flink version: ");
System.out.println(AlinkGlobalConfiguration.getFlinkVersion());
System.out.print("Default Plugin Dir: ");
System.out.println(new File(AlinkGlobalConfiguration.getPluginDir()).getCanonicalPath());
AlinkGlobalConfiguration.setPluginDir(ALINK_PLUGIN_DIR);
System.out.print("Current Plugin Dir: ");
System.out.println(new File(AlinkGlobalConfiguration.getPluginDir()).getCanonicalPath());
PluginDownloader downloader = AlinkGlobalConfiguration.getPluginDownloader();
List<String> pluginNames = downloader.listAvailablePlugins();
for (String pluginName : pluginNames) {
List<String> versions = downloader.listAvailablePluginVersions(pluginName);
System.out.println(pluginName + " => " + ArrayUtils.toString(versions));
}
downloader.downloadPlugin("mysql", "5.1.27");
downloader.downloadAll();
BatchOperator.setParallelism(1);
c_1_1();
c_1_2_1();
c_1_2_2();
c_1_3_1();
c_1_3_2();
c_2_1_1();
c_2_1_2();
c_2_2();
c_2_3_1();
c_2_4();
c_2_5_1();
c_2_5_2();
c_2_5_3();
c_2_6();
}
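    // Local file system: create a directory, write "Hello Alink!" to a file,
    // read it back, and list the directory's file status information.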
static void c_1_1() throws Exception {
LocalFileSystem local = new LocalFileSystem();
System.out.println(local.getHomeDirectory());
System.out.println(local.getKind());
if (!local.exists(LOCAL_DIR)) {
local.mkdirs(LOCAL_DIR);
}
for (FileStatus status : local.listStatus(LOCAL_DIR)) {
System.out.println(status.getPath().toUri()
+ " \t" + status.getLen()
+ " \t" + new Date(status.getModificationTime())
);
}
String path = LOCAL_DIR + "hello.txt";
OutputStream outputStream = local.create(path, WriteMode.OVERWRITE);
outputStream.write("Hello Alink!".getBytes());
outputStream.close();
FileStatus status = local.getFileStatus(path);
System.out.println(status);
System.out.println(status.getLen());
System.out.println(new Date(status.getModificationTime()));
InputStream inputStream = local.open(path);
String readString = IOUtils.toString(inputStream);
System.out.println(readString);
}
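    // HDFS: connect through the Hadoop plugin, create a directory, then write and read a file.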
static void c_1_2_1() throws Exception {
HadoopFileSystem hdfs = new HadoopFileSystem(HADOOP_VERSION, HDFS_URI);
final String hdfsDir = HDFS_URI + "user/yangxu/alink/data/temp/";
System.out.println(hdfs.getKind());
if (!hdfs.exists(hdfsDir)) {
hdfs.mkdirs(hdfsDir);
}
String path = hdfsDir + "hello.txt";
if (hdfs.exists(path)) {
hdfs.delete(path, true);
}
OutputStream outputStream = hdfs.create(path, WriteMode.NO_OVERWRITE);
outputStream.write("Hello Alink!".getBytes());
outputStream.close();
InputStream inputStream = hdfs.open(path);
String readString = IOUtils.toString(inputStream);
System.out.println(readString);
}
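    // Copy a file from HDFS to the local file system and back, then list the HDFS directory.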
static void c_1_2_2() throws Exception {
LocalFileSystem local = new LocalFileSystem();
HadoopFileSystem hdfs = new HadoopFileSystem(HADOOP_VERSION, HDFS_URI);
copy(
hdfs.open(HDFS_URI + "user/yangxu/alink/data/temp/hello.txt"),
local.create(LOCAL_DIR + "hello_1.txt", WriteMode.OVERWRITE)
);
copy(
local.open(LOCAL_DIR + "hello_1.txt"),
hdfs.create(HDFS_URI + "user/yangxu/alink/data/temp/hello_2.txt", WriteMode.OVERWRITE)
);
for (FileStatus status : hdfs.listStatus(HDFS_URI + "user/yangxu/alink/data/temp/")) {
System.out.println(status.getPath().toUri()
+ " \t" + status.getLen()
+ " \t" + new Date(status.getModificationTime())
);
}
}
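    // Alibaba Cloud OSS: connect through the OSS plugin, create a directory, then write and read a file.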
static void c_1_3_1() throws Exception {
OssFileSystem oss =
new OssFileSystem(
OSS_VERSION,
OSS_END_POINT,
OSS_BUCKET_NAME,
OSS_ACCESS_ID,
OSS_ACCESS_KEY
);
System.out.println(oss.getKind());
final String ossDir = OSS_PREFIX_URI + "alink/data/temp/";
if (!oss.exists(new Path(ossDir))) {
oss.mkdirs(new Path(ossDir));
}
String path = ossDir + "hello.txt";
OutputStream outputStream = oss.create(path, WriteMode.OVERWRITE);
outputStream.write("Hello Alink!".getBytes());
outputStream.close();
InputStream inputStream = oss.open(path);
String readString = IOUtils.toString(inputStream);
System.out.println(readString);
}
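    // Copy a file from OSS to the local file system and back, then list the OSS directory.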
static void c_1_3_2() throws Exception {
LocalFileSystem local = new LocalFileSystem();
OssFileSystem oss =
new OssFileSystem(
OSS_VERSION,
OSS_END_POINT,
OSS_BUCKET_NAME,
OSS_ACCESS_ID,
OSS_ACCESS_KEY
);
copy(
oss.open(OSS_PREFIX_URI + "alink/data/temp/hello.txt"),
local.create(LOCAL_DIR + "hello_1.txt", WriteMode.OVERWRITE)
);
copy(
local.open(LOCAL_DIR + "hello_1.txt"),
oss.create(OSS_PREFIX_URI + "alink/data/temp/hello_2.txt", WriteMode.OVERWRITE)
);
for (FileStatus status : oss.listStatus(new Path(OSS_PREFIX_URI + "alink/data/temp/"))) {
System.out.println(status.getPath().toUri()
+ " \t" + status.getLen()
+ " \t" + new Date(status.getModificationTime())
);
}
}
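    // Copy all bytes from the input stream to the output stream, then close both streams.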
static void copy(InputStream in, OutputStream out) throws IOException {
byte[] buffer = new byte[1024 * 1024];
int len = in.read(buffer);
while (len != -1) {
out.write(buffer, 0, len);
len = in.read(buffer);
}
in.close();
out.close();
}
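    // CSV format: read the iris data from a local file and from an HTTP URL, in batch
    // and stream mode; also read a CSV file with a ';' delimiter and a header line.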
static void c_2_1_1() throws Exception {
CsvSourceBatchOp source_local = new CsvSourceBatchOp()
.setFilePath(LOCAL_DIR + "iris.data")
.setSchemaStr("sepal_length double, sepal_width double, "
+ "petal_length double, petal_width double, category string");
source_local.firstN(5).print();
CsvSourceBatchOp source_url = new CsvSourceBatchOp()
.setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
+ "/iris/iris.data")
.setSchemaStr("sepal_length double, sepal_width double, "
+ "petal_length double, petal_width double, category string");
source_url.firstN(5).print();
CsvSourceStreamOp source_stream = new CsvSourceStreamOp()
.setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
+ "/iris/iris.data")
.setSchemaStr("sepal_length double, sepal_width double, "
+ "petal_length double, petal_width double, category string");
source_stream.filter("sepal_length < 4.5").print();
StreamOperator.execute();
CsvSourceBatchOp wine_local = new CsvSourceBatchOp()
.setFilePath(LOCAL_DIR + "winequality-white.csv")
.setSchemaStr("fixedAcidity double,volatileAcidity double,citricAcid double,"
+ "residualSugar double, chlorides double,freeSulfurDioxide double,"
+ "totalSulfurDioxide double,density double, pH double,"
+ "sulphates double,alcohol double,quality double")
.setFieldDelimiter(";")
.setIgnoreFirstLine(true);
wine_local.firstN(5).print();
}
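    // Write the iris data as CSV to the local, HDFS, and OSS file systems,
    // then read each copy back, in batch and stream mode.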
static void c_2_1_2() throws Exception {
HadoopFileSystem hdfs = new HadoopFileSystem(HADOOP_VERSION, HDFS_URI);
OssFileSystem oss =
new OssFileSystem(
OSS_VERSION,
OSS_END_POINT,
OSS_BUCKET_NAME,
OSS_ACCESS_ID,
OSS_ACCESS_KEY
);
FilePath[] filePaths = new FilePath[]{
new FilePath(LOCAL_DIR + "iris.csv"),
new FilePath(HDFS_URI + "user/yangxu/alink/data/temp/iris.csv", hdfs),
new FilePath(OSS_PREFIX_URI + "alink/data/temp/iris.csv", oss)
};
for (FilePath filePath : filePaths) {
new CsvSourceBatchOp()
.setFilePath(IRIS_HTTP_URL)
.setSchemaStr(IRIS_SCHEMA_STR)
.link(
new CsvSinkBatchOp()
.setFilePath(filePath)
.setOverwriteSink(true)
);
BatchOperator.execute();
System.out.println(
new CsvSourceBatchOp()
.setFilePath(filePath)
.setSchemaStr(IRIS_SCHEMA_STR)
.count()
);
}
for (FilePath filePath : filePaths) {
new CsvSourceStreamOp()
.setFilePath(IRIS_HTTP_URL)
.setSchemaStr(IRIS_SCHEMA_STR)
.link(
new CsvSinkStreamOp()
.setFilePath(filePath)
.setOverwriteSink(true)
);
StreamOperator.execute();
new CsvSourceStreamOp()
.setFilePath(filePath)
.setSchemaStr(IRIS_SCHEMA_STR)
.filter("sepal_length < 4.5")
.print();
StreamOperator.execute();
}
}
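    // Read TSV, Text, and LibSVM formatted files.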
static void c_2_2() throws Exception {
new TsvSourceBatchOp()
.setFilePath(LOCAL_DIR + "u.data")
.setSchemaStr("user_id long, item_id long, rating float, ts long")
.firstN(5)
.print();
new TextSourceBatchOp()
.setFilePath(LOCAL_DIR + "iris.scale")
.firstN(5)
.print();
new LibSvmSourceBatchOp()
.setFilePath(LOCAL_DIR + "iris.scale")
.firstN(5)
.lazyPrint(5, "< read by LibSvmSourceBatchOp >")
.link(
new VectorNormalizeBatchOp()
.setSelectedCol("features")
)
.print();
}
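    // AK format: write the iris data as AK files to the local, HDFS, and OSS file systems,
    // then read each copy back, in batch and stream mode.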
static void c_2_3_1() throws Exception {
HadoopFileSystem hdfs = new HadoopFileSystem(HADOOP_VERSION, HDFS_URI);
OssFileSystem oss =
new OssFileSystem(
OSS_VERSION,
OSS_END_POINT,
OSS_BUCKET_NAME,
OSS_ACCESS_ID,
OSS_ACCESS_KEY
);
FilePath[] filePaths = new FilePath[]{
new FilePath(LOCAL_DIR + "iris.ak"),
new FilePath(HDFS_URI + "user/yangxu/alink/data/temp/iris.ak", hdfs),
new FilePath(OSS_PREFIX_URI + "alink/data/temp/iris.ak", oss)
};
for (FilePath filePath : filePaths) {
new CsvSourceBatchOp()
.setFilePath(IRIS_HTTP_URL)
.setSchemaStr(IRIS_SCHEMA_STR)
.link(
new AkSinkBatchOp()
.setFilePath(filePath)
.setOverwriteSink(true)
);
BatchOperator.execute();
System.out.println(
new AkSourceBatchOp()
.setFilePath(filePath)
.count()
);
}
for (FilePath filePath : filePaths) {
new CsvSourceStreamOp()
.setFilePath(IRIS_HTTP_URL)
.setSchemaStr(IRIS_SCHEMA_STR)
.link(
new AkSinkStreamOp()
.setFilePath(filePath)
.setOverwriteSink(true)
);
StreamOperator.execute();
new AkSourceStreamOp()
.setFilePath(filePath)
.filter("sepal_length < 4.5")
.print();
StreamOperator.execute();
}
}
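    // Parquet format: read a Parquet file over HTTP, in batch and stream mode.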
static void c_2_4() throws Exception {
new ParquetSourceBatchOp()
.setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.parquet")
.lazyPrintStatistics()
.print();
new ParquetSourceStreamOp()
.setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.parquet")
.print();
StreamOperator.execute();
}
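    // Export a stream of MovieLens ratings to a folder of files using 5-second windows,
    // and also save the full stream as ratings.ak; then print statistics on the exported folder.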
static void c_2_5_1() throws Exception {
StreamOperator<?> source = new TsvSourceStreamOp()
.setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")
.setSchemaStr("user_id long, item_id long, rating float, ts long")
.udf("ts", "ts", new FromUnixTimestamp());
source.link(
new Export2FileSinkStreamOp()
.setFilePath(LOCAL_DIR + "with_local_time")
.setWindowTime(5)
.setOverwriteSink(true)
);
source.link(
new AkSinkStreamOp()
.setFilePath(LOCAL_DIR + "ratings.ak")
.setOverwriteSink(true)
);
StreamOperator.execute();
new AkSourceBatchOp()
.setFilePath(LOCAL_DIR + "with_local_time")
.lazyPrintStatistics("Statistics for data in the folder 'with_local_time' : ");
BatchOperator.execute();
}
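    // Sort the ratings by the 'ts' column, then export the stream to files
    // windowed by 'ts' in one-day intervals, and inspect the result.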
static void c_2_5_2() throws Exception {
new AkSourceBatchOp()
.setFilePath(LOCAL_DIR + "ratings.ak")
.orderBy("ts", 1000000)
.lazyPrintStatistics("Statistics for data in the file 'ratings.ak' : ")
.link(
new AkSinkBatchOp()
.setFilePath(LOCAL_DIR + "ratings_ordered.ak")
.setOverwriteSink(true)
);
BatchOperator.execute();
new AkSourceStreamOp()
.setFilePath(LOCAL_DIR + "ratings_ordered.ak")
.link(
new Export2FileSinkStreamOp()
.setFilePath(LOCAL_DIR + "with_ts_time")
.setTimeCol("ts")
.setWindowTime(3600 * 24)
.setOverwriteSink(true)
);
StreamOperator.execute();
new AkSourceBatchOp()
.setFilePath(LOCAL_DIR + "with_ts_time")
.lazyPrintStatistics("Statistics for data in the folder 'with_ts_time' : ");
new AkSourceBatchOp()
.setFilePath(LOCAL_DIR + "with_ts_time" + File.separator + "199709210000000")
.print();
}
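    // Export the stream to a year=yyyy/month=MM/day=dd partitioned directory layout.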
static void c_2_5_3() throws Exception {
new AkSourceStreamOp()
.setFilePath(LOCAL_DIR + "ratings_ordered.ak")
.link(
new Export2FileSinkStreamOp()
.setFilePath(LOCAL_DIR + "data_with_partitions")
.setTimeCol("ts")
.setWindowTime(3600 * 24)
.setPartitionsFormat("year=yyyy/month=MM/day=dd")
.setOverwriteSink(true)
);
StreamOperator.execute();
}
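    // Read the partitioned data with partition filters: equality, range, LIKE, and IN conditions.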
static void c_2_6() throws Exception {
new AkSourceBatchOp()
.setFilePath(LOCAL_DIR + "data_with_partitions")
.lazyPrintStatistics("Statistics for data in the folder 'data_with_partitions' : ");
new AkSourceBatchOp()
.setFilePath(LOCAL_DIR + "data_with_partitions")
.setPartitions("year='1997'")
.lazyPrintStatistics("Statistics for data of year=1997 : ");
new AkSourceBatchOp()
.setFilePath(LOCAL_DIR + "data_with_partitions")
.setPartitions("year='1997' AND month>='10'")
.lazyPrintStatistics("Statistics for data of year 1997's last 3 months : ");
BatchOperator.execute();
new AkSourceBatchOp()
.setFilePath(LOCAL_DIR + "data_with_partitions")
.setPartitions("day LIKE '3_'")
.lazyPrint(10, ">>> day LIKE '3_'");
new AkSourceBatchOp()
.setFilePath(LOCAL_DIR + "data_with_partitions")
.setPartitions("day LIKE '%3%'")
.print(10, ">>> day LIKE '%3%'");
new AkSourceStreamOp()
.setFilePath(LOCAL_DIR + "data_with_partitions")
.setPartitions("day IN('01', '02')")
.sample(0.001)
.print();
StreamOperator.execute();
}
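    // UDF that converts a Unix timestamp in seconds to a java.sql.Timestamp.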
public static class FromUnixTimestamp extends ScalarFunction {
public java.sql.Timestamp eval(Long ts) {
return new java.sql.Timestamp(ts * 1000);
}
}
}