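The document "Reading with Catalog" (《Catalog读入》) describes how a Catalog captures a database's properties and location. Beyond that, Hive and Odps databases also support partitions, and Alink lets you specify partitions through the Catalog. The sections below show how to do this in Java and Python.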
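For how Odps supports partitions, see the document "Odps Partitions" (《Odps分区》). The steps for reading Odps partitions through a Catalog are as follows: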
String catalogName = "*";
String defaultDatabase = "*";
String odpsVersion = "0.36.4";
String accessId = "*";
String accessKey = "*";
String project = "*";
String endPoint = "*";
String tableName = "*";

// Odps plugin: set the plugin directory and obtain the plugin downloader
AlinkGlobalConfiguration.setPluginDir("~/alink_plugin/");
PluginDownloader pluginDownloader = AlinkGlobalConfiguration.getPluginDownloader();

// Define the OdpsCatalog with the user and database information
Params parameter = new Params()
    .set(OdpsCatalogParams.ACCESS_ID, accessId)
    .set(OdpsCatalogParams.ACCESS_KEY, accessKey)
    .set(OdpsCatalogParams.PROJECT, project)
    .set(OdpsCatalogParams.END_POINT, endPoint)
    .set(OdpsCatalogParams.RUNNING_PROJECT, project)
    .set(OdpsCatalogParams.CATALOG_NAME, catalogName)
    .set(OdpsCatalogParams.DEFAULT_DATABASE, defaultDatabase)
    .set(OdpsCatalogParams.PLUGIN_VERSION, odpsVersion);
OdpsCatalog odpsCatalog = new OdpsCatalog(parameter);

// Define the CatalogObject: catalog, table path, and the partitions to read
CatalogObject catalogObject = new CatalogObject(
    odpsCatalog,
    new ObjectPath(project, tableName),
    // Define the partitions
    new Params().set(OdpsSourceParams.PARTITIONS, "ds=01/dt=2018,ds=02/dt=2018")
);

// Read the data
CatalogSourceBatchOp sourceBatchOp = new CatalogSourceBatchOp().setCatalogObject(catalogObject);
sourceBatchOp.print();
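The partition value used above, "ds=01/dt=2018,ds=02/dt=2018", appears to list several partition specs separated by commas, with each spec made up of key=value pairs joined by slashes. The following is a minimal Python sketch under that assumption. The format is inferred only from this one example, and `build_partition_spec` is a hypothetical helper, not part of Alink.

```python
def build_partition_spec(partitions):
    """Join partition dicts into a string such as 'ds=01/dt=2018,ds=02/dt=2018'.

    Hypothetical helper, not an Alink API. Each partition is a dict that maps
    a partition column to its value, in the order the columns should appear.
    """
    specs = []
    for partition in partitions:
        if not partition:
            raise ValueError("each partition needs at least one key=value pair")
        # Within one partition, key=value pairs are joined by '/'
        specs.append("/".join(f"{key}={value}" for key, value in partition.items()))
    # Multiple partitions are joined by ','
    return ",".join(specs)


spec = build_partition_spec([
    {"ds": "01", "dt": "2018"},
    {"ds": "02", "dt": "2018"},
])
print(spec)  # ds=01/dt=2018,ds=02/dt=2018
```

Building the string from structured values avoids hand-typed separators when the partition list is generated, for example one entry per day.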
Note that in PyAlink, Params are written differently from Java: set the string key "partitions" directly instead of using OdpsSourceParams.PARTITIONS.
from pyalink.alink import *

useLocalEnv(1)

# Directory where Alink plugins are stored
AlinkGlobalConfiguration.setPluginDir("~/alink_plugin")

catalogName = "*"
defaultDatabase = "*"
odpsVersion = "0.36.4"
accessId = "*"
accessKey = "*"
project = "*"
endPoint = "*"
tableName = "*"

# Define the OdpsCatalog with the user and database information
parameter = Params() \
    .set("accessId", accessId) \
    .set("accessKey", accessKey) \
    .set("project", project) \
    .set("endPoint", endPoint) \
    .set("runningProject", project) \
    .set("catalogName", catalogName) \
    .set("defaultDatabase", defaultDatabase) \
    .set("pluginVersion", odpsVersion)
odpsCatalog = OdpsCatalog(parameter)

# Define the CatalogObject: catalog, table path, and the partitions to read
catalogObject = CatalogObject(
    odpsCatalog,
    ObjectPath(project, tableName),
    Params().set("partitions", "ds=01/dt=2018,ds=02/dt=2018")
)

# Read the data
sourceBatchOp = CatalogSourceBatchOp().setCatalogObject(catalogObject)
sourceBatchOp.print()
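Comparing the two listings, each Java constant name corresponds to a lowerCamelCase string key in PyAlink (for example RUNNING_PROJECT and "runningProject"). The sketch below reproduces that pattern for the constants used in this document. It is only an observation drawn from these examples, not a confirmed Alink rule, and `constant_to_key` is a hypothetical helper.

```python
def constant_to_key(constant_name):
    """Convert an UPPER_SNAKE_CASE constant name to a lowerCamelCase key.

    Hypothetical helper illustrating the naming pattern seen in this
    document, e.g. 'RUNNING_PROJECT' -> 'runningProject'.
    """
    head, *tail = constant_name.lower().split("_")
    return head + "".join(word.capitalize() for word in tail)


# Constant names taken from the Java listing in this document
java_constants = [
    "ACCESS_ID", "ACCESS_KEY", "PROJECT", "END_POINT", "RUNNING_PROJECT",
    "CATALOG_NAME", "DEFAULT_DATABASE", "PLUGIN_VERSION", "PARTITIONS",
]
for name in java_constants:
    print(name, "->", constant_to_key(name))
```

For any parameter not shown in this document, check the key name against the Alink documentation rather than relying on this pattern.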