
Setting Database Partitions in a Catalog [Alink Tips]

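The document "Reading Data via Catalog" (《Catalog读入》) explains how a Catalog describes a database's properties and location. Beyond that, Hive and Odps databases also support partitions, and Alink lets you set partitions through a Catalog as well. The sections below show how to do this in Python and Java.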

Setting Odps Partitions

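For how Odps supports partitions, see the document "Odps Partitions" (《Odps分区》). Reading Odps partitions through a Catalog takes the following steps: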

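  1. Define the OdpsCatalog, supplying the user and database information.
  2. Define the CatalogObject. The first argument of ObjectPath is the project name and the second is the table name.
  3. Define the partitions. The partition setting is a parameter of OdpsSourceParams, not of OdpsCatalogParams, so it has to be supplied at this point, in the Params passed to the CatalogObject.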

Java code

String catalogName = "*";
String defaultDatabase = "*";
String odpsVersion = "0.36.4";
String accessId = "*";
String accessKey = "*";
String project = "*";
String endPoint = "*";
String tableName = "*";

// Set the plugin directory and obtain the downloader used to fetch the Odps plugin
AlinkGlobalConfiguration.setPluginDir("~/alink_plugin/");
PluginDownloader pluginDownloader = AlinkGlobalConfiguration.getPluginDownloader();

// Define the OdpsCatalog with the user and database information
Params parameter = new Params()
    .set(OdpsCatalogParams.ACCESS_ID, accessId)
    .set(OdpsCatalogParams.ACCESS_KEY, accessKey)
    .set(OdpsCatalogParams.PROJECT, project)
    .set(OdpsCatalogParams.END_POINT, endPoint)
    .set(OdpsCatalogParams.RUNNING_PROJECT, project)
    .set(OdpsCatalogParams.CATALOG_NAME, catalogName)
    .set(OdpsCatalogParams.DEFAULT_DATABASE, defaultDatabase)
    .set(OdpsCatalogParams.PLUGIN_VERSION, odpsVersion);
OdpsCatalog odpsCatalog = new OdpsCatalog(parameter);

// Define the CatalogObject
CatalogObject catalogObject = new CatalogObject(
    odpsCatalog,
    new ObjectPath(project, tableName),
    // Define the partitions
    new Params().set(OdpsSourceParams.PARTITIONS, "ds=01/dt=2018,ds=02/dt=2018")
);

// Read the data
CatalogSourceBatchOp sourceBatchOp = new CatalogSourceBatchOp().setCatalogObject(catalogObject);
sourceBatchOp.print();

Python code

Note that Params is written differently in PyAlink than in Java: set the string key "partitions" directly rather than OdpsSourceParams.PARTITIONS.

from pyalink.alink import *
useLocalEnv(1)

# Set the directory that holds the Alink plugins
AlinkGlobalConfiguration.setPluginDir("~/alink_plugin")

catalogName = "*"
defaultDatabase = "*"
odpsVersion = "0.36.4"
accessId = "*"
accessKey = "*"
project = "*"
endPoint = "*"
tableName = "*"

# Define the OdpsCatalog with the user and database information
parameter = Params() \
    .set("accessId", accessId) \
    .set("accessKey", accessKey) \
    .set("project", project) \
    .set("endPoint", endPoint) \
    .set("runningProject", project) \
    .set("catalogName", catalogName) \
    .set("defaultDatabase", defaultDatabase) \
    .set("pluginVersion", odpsVersion)

odpsCatalog = OdpsCatalog(parameter)

# Define the CatalogObject, including the partitions
catalogObject = CatalogObject(
    odpsCatalog,
    ObjectPath(project, tableName),
    Params().set("partitions", "ds=01/dt=2018,ds=02/dt=2018")
)

# Read the data
sourceBatchOp = CatalogSourceBatchOp().setCatalogObject(catalogObject)
sourceBatchOp.print()
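
Judging from the example value "ds=01/dt=2018,ds=02/dt=2018", the partitions setting appears to be a comma-separated list of partition specs, each written as slash-separated key=value pairs. The sketch below builds such a string from a list of dictionaries, assuming that format holds. The helper name build_partitions is hypothetical and not part of Alink or PyAlink.

```python
from typing import Dict, List


def build_partitions(specs: List[Dict[str, str]]) -> str:
    """Join partition specs into one string, e.g. 'ds=01/dt=2018,ds=02/dt=2018'.

    Assumed format (inferred from the example in this document):
    specs are separated by ',' and the key=value pairs in a spec by '/'.
    """
    parts = []
    for spec in specs:
        if not spec:
            raise ValueError("each partition spec needs at least one key=value pair")
        # Dictionaries keep insertion order, so the keys appear as written.
        parts.append("/".join(f"{key}={value}" for key, value in spec.items()))
    return ",".join(parts)


partitions = build_partitions([
    {"ds": "01", "dt": "2018"},
    {"ds": "02", "dt": "2018"},
])
print(partitions)  # ds=01/dt=2018,ds=02/dt=2018
```

The resulting string could then be passed as Params().set("partitions", partitions) when constructing the CatalogObject in the PyAlink code above.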