Alink Tutorial (Python Edition)

Chapter 3: File Systems and Data Files

This chapter covers the following sections:
3.1 Introduction to File Systems
3.1.1 Local File System
3.1.2 Hadoop File System
3.1.3 Alibaba Cloud OSS File System
3.2 Reading and Exporting Data Files
3.2.1 CSV Format
3.2.2 TSV, LibSVM, and Text Formats
3.2.3 AK Format

For detailed explanations, please refer to the print book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》; this file contains the corresponding sample code for this chapter.

from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

pd.set_option('display.max_colwidth', 1000)

DATA_DIR = ROOT_DIR + "temp" + os.sep
LOCAL_DIR = DATA_DIR

OSS_END_POINT = "*";
OSS_BUCKET_NAME = "*";
OSS_ACCESS_ID = "*";
OSS_ACCESS_KEY = "*";

OSS_PREFIX_URI = "oss://" + OSS_BUCKET_NAME + "/";

HDFS_URI = "hdfs://10.*.*.*:9000/";

IRIS_HTTP_URL = "http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data";

IRIS_SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
#c_1_1
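# Local file system: print the home directory and kind, create the temp
# directory if it does not exist, list its contents, then write and read
# back a small text file.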
import datetime

local = LocalFileSystem();
print(local.getHomeDirectory());
print(local.getKind());

if not local.exists(LOCAL_DIR) :
    local.mkdirs(LOCAL_DIR);

for status in local.listStatus(LOCAL_DIR) :
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t" 
          + str(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000)))

path = LOCAL_DIR + "hello.txt";

fos = local.create(path, True)
fos.write(b'Hello Alink!')
fos.flush()
fos.close()

status = local.getFileStatus(path)
print(status)
print(status.getLen())
print(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000));

nBytes = local.getFileStatus(path).getLen()
fis = local.open(path)
(numBytesRead, b) = fis.read(nBytes, 0)
fis.close()
print(b)
#c_1_2_1
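# Hadoop file system (HDFS): connect via HDFS_URI, create the temp
# directory if it does not exist, list its contents, then write and read
# back hello.txt.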
hdfs = HadoopFileSystem(HDFS_URI);
print(hdfs.getKind());

hdfsDir = HDFS_URI + "user/yangxu/alink/data/temp/";

if not hdfs.exists(hdfsDir) :
    hdfs.mkdirs(hdfsDir);

for status in hdfs.listStatus(hdfsDir) :
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t" 
          + str(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000)))

path = hdfsDir + "hello.txt";

fos = hdfs.create(path, True)
fos.write(b'Hello Alink!')
fos.flush()
fos.close()

nBytes = hdfs.getFileStatus(path).getLen()
fis = hdfs.open(path)
(numBytesRead, bytesRead) = fis.read(nBytes, 0)
fis.close()
print(bytesRead)
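
# Helper: copy a file between two (possibly different) file systems by
# reading all bytes from the source path and writing them to the target path.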
def copy(fs_from, path_from, fs_to, path_to) :
    nBytes = fs_from.getFileStatus(path_from).getLen()
    fis = fs_from.open(path_from)
    (numBytesRead, bytesRead) = fis.read(nBytes, 0)
    fis.close()
    fos = fs_to.create(path_to, True)
    fos.write(bytesRead)
    fos.flush()
    fos.close()
#c_1_2_2
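# Copy hello.txt from HDFS to the local file system and back again,
# then list the HDFS temp directory to verify the results.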
local = LocalFileSystem();

hdfs = HadoopFileSystem(HDFS_URI);

copy(hdfs, HDFS_URI + "user/yangxu/alink/data/temp/hello.txt",
     local, LOCAL_DIR + "hello_1.txt");

copy(local, LOCAL_DIR + "hello_1.txt", 
     hdfs, HDFS_URI + "user/yangxu/alink/data/temp/hello_2.txt")

for status in hdfs.listStatus(HDFS_URI + "user/yangxu/alink/data/temp/") :
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t" 
          + str(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000)))
#c_1_3_1
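# Alibaba Cloud OSS: connect with endpoint, bucket and credentials, create
# the temp directory if it does not exist, list its contents, then write
# and read back hello.txt.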
oss = OssFileSystem(OSS_END_POINT,OSS_BUCKET_NAME,OSS_ACCESS_ID,OSS_ACCESS_KEY);

print(oss.getKind());

ossDir = OSS_PREFIX_URI + "alink/data/temp/";

if not oss.exists(ossDir) :
    oss.mkdirs(ossDir);

for status in oss.listStatus(ossDir) :
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t" 
          + str(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000)))

path = ossDir + "hello.txt";

fos = oss.create(path, True)
fos.write(b'Hello Alink!')
fos.flush()
fos.close()

nBytes = oss.getFileStatus(path).getLen()
fis = oss.open(path)
(numBytesRead, bytesRead) = fis.read(nBytes, 0)
fis.close()
print(bytesRead)
#c_1_3_2
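# Copy hello.txt from OSS to the local file system and back again,
# then list the OSS temp directory to verify the results.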

local = LocalFileSystem();

oss = OssFileSystem(OSS_END_POINT,OSS_BUCKET_NAME,OSS_ACCESS_ID,OSS_ACCESS_KEY);

copy(oss, OSS_PREFIX_URI + "alink/data/temp/hello.txt",
     local, LOCAL_DIR + "hello_1.txt");

copy(local, LOCAL_DIR + "hello_1.txt",
     oss, OSS_PREFIX_URI + "alink/data/temp/hello_2.txt");

for status in oss.listStatus(OSS_PREFIX_URI + "alink/data/temp/") :
    print(str(status.getPath().toUri()) + " \t" + str(status.getLen()) + " \t" 
          + str(datetime.datetime.utcfromtimestamp(status.getModificationTime()/1000)))
help(CsvSourceBatchOp.setFilePath)
#c_2_1_1
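# Read CSV data from a local file and from an HTTP URL, in batch and
# stream mode; the wine-quality file uses ';' as the field delimiter and
# has a header line to skip.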
source_local = CsvSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "iris.data")\
    .setSchemaStr("sepal_length double, sepal_width double, "
                  + "petal_length double, petal_width double, category string");

source_local.firstN(5).print();

source_url = CsvSourceBatchOp()\
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
                 + "/iris/iris.data")\
    .setSchemaStr("sepal_length double, sepal_width double, "
                  + "petal_length double, petal_width double, category string");

source_url.firstN(5).print();

source_stream = CsvSourceStreamOp()\
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
                 + "/iris/iris.data")\
    .setSchemaStr("sepal_length double, sepal_width double, "
                  + "petal_length double, petal_width double, category string");

source_stream.filter("sepal_length < 4.5").print();
StreamOperator.execute();

wine_url = CsvSourceBatchOp()\
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
                 + "/wine-quality/winequality-white.csv")\
    .setSchemaStr("fixedAcidity double,volatileAcidity double,citricAcid double,"
                  + "residualSugar double,chlorides double,freeSulfurDioxide double,"
                  + "totalSulfurDioxide double,density double,pH double,"
                  + "sulphates double,alcohol double,quality double")\
    .setFieldDelimiter(";")\
    .setIgnoreFirstLine(True);

wine_url.firstN(5).print();
#c_2_1_2
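# Write the iris data as CSV to local, HDFS and OSS paths and read it
# back from each, in both batch and stream mode.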
oss = OssFileSystem(OSS_END_POINT,OSS_BUCKET_NAME,OSS_ACCESS_ID,OSS_ACCESS_KEY);

filePaths = [FilePath(LOCAL_DIR + "iris.csv"), 
             FilePath(HDFS_URI + "user/yangxu/alink/data/temp/iris.csv"),
             FilePath(OSS_PREFIX_URI + "alink/data/temp/iris.csv", oss)]

for filePath in filePaths :
    print(filePath.getPathStr())
    CsvSourceBatchOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CsvSinkBatchOp().setFilePath(filePath).setOverwriteSink(True)
        );
    BatchOperator.execute();

    CsvSourceBatchOp()\
        .setFilePath(filePath)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .firstN(3)\
        .print()

for filePath in filePaths :
    print(filePath.getPathStr())
    CsvSourceStreamOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CsvSinkStreamOp().setFilePath(filePath).setOverwriteSink(True)
        );
    StreamOperator.execute();

    CsvSourceStreamOp()\
        .setFilePath(filePath)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .filter("sepal_length < 4.5")\
        .print()
    StreamOperator.execute();
#c_2_2
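# TSV, Text and LibSVM formats: read the MovieLens ratings as TSV, read
# iris.scale as plain text and as LibSVM data, then normalize the feature
# vectors.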

TsvSourceBatchOp()\
.setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")\
.setSchemaStr("user_id long, item_id long, rating float, ts long")\
.firstN(5)\
.print();

TextSourceBatchOp()\
.setFilePath(LOCAL_DIR + "iris.scale")\
.firstN(5)\
.print();

LibSvmSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "iris.scale")\
    .firstN(5)\
    .lazyPrint(-1, "< read by LibSvmSourceBatchOp >")\
    .link(
        VectorNormalizeBatchOp().setSelectedCol("features")
    )\
    .lazyPrint(-1, "< after VectorNormalize >")
BatchOperator.execute()
#c_2_3_1
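# AK format: write the iris data as .ak files to local, HDFS and OSS
# paths and read them back from each, in both batch and stream mode.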
oss = OssFileSystem(OSS_END_POINT,OSS_BUCKET_NAME,OSS_ACCESS_ID,OSS_ACCESS_KEY);

filePaths = [FilePath(LOCAL_DIR + "iris.ak"), 
             FilePath(HDFS_URI + "user/yangxu/alink/data/temp/iris.ak"),
             FilePath(OSS_PREFIX_URI + "alink/data/temp/iris.ak", oss)]

for filePath in filePaths :
    print(filePath.getPathStr())
    CsvSourceBatchOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            AkSinkBatchOp().setFilePath(filePath).setOverwriteSink(True)
        );
    BatchOperator.execute();

    AkSourceBatchOp()\
        .setFilePath(filePath)\
        .firstN(3)\
        .print()

for filePath in filePaths :
    print(filePath.getPathStr())
    CsvSourceStreamOp()\
        .setFilePath(IRIS_HTTP_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            AkSinkStreamOp().setFilePath(filePath).setOverwriteSink(True)
        );
    StreamOperator.execute();

    AkSourceStreamOp()\
        .setFilePath(filePath)\
        .filter("sepal_length < 4.5")\
        .print()
    StreamOperator.execute();

#c_2_4
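# Parquet format: read a Parquet file from an HTTP URL in batch and
# stream mode.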
ParquetSourceBatchOp()\
    .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.parquet")\
    .lazyPrintStatistics()\
    .print()
ParquetSourceStreamOp()\
    .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.parquet")\
    .print()
StreamOperator.execute()
#c_2_5_1
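# Convert the unix-timestamp column ts to a TIMESTAMP with a UDF, export
# the stream into one folder per 5-second window, and also save the whole
# stream as ratings.ak.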
import datetime

@udf(input_types=[AlinkDataTypes.BIGINT()], result_type=AlinkDataTypes.TIMESTAMP(3)) 
def from_unix_timestamp(ts):
    return datetime.datetime.fromtimestamp(ts)

source = TsvSourceStreamOp()\
    .setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")\
    .setSchemaStr("user_id long, item_id long, rating float, ts long")\
    .link(
        UDFStreamOp()\
            .setFunc(from_unix_timestamp)\
            .setSelectedCols(["ts"])\
            .setOutputCol("ts")
    )

source.link(
    Export2FileSinkStreamOp()\
        .setFilePath(LOCAL_DIR + "with_local_time")\
        .setWindowTime(5)\
        .setOverwriteSink(True)
)

source.link(
    AkSinkStreamOp()\
        .setFilePath(LOCAL_DIR + "ratings.ak")\
        .setOverwriteSink(True)
)
StreamOperator.execute()
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "with_local_time")\
    .lazyPrintStatistics("Statistics for data in the folder 'with_local_time' : ")
BatchOperator.execute()
#c_2_5_2
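# Sort the ratings by ts, save them as ratings_ordered.ak, then export the
# stream into one folder per day based on the ts column and inspect one of
# the resulting folders.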
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "ratings.ak")\
    .orderBy("ts", 1000000)\
    .lazyPrintStatistics("Statistics for data in the file 'ratings.ak' : ")\
    .link(
        AkSinkBatchOp()\
            .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
            .setOverwriteSink(True)
    )
BatchOperator.execute()
AkSourceStreamOp()\
    .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
    .link(
        Export2FileSinkStreamOp()\
            .setFilePath(LOCAL_DIR + "with_ts_time")\
            .setTimeCol("ts")\
            .setWindowTime(3600 * 24)\
            .setOverwriteSink(True)
    )
StreamOperator.execute()
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "with_ts_time")\
    .lazyPrintStatistics("Statistics for data in the folder 'with_ts_time' : ")

AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "with_ts_time" + os.sep + "199709210000000")\
    .print()
#c_2_5_3
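# Export the ordered ratings again, this time writing the daily windows
# into a year=yyyy/month=MM/day=dd partition layout based on the ts column.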
AkSourceStreamOp()\
    .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
    .link(
        Export2FileSinkStreamOp()\
            .setFilePath(LOCAL_DIR + "data_with_partitions")\
            .setTimeCol("ts")\
            .setWindowTime(3600 * 24)\
            .setPartitionsFormat("year=yyyy/month=MM/day=dd")\
            .setOverwriteSink(True)
    )
StreamOperator.execute()
#c_2_6
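# Read the partitioned data back: print statistics for the whole folder
# and for partition filters on year and month, LIKE patterns, and an IN
# list, in batch and stream mode.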
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .lazyPrintStatistics("Statistics for data in the folder 'data_with_partitions' : ")

AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("year='1997'")\
    .lazyPrintStatistics("Statistics for data of year=1997 : ")

AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("year='1997' AND month>='10'")\
    .lazyPrintStatistics("Statistics for data of year 1997's last 3 months : ")

BatchOperator.execute()
AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("day LIKE '3_'")\
    .lazyPrint(10, ">>> day LIKE '3_'")

AkSourceBatchOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("day LIKE '%3%'")\
    .print(10, ">>> day LIKE '%3%'")
AkSourceStreamOp()\
    .setFilePath(LOCAL_DIR + "data_with_partitions")\
    .setPartitions("day IN('01', '02')")\
    .sample(0.001)\
    .print()
StreamOperator.execute()