This chapter covers the following sections:
3.1 Overview of File Systems
3.1.1 Local File System
3.1.2 Hadoop File System
3.1.3 Alibaba Cloud OSS File System
3.2 Reading and Exporting Data Files
3.2.1 CSV Format
3.2.2 TSV, LibSVM, and Text Formats
3.2.3 AK Format
For the detailed discussion, please refer to the print book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》. Below is the example code for this chapter.
from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

pd.set_option('display.max_colwidth', 1000)

DATA_DIR = ROOT_DIR + "temp" + os.sep

LOCAL_DIR = DATA_DIR

OSS_END_POINT = "*"
OSS_BUCKET_NAME = "*"
OSS_ACCESS_ID = "*"
OSS_ACCESS_KEY = "*"
OSS_PREFIX_URI = "oss://" + OSS_BUCKET_NAME + "/"

HDFS_URI = "hdfs://10.*.*.*:9000/"

IRIS_HTTP_URL = "http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
IRIS_SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
#c_1_1
import datetime

local = LocalFileSystem()
print(local.getHomeDirectory())
print(local.getKind())

# Make sure the working directory exists, then list its contents.
if not local.exists(LOCAL_DIR):
    local.mkdirs(LOCAL_DIR)

for status in local.listStatus(LOCAL_DIR):
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t" + str(datetime.datetime.utcfromtimestamp(status.getModificationTime() / 1000)))

path = LOCAL_DIR + "hello.txt"

# Write a small text file, overwriting it if it already exists.
fos = local.create(path, True)
fos.write(b'Hello Alink!')
fos.flush()
fos.close()

status = local.getFileStatus(path)
print(status)
print(status.getLen())
print(datetime.datetime.utcfromtimestamp(status.getModificationTime() / 1000))

# Read the whole file back.
nBytes = local.getFileStatus(path).getLen()
fis = local.open(path)
(numBytesRead, b) = fis.read(nBytes, 0)
fis.close()
print(b)
#c_1_2_1
hdfs = HadoopFileSystem(HDFS_URI)
print(hdfs.getKind())

hdfsDir = HDFS_URI + "user/yangxu/alink/data/temp/"

if not hdfs.exists(hdfsDir):
    hdfs.mkdirs(hdfsDir)

for status in hdfs.listStatus(hdfsDir):
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t" + str(datetime.datetime.utcfromtimestamp(status.getModificationTime() / 1000)))

path = hdfsDir + "hello.txt"

fos = hdfs.create(path, True)
fos.write(b'Hello Alink!')
fos.flush()
fos.close()

nBytes = hdfs.getFileStatus(path).getLen()
fis = hdfs.open(path)
(numBytesRead, bytesRead) = fis.read(nBytes, 0)
print(bytesRead)
def copy(fs_from, path_from, fs_to, path_to):
    # Read the whole source file into memory, then write it to the target file system.
    nBytes = fs_from.getFileStatus(path_from).getLen()
    fis = fs_from.open(path_from)
    (numBytesRead, bytesRead) = fis.read(nBytes, 0)
    fis.close()
    fos = fs_to.create(path_to, True)
    fos.write(bytesRead)
    fos.flush()
    fos.close()
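The copy() helper above is file-system agnostic, so it also works within a single file system. A minimal sketch, assuming the LocalFileSystem setup and the hello.txt file created in #c_1_1 (the target name hello_copy.txt is only an illustrative choice):
local = LocalFileSystem()
# hello_copy.txt is a hypothetical target name for this local-to-local copy
copy(local, LOCAL_DIR + "hello.txt", local, LOCAL_DIR + "hello_copy.txt")
print(local.getFileStatus(LOCAL_DIR + "hello_copy.txt").getLen())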
#c_1_2_2
local = LocalFileSystem()
hdfs = HadoopFileSystem(HDFS_URI)

copy(hdfs, HDFS_URI + "user/yangxu/alink/data/temp/hello.txt", local, LOCAL_DIR + "hello_1.txt")
copy(local, LOCAL_DIR + "hello_1.txt", hdfs, HDFS_URI + "user/yangxu/alink/data/temp/hello_2.txt")

for status in hdfs.listStatus(HDFS_URI + "user/yangxu/alink/data/temp/"):
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t" + str(datetime.datetime.utcfromtimestamp(status.getModificationTime() / 1000)))
#c_1_3_1
oss = OssFileSystem(OSS_END_POINT, OSS_BUCKET_NAME, OSS_ACCESS_ID, OSS_ACCESS_KEY)
print(oss.getKind())

ossDir = OSS_PREFIX_URI + "alink/data/temp/"

if not oss.exists(ossDir):
    oss.mkdirs(ossDir)

for status in oss.listStatus(ossDir):
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t" + str(datetime.datetime.utcfromtimestamp(status.getModificationTime() / 1000)))

path = ossDir + "hello.txt"

fos = oss.create(path, True)
fos.write(b'Hello Alink!')
fos.flush()
fos.close()

nBytes = oss.getFileStatus(path).getLen()
fis = oss.open(path)
(numBytesRead, bytesRead) = fis.read(nBytes, 0)
print(bytesRead)
#c_1_3_2
local = LocalFileSystem()
oss = OssFileSystem(OSS_END_POINT, OSS_BUCKET_NAME, OSS_ACCESS_ID, OSS_ACCESS_KEY)

copy(oss, OSS_PREFIX_URI + "alink/data/temp/hello.txt", local, LOCAL_DIR + "hello_1.txt")
copy(local, LOCAL_DIR + "hello_1.txt", oss, OSS_PREFIX_URI + "alink/data/temp/hello_2.txt")

# List the OSS directory to verify the copied files.
for status in oss.listStatus(OSS_PREFIX_URI + "alink/data/temp/"):
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t" + str(datetime.datetime.utcfromtimestamp(status.getModificationTime() / 1000)))
help(CsvSourceBatchOp.setFilePath)
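help() is the standard Python built-in, so the same lookup applies to any other parameter setter used below, for example the schema-string setter:
help(CsvSourceBatchOp.setSchemaStr)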
#c_2_1_1
source_local = CsvSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "iris.data")\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")
source_local.firstN(5).print()

source_url = CsvSourceBatchOp()\
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data")\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")
source_url.firstN(5).print()

source_stream = CsvSourceStreamOp()\
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data")\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")
source_stream.filter("sepal_length < 4.5").print()
StreamOperator.execute()

wine_url = CsvSourceBatchOp()\
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv")\
    .setSchemaStr("fixedAcidity double,volatileAcidity double,citricAcid double,"
                  + "residualSugar double, chlorides double,freeSulfurDioxide double,"
                  + "totalSulfurDioxide double,density double, pH double,"
                  + "sulphates double,alcohol double,quality double")\
    .setFieldDelimiter(";")\
    .setIgnoreFirstLine(True)
wine_url.firstN(5).print()
#c_2_1_2
oss = OssFileSystem(OSS_END_POINT, OSS_BUCKET_NAME, OSS_ACCESS_ID, OSS_ACCESS_KEY)

filePaths = [
    FilePath(LOCAL_DIR + "iris.csv"),
    FilePath(HDFS_URI + "user/yangxu/alink/data/temp/iris.csv"),
    FilePath(OSS_PREFIX_URI + "alink/data/temp/iris.csv", oss)
]

for filePath in filePaths:
    print(filePath.getPathStr())
    CsvSourceBatchOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CsvSinkBatchOp().setFilePath(filePath).setOverwriteSink(True)
        )
    BatchOperator.execute()
    CsvSourceBatchOp()\
        .setFilePath(filePath)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .firstN(3)\
        .print()

for filePath in filePaths:
    print(filePath.getPathStr())
    CsvSourceStreamOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CsvSinkStreamOp().setFilePath(filePath).setOverwriteSink(True)
        )
    StreamOperator.execute()
    CsvSourceStreamOp()\
        .setFilePath(filePath)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .filter("sepal_length < 4.5")\
        .print()
    StreamOperator.execute()
#c_2_2
TsvSourceBatchOp()\
    .setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")\
    .setSchemaStr("user_id long, item_id long, rating float, ts long")\
    .firstN(5)\
    .print()

TextSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "iris.scale")\
    .firstN(5)\
    .print()

LibSvmSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "iris.scale")\
    .firstN(5)\
    .lazyPrint(-1, "< read by LibSvmSourceBatchOp >")\
    .link(
        VectorNormalizeBatchOp().setSelectedCol("features")
    )\
    .lazyPrint(-1, "< after VectorNormalize >")
BatchOperator.execute()
#c_2_3_1
oss = OssFileSystem(OSS_END_POINT, OSS_BUCKET_NAME, OSS_ACCESS_ID, OSS_ACCESS_KEY)

filePaths = [
    FilePath(LOCAL_DIR + "iris.ak"),
    FilePath(HDFS_URI + "user/yangxu/alink/data/temp/iris.ak"),
    FilePath(OSS_PREFIX_URI + "alink/data/temp/iris.ak", oss)
]

for filePath in filePaths:
    print(filePath.getPathStr())
    CsvSourceBatchOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            AkSinkBatchOp().setFilePath(filePath).setOverwriteSink(True)
        )
    BatchOperator.execute()
    AkSourceBatchOp()\
        .setFilePath(filePath)\
        .firstN(3)\
        .print()

for filePath in filePaths:
    print(filePath.getPathStr())
    CsvSourceStreamOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            AkSinkStreamOp().setFilePath(filePath).setOverwriteSink(True)
        )
    StreamOperator.execute()
    AkSourceStreamOp()\
        .setFilePath(filePath)\
        .filter("sepal_length < 4.5")\
        .print()
    StreamOperator.execute()
#c_2_4
ParquetSourceBatchOp()\
    .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.parquet")\
    .lazyPrintStatistics()\
    .print()
ParquetSourceStreamOp()\
    .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.parquet")\
    .print()
StreamOperator.execute()
#c_2_5_1
import datetime

# Convert the long Unix timestamp column into a TIMESTAMP(3) column via a Python UDF.
@udf(input_types=[AlinkDataTypes.BIGINT()], result_type=AlinkDataTypes.TIMESTAMP(3))
def from_unix_timestamp(ts):
    return datetime.datetime.fromtimestamp(ts)

source = TsvSourceStreamOp()\
    .setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")\
    .setSchemaStr("user_id long, item_id long, rating float, ts long")\
    .link(
        UDFStreamOp()
            .setFunc(from_unix_timestamp)
            .setSelectedCols(["ts"])
            .setOutputCol("ts")
    )

# Export the stream to a folder, one sub-folder per time window.
source.link(
    Export2FileSinkStreamOp()
        .setFilePath(LOCAL_DIR + "with_local_time")
        .setWindowTime(5)
        .setOverwriteSink(True)
)

# Also save the full stream as a single AK file for later use.
source.link(
    AkSinkStreamOp()
        .setFilePath(LOCAL_DIR + "ratings.ak")
        .setOverwriteSink(True)
)

StreamOperator.execute()
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "with_local_time")\
    .lazyPrintStatistics("Statistics for data in the folder 'with_local_time' : ")
BatchOperator.execute()
#c_2_5_2
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "ratings.ak")\
    .orderBy("ts", 1000000)\
    .lazyPrintStatistics("Statistics for data in the file 'ratings.ak' : ")\
    .link(
        AkSinkBatchOp()
            .setFilePath(LOCAL_DIR + "ratings_ordered.ak")
            .setOverwriteSink(True)
    )
BatchOperator.execute()
AkSourceStreamOp()\
    .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
    .link(
        Export2FileSinkStreamOp()
            .setFilePath(LOCAL_DIR + "with_ts_time")
            .setTimeCol("ts")
            .setWindowTime(3600 * 24)
            .setOverwriteSink(True)
    )
StreamOperator.execute()
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "with_ts_time")\
    .lazyPrintStatistics("Statistics for data in the folder 'with_ts_time' : ")

AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "with_ts_time" + os.sep + "199709210000000")\
    .print()
#c_2_5_3
AkSourceStreamOp()\
    .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
    .link(
        Export2FileSinkStreamOp()
            .setFilePath(LOCAL_DIR + "data_with_partitions")
            .setTimeCol("ts")
            .setWindowTime(3600 * 24)
            .setPartitionsFormat("year=yyyy/month=MM/day=dd")
            .setOverwriteSink(True)
    )
StreamOperator.execute()
#c_2_6
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .lazyPrintStatistics("Statistics for data in the folder 'data_with_partitions' : ")

AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("year='1997'")\
    .lazyPrintStatistics("Statistics for data of year=1997 : ")

AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("year='1997' AND month>='10'")\
    .lazyPrintStatistics("Statistics for data of year 1997's last 3 months : ")

BatchOperator.execute()
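The partition filter accepts the usual SQL-style predicates, so an equality filter on a single partition column follows the same pattern. A sketch reusing the partitions written in #c_2_5_3 (the month value 12 is only an illustrative choice):
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("month='12'")\
    .lazyPrintStatistics("Statistics for data of month=12 : ")
BatchOperator.execute()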
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("day LIKE '3_'")\
    .lazyPrint(10, ">>> day LIKE '3_'")

AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("day LIKE '%3%'")\
    .print(10, ">>> day LIKE '%3%'")
AkSourceStreamOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("day IN('01', '02')")\
    .sample(0.001)\
    .print()
StreamOperator.execute()