MaxCompute EndPoint: https://help.aliyun.com/document_detail/34951.html,这里选择的是外网Endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api
Redis 文档地址为:https://help.aliyun.com/product/26340.html


package org.example;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;
import com.alibaba.alink.common.io.catalog.MySqlCatalog;
import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.sink.RedisSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;
public class HelloAlink {
public static void main(String[] args) throws Exception {
runSinkRedis();
}
public static void runSinkRedis() throws Exception {
// 填写好 EndPoint,AccessKeyID,AccessKeySecret, Project。 RunningProject 填写和 Project 相同
OdpsCatalog odpsCatalog = new OdpsCatalog(
"odps", "default",
"0.36.4-public", "AccessKeyID", "AccessKeySecret",
"Project",
"EndPoint",
"Project"
);
// schema: sepal_length double, sepal_width double, petal_length double, petal_width double, category string
CatalogSourceBatchOp source = new CatalogSourceBatchOp()
.setCatalogObject(
new CatalogObject(
odpsCatalog,
// 填写好 Project
new ObjectPath("Project", "test_alink_iris"),
new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
)
);
RedisSinkBatchOp sink = new RedisSinkBatchOp()
.setRedisIPs("ip:port")
.setRedisPassword("PASSWORD")
.setClusterMode(true)
.setDatabaseIndex(0)
.setKeyCols("sepal_length", "sepal_width", "petal_length", "petal_width")
.setValueCols("category")
.setPluginVersion("2.9.0");
source.link(sink);
BatchOperator.execute();
}
}