Alink教程(Java版)
Alink教程(Python版)

读取MaxCompute数据表,写入Redis

MaxCompute

MaxCompute Endpoint 文档地址为:https://help.aliyun.com/document_detail/34951.html,这里选择的是外网 Endpoint:http://service.cn-beijing.maxcompute.aliyun.com/api

Redis

Redis 文档地址为:https://help.aliyun.com/product/26340.html

  1. 创建实例

  1. 这里创建按量付费本地盘实例
  2. 将 Redis 实例创建在与 Flink 全托管相同的网段内
  3. 获取专有网络地址

  2. 将内网网段加入白名单,使 Flink 集群可以访问 Redis 实例

实例

  1. 代码
package org.example;

import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.MySqlCatalog;
import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.sink.RedisSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;

/**
 * Tutorial example: reads the {@code test_alink_iris} table through an
 * {@link OdpsCatalog} (MaxCompute) and writes its rows to Redis with a
 * {@link RedisSinkBatchOp}.
 *
 * <p>All connection values below ("AccessKeyID", "AccessKeySecret", "Project",
 * "EndPoint", "ip:port", "PASSWORD") are placeholders that must be replaced
 * before the job is run.
 */
public class HelloAlink {

	/**
	 * Program entry point; delegates to {@link #runSinkRedis()}.
	 *
	 * @param args command-line arguments (not used)
	 * @throws Exception anything thrown by {@link #runSinkRedis()}
	 */
	public static void main(String[] args) throws Exception {
		runSinkRedis();
	}

	/**
	 * Builds the batch job "MaxCompute table -> Redis" and executes it.
	 *
	 * <p>Source table schema: sepal_length double, sepal_width double,
	 * petal_length double, petal_width double, category string.
	 * The four numeric columns are used as the Redis key columns and
	 * {@code category} as the value column.
	 *
	 * @throws Exception if building or executing the job fails
	 */
	public static void runSinkRedis() throws Exception {
		// Fill in EndPoint, AccessKeyID, AccessKeySecret and Project.
		// The last argument (running project) is set to the same value as Project.
		OdpsCatalog maxComputeCatalog = new OdpsCatalog(
			"odps", "default", "0.36.4-public",
			"AccessKeyID", "AccessKeySecret",
			"Project", "EndPoint", "Project"
		);

		CatalogSourceBatchOp irisSource = new CatalogSourceBatchOp();

		// Fill in Project: the table is addressed as <Project>.test_alink_iris.
		ObjectPath irisTablePath = new ObjectPath("Project", "test_alink_iris");
		// NOTE(review): an overwrite-sink flag is passed although this catalog object
		// is only used as a source here — confirm whether it is actually required.
		Params catalogObjectParams = new Params().set(HasOverwriteSink.OVERWRITE_SINK, true);
		CatalogObject irisTable = new CatalogObject(maxComputeCatalog, irisTablePath, catalogObjectParams);
		irisSource.setCatalogObject(irisTable);

		// Configure the Redis sink step by step instead of one fluent chain.
		RedisSinkBatchOp redisSink = new RedisSinkBatchOp();
		redisSink.setRedisIPs("ip:port");
		redisSink.setRedisPassword("PASSWORD");
		redisSink.setClusterMode(true);
		redisSink.setDatabaseIndex(0);
		redisSink.setKeyCols("sepal_length", "sepal_width", "petal_length", "petal_width");
		redisSink.setValueCols("category");
		redisSink.setPluginVersion("2.9.0");

		// Wire source -> sink, then trigger execution of the batch job.
		irisSource.link(redisSink);
		BatchOperator.execute();
	}
}
  2. 特性
  • 支持集群模式
  • 支持选择数据库索引号
  • 当前存储为 byte[], 未来版本中会进行扩展