This chapter covers the following sections:
3.1 Overview of File Systems
3.1.1 Local File System
3.1.2 Hadoop File System
3.1.3 Alibaba Cloud OSS File System
3.2 Reading and Exporting Data Files
3.2.1 CSV Format
3.2.2 TSV, LibSVM, and Text Formats
3.2.3 AK Format
For the full discussion, please read the printed book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》 (Alink: The Definitive Guide — Getting Started with Machine Learning Examples Based on Flink, Python edition); what follows is the companion example code for this chapter.
from pyalink.alink import *
useLocalEnv(1)
from utils import *  # helper module shipped with the book's samples; assumed to define ROOT_DIR used below
import os
import pandas as pd
pd.set_option('display.max_colwidth', 1000)
DATA_DIR = ROOT_DIR + "temp" + os.sep
LOCAL_DIR = DATA_DIR
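# Replace the "*" placeholders and the masked HDFS address below with your own OSS and HDFS settings.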
OSS_END_POINT = "*";
OSS_BUCKET_NAME = "*";
OSS_ACCESS_ID = "*";
OSS_ACCESS_KEY = "*";
OSS_PREFIX_URI = "oss://" + OSS_BUCKET_NAME + "/";
HDFS_URI = "hdfs://10.*.*.*:9000/";
IRIS_HTTP_URL = "http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data";
IRIS_SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
#c_1_1
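# 3.1.1 Local file system: get the home directory and kind, create a directory, list its contents,
# write a file, inspect its status, and read the bytes back.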
import datetime
local = LocalFileSystem();
print(local.getHomeDirectory());
print(local.getKind());
if not local.exists(LOCAL_DIR) :
    local.mkdirs(LOCAL_DIR);
for status in local.listStatus(LOCAL_DIR) :
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t"
          + str(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000)))
path = LOCAL_DIR + "hello.txt";
fos = local.create(path, True)
fos.write(b'Hello Alink!')
fos.flush()
fos.close()
status = local.getFileStatus(path)
print(status)
print(status.getLen())
print(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000));
nBytes = local.getFileStatus(path).getLen()
fis = local.open(path)
(numBytesRead, b) = fis.read(nBytes, 0)
fis.close()
print(b)
#c_1_2_1
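# 3.1.2 Hadoop file system: the same operations against an HDFS directory,
# plus a helper that copies a file between any two file systems.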
hdfs = HadoopFileSystem(HDFS_URI);
print(hdfs.getKind());
hdfsDir = HDFS_URI + "user/yangxu/alink/data/temp/";
if not hdfs.exists(hdfsDir) :
    hdfs.mkdirs(hdfsDir);
for status in hdfs.listStatus(hdfsDir) :
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t"
          + str(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000)))
path = hdfsDir + "hello.txt";
fos = hdfs.create(path, True)
fos.write(b'Hello Alink!')
fos.flush()
fos.close()
nBytes = hdfs.getFileStatus(path).getLen()
fis = hdfs.open(path)
(numBytesRead, bytesRead) = fis.read(nBytes, 0)
fis.close()
print(bytesRead)
def copy(fs_from, path_from, fs_to, path_to) :
    # Read the whole source file, then write its bytes to the target file system.
    nBytes = fs_from.getFileStatus(path_from).getLen()
    fis = fs_from.open(path_from)
    (numBytesRead, bytesRead) = fis.read(nBytes, 0)
    fis.close()
    fos = fs_to.create(path_to, True)
    fos.write(bytesRead)
    fos.flush()
    fos.close()
#c_1_2_2
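# Copy files between HDFS and the local file system, then list the HDFS directory.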
local = LocalFileSystem();
hdfs = HadoopFileSystem(HDFS_URI);
copy(hdfs, HDFS_URI + "user/yangxu/alink/data/temp/hello.txt",
local, LOCAL_DIR + "hello_1.txt");
copy(local, LOCAL_DIR + "hello_1.txt",
hdfs, HDFS_URI + "user/yangxu/alink/data/temp/hello_2.txt")
for status in hdfs.listStatus(HDFS_URI + "user/yangxu/alink/data/temp/") :
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t"
          + str(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000)))
#c_1_3_1
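# 3.1.3 Alibaba Cloud OSS file system: the same operations against an OSS directory.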
oss = OssFileSystem(OSS_END_POINT,OSS_BUCKET_NAME,OSS_ACCESS_ID,OSS_ACCESS_KEY);
print(oss.getKind());
ossDir = OSS_PREFIX_URI + "alink/data/temp/";
if not oss.exists(ossDir) :
    oss.mkdirs(ossDir);
for status in oss.listStatus(ossDir) :
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t"
          + str(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000)))
path = ossDir + "hello.txt";
fos = oss.create(path, True)
fos.write(b'Hello Alink!')
fos.flush()
fos.close()
nBytes = oss.getFileStatus(path).getLen()
fis = oss.open(path)
(numBytesRead, bytesRead) = fis.read(nBytes, 0)
fis.close()
print(bytesRead)
#c_1_3_2
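# Copy files between OSS and the local file system, then list the OSS directory.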
local = LocalFileSystem();
oss = OssFileSystem(OSS_END_POINT,OSS_BUCKET_NAME,OSS_ACCESS_ID,OSS_ACCESS_KEY);
copy(oss, OSS_PREFIX_URI + "alink/data/temp/hello.txt",
local, LOCAL_DIR + "hello_1.txt");
copy(local, LOCAL_DIR + "hello_1.txt",
oss, OSS_PREFIX_URI + "alink/data/temp/hello_2.txt");
for status in oss.listStatus(OSS_PREFIX_URI + "alink/data/temp/") :
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t"
          + str(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000)))
help(CsvSourceBatchOp.setFilePath)
#c_2_1_1
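# 3.2.1 CSV format: read the iris data from a local file and from an HTTP URL in batch mode,
# read it as a stream, and read a semicolon-delimited CSV file that has a header line.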
source_local = CsvSourceBatchOp()\
.setFilePath(LOCAL_DIR + "iris.data")\
.setSchemaStr("sepal_length double, sepal_width double, "
+ "petal_length double, petal_width double, category string");
source_local.firstN(5).print();
source_url = CsvSourceBatchOp()\
.setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
+ "/iris/iris.data")\
.setSchemaStr("sepal_length double, sepal_width double, "
+ "petal_length double, petal_width double, category string");
source_url.firstN(5).print();
source_stream = CsvSourceStreamOp()\
.setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
+ "/iris/iris.data")\
.setSchemaStr("sepal_length double, sepal_width double, "
+ "petal_length double, petal_width double, category string");
source_stream.filter("sepal_length < 4.5").print();
StreamOperator.execute();
wine_url = CsvSourceBatchOp()\
.setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
+ "/wine-quality/winequality-white.csv")\
.setSchemaStr("fixedAcidity double,volatileAcidity double,citricAcid double,"
+ "residualSugar double, chlorides double,freeSulfurDioxide double,"
+ "totalSulfurDioxide double,density double, pH double,"
+ "sulphates double,alcohol double,quality double")\
.setFieldDelimiter(";")\
.setIgnoreFirstLine(True);
wine_url.firstN(5).print();
#c_2_1_2
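# Write the iris CSV data to local, HDFS, and OSS paths, then read each copy back in batch and stream mode.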
oss = OssFileSystem(OSS_END_POINT,OSS_BUCKET_NAME,OSS_ACCESS_ID,OSS_ACCESS_KEY);
filePaths = [FilePath(LOCAL_DIR + "iris.csv"),
FilePath(HDFS_URI + "user/yangxu/alink/data/temp/iris.csv"),
FilePath(OSS_PREFIX_URI + "alink/data/temp/iris.csv", oss)]
for filePath in filePaths :
    print(filePath.getPathStr())
    CsvSourceBatchOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CsvSinkBatchOp().setFilePath(filePath).setOverwriteSink(True)
        );
    BatchOperator.execute();
    CsvSourceBatchOp()\
        .setFilePath(filePath)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .firstN(3)\
        .print()
for filePath in filePaths :
    print(filePath.getPathStr())
    CsvSourceStreamOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CsvSinkStreamOp().setFilePath(filePath).setOverwriteSink(True)
        );
    StreamOperator.execute();
    CsvSourceStreamOp()\
        .setFilePath(filePath)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .filter("sepal_length < 4.5")\
        .print()
    StreamOperator.execute();
#c_2_2
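# 3.2.2 TSV, Text, and LibSVM formats: read the MovieLens ratings as TSV, and the iris.scale file
# as plain text and as LibSVM data.
# (iris.scale is assumed to already exist under LOCAL_DIR, as described in the book.)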
TsvSourceBatchOp()\
.setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")\
.setSchemaStr("user_id long, item_id long, rating float, ts long")\
.firstN(5)\
.print();
TextSourceBatchOp()\
.setFilePath(LOCAL_DIR + "iris.scale")\
.firstN(5)\
.print();
LibSvmSourceBatchOp()\
.setFilePath(LOCAL_DIR + "iris.scale")\
.firstN(5)\
.lazyPrint(-1, "< read by LibSvmSourceBatchOp >")\
.link(
VectorNormalizeBatchOp().setSelectedCol("features")
)\
.lazyPrint(-1, "< after VectorNormalize >")
BatchOperator.execute()
#c_2_3_1
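# 3.2.3 AK format (Alink's own file format; it stores the schema, so no setSchemaStr is needed when reading):
# write the iris data as AK files to local, HDFS, and OSS paths, then read each copy back in batch and stream mode.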
oss = OssFileSystem(OSS_END_POINT,OSS_BUCKET_NAME,OSS_ACCESS_ID,OSS_ACCESS_KEY);
filePaths = [FilePath(LOCAL_DIR + "iris.ak"),
FilePath(HDFS_URI + "user/yangxu/alink/data/temp/iris.ak"),
FilePath(OSS_PREFIX_URI + "alink/data/temp/iris.ak", oss)]
for filePath in filePaths :
    print(filePath.getPathStr())
    CsvSourceBatchOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            AkSinkBatchOp().setFilePath(filePath).setOverwriteSink(True)
        );
    BatchOperator.execute();
    AkSourceBatchOp()\
        .setFilePath(filePath)\
        .firstN(3)\
        .print()
for filePath in filePaths :
    print(filePath.getPathStr())
    CsvSourceStreamOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            AkSinkStreamOp().setFilePath(filePath).setOverwriteSink(True)
        );
    StreamOperator.execute();
    AkSourceStreamOp()\
        .setFilePath(filePath)\
        .filter("sepal_length < 4.5")\
        .print()
    StreamOperator.execute();
#c_2_4
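# Parquet format: read an iris Parquet file over HTTP in batch and stream mode.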
ParquetSourceBatchOp()\
.setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.parquet")\
.lazyPrintStatistics()\
.print()
ParquetSourceStreamOp()\
.setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.parquet")\
.print()
StreamOperator.execute()
#c_2_5_1
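# Export a stream to files: convert the Unix timestamp column to a TIMESTAMP via a Python UDF,
# then write the stream both to time-windowed folders (Export2FileSinkStreamOp) and to a single AK file.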
import datetime
@udf(input_types=[AlinkDataTypes.BIGINT()], result_type=AlinkDataTypes.TIMESTAMP(3))
def from_unix_timestamp(ts):
    return datetime.datetime.fromtimestamp(ts)
source = TsvSourceStreamOp()\
.setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")\
.setSchemaStr("user_id long, item_id long, rating float, ts long")\
.link(
UDFStreamOp()\
.setFunc(from_unix_timestamp)\
.setSelectedCols(["ts"])\
.setOutputCol("ts")
)
source.link(
Export2FileSinkStreamOp()\
.setFilePath(LOCAL_DIR + "with_local_time")\
.setWindowTime(5)\
.setOverwriteSink(True)
)
source.link(
AkSinkStreamOp()\
.setFilePath(LOCAL_DIR + "ratings.ak")\
.setOverwriteSink(True)
)
StreamOperator.execute()
AkSourceBatchOp()\
.setFilePath(LOCAL_DIR + "with_local_time")\
.lazyPrintStatistics("Statistics for data in the folder 'with_local_time' : ")
BatchOperator.execute()
#c_2_5_2
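# Sort the ratings by the ts column, save them as an ordered AK file,
# then export the stream into folders using day-sized windows based on the ts column.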
AkSourceBatchOp()\
.setFilePath(LOCAL_DIR + "ratings.ak")\
.orderBy("ts", 1000000)\
.lazyPrintStatistics("Statistics for data in the file 'ratings.ak' : ")\
.link(
AkSinkBatchOp()\
.setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
.setOverwriteSink(True)
)
BatchOperator.execute()
AkSourceStreamOp()\
.setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
.link(
Export2FileSinkStreamOp()\
.setFilePath(LOCAL_DIR + "with_ts_time")\
.setTimeCol("ts")\
.setWindowTime(3600 * 24)\
.setOverwriteSink(True)
)
StreamOperator.execute()
AkSourceBatchOp()\
.setFilePath(LOCAL_DIR + "with_ts_time")\
.lazyPrintStatistics("Statistics for data in the folder 'with_ts_time' : ")
AkSourceBatchOp()\
.setFilePath(LOCAL_DIR + "with_ts_time" + os.sep + "199709210000000")\
.print()
#c_2_5_3
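# Export with an explicit partition directory layout: year=yyyy/month=MM/day=dd.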
AkSourceStreamOp()\
.setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
.link(
Export2FileSinkStreamOp()\
.setFilePath(LOCAL_DIR + "data_with_partitions")\
.setTimeCol("ts")\
.setWindowTime(3600 * 24)\
.setPartitionsFormat("year=yyyy/month=MM/day=dd")\
.setOverwriteSink(True)
)
StreamOperator.execute()
#c_2_6
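# Read the partitioned data back, selecting partitions with expressions such as equality, ranges, LIKE, and IN.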
AkSourceBatchOp()\
.setFilePath(LOCAL_DIR + "data_with_partitions")\
.lazyPrintStatistics("Statistics for data in the folder 'data_with_partitions' : ")
AkSourceBatchOp()\
.setFilePath(LOCAL_DIR + "data_with_partitions")\
.setPartitions("year='1997'")\
.lazyPrintStatistics("Statistics for data of year=1997 : ")
AkSourceBatchOp()\
.setFilePath(LOCAL_DIR + "data_with_partitions")\
.setPartitions("year='1997' AND month>='10'")\
.lazyPrintStatistics("Statistics for data of year 1997's last 3 months : ")
BatchOperator.execute()
AkSourceBatchOp()\
.setFilePath(LOCAL_DIR + "data_with_partitions")\
.setPartitions("day LIKE '3_'")\
.lazyPrint(10, ">>> day LIKE '3_'")
AkSourceBatchOp()\
.setFilePath(LOCAL_DIR + "data_with_partitions")\
.setPartitions("day LIKE '%3%'")\
.print(10, ">>> day LIKE '%3%'")
AkSourceStreamOp()\
.setFilePath(LOCAL_DIR + "data_with_partitions")\
.setPartitions("day IN('01', '02')")\
.sample(0.001)\
.print()
StreamOperator.execute()