文档《Catalog读入》中介绍了Catalog描述数据库的属性和数据库的位置的功能,除此之外,Hive和Odps数据库还支持分区,Alink中也可以在Catalog设置分区,下面将介绍Python和Java中如何实现。
Odps对分区的支持方式见文档《Odps分区》,用Catalog读取Odps分区的步骤如下(Java示例):
// Connection settings; replace every "*" placeholder with real values.
String catalogName = "*";
String defaultDatabase = "*";
String odpsVersion = "0.36.4";
String accessId = "*";
String accessKey = "*";
String project = "*";
String endPoint = "*";
String tableName = "*";

// Download the Odps plugin into the plugin directory.
// The downloader was previously obtained but never invoked, so no download was requested.
AlinkGlobalConfiguration.setPluginDir("~/alink_plugin/");
PluginDownloader pluginDownloader = AlinkGlobalConfiguration.getPluginDownloader();
pluginDownloader.downloadPlugin("odps", odpsVersion);

// Define the OdpsCatalog: account credentials plus project/database information.
Params parameter = new Params()
    .set(OdpsCatalogParams.ACCESS_ID, accessId)
    .set(OdpsCatalogParams.ACCESS_KEY, accessKey)
    .set(OdpsCatalogParams.PROJECT, project)
    .set(OdpsCatalogParams.END_POINT, endPoint)
    .set(OdpsCatalogParams.RUNNING_PROJECT, project)
    .set(OdpsCatalogParams.CATALOG_NAME, catalogName)
    .set(OdpsCatalogParams.DEFAULT_DATABASE, defaultDatabase)
    .set(OdpsCatalogParams.PLUGIN_VERSION, odpsVersion);
OdpsCatalog odpsCatalog = new OdpsCatalog(parameter);

// Define the CatalogObject: catalog, table path, and the partitions to read.
CatalogObject catalogObject = new CatalogObject(
    odpsCatalog,
    new ObjectPath(project, tableName),
    // Partition selection; presumably a comma-separated list of partition specs
    // (see the Odps partition document for the exact format).
    new Params().set(OdpsSourceParams.PARTITIONS, "ds=01/dt=2018,ds=02/dt=2018")
);

// Read the data through the catalog source.
CatalogSourceBatchOp sourceBatchOp = new CatalogSourceBatchOp().setCatalogObject(catalogObject);
sourceBatchOp.print();

注意,PyAlink中Params的写法与Java不同,直接设置“partitions”,而非OdpsSourceParams.PARTITIONS。PyAlink示例如下:
from pyalink.alink import *
# Start a local PyAlink environment and tell Alink where its plugins live.
useLocalEnv(1)
AlinkGlobalConfiguration.setPluginDir("~/alink_plugin")

# Connection settings; replace every "*" placeholder with real values.
catalogName = "*"
defaultDatabase = "*"
odpsVersion = "0.36.4"
accessId = "*"
accessKey = "*"
project = "*"
endPoint = "*"
tableName = "*"

# Catalog parameters: in PyAlink the keys are plain strings rather than
# OdpsCatalogParams constants.
parameter = (
    Params()
    .set("accessId", accessId)
    .set("accessKey", accessKey)
    .set("project", project)
    .set("endPoint", endPoint)
    .set("runningProject", project)
    .set("catalogName", catalogName)
    .set("defaultDatabase", defaultDatabase)
    .set("pluginVersion", odpsVersion)
)
odpsCatalog = OdpsCatalog(parameter)

# Table location and the partitions to read, set through the "partitions" key.
tablePath = ObjectPath(project, tableName)
partitionParams = Params().set("partitions", "ds=01/dt=2018,ds=02/dt=2018")
catalogObject = CatalogObject(odpsCatalog, tablePath, partitionParams)

# Read the selected partitions and print the result.
sourceBatchOp = CatalogSourceBatchOp().setCatalogObject(catalogObject)
sourceBatchOp.print()