Java 类名:com.alibaba.alink.operator.stream.sink.RedisStringSinkStreamOp
Python 类名:RedisStringSinkStreamOp
将一个流式数据,(单列String类型键值)按行写到Redis里。
名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 |
---|---|---|---|---|---|---|
pluginVersion | 插件版本号 | 插件版本号 | String | ✓ | ||
clusterMode | 集群模式 | 是集群模式还是单机模式 | Boolean | false | ||
databaseIndex | 数据库索引号 | 数据库索引号 | Long | |||
keyCol | 单键列 | 单键列 | String | null | ||
pipelineSize | 流水线大小 | Redis 发送命令流水线的大小 | Integer | 1 | ||
redisIPs | Redis IP | Redis 集群的 IP/端口 | String[] | |||
redisPassword | Redis 密码 | Redis 服务器密码 | String | |||
timeout | 超时 | 关闭连接的超时时间 | Integer | |||
valueCol | 单值列 | 单值列 | String | null |
** 以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!**
redisIP = "*" redisPort = 0 df = pd.DataFrame([ ["football", "1.0"], ["football", "2.0"], ["football", "3.0"]]) batchData = StreamOperator.fromDataframe(df, schemaStr='id string,val double') batchData.link(RedisStringSinkStreamOp()\ .setRedisIPs(redisIP)\ .setKeyCol("id")\ .setValueCol("val")\ .setPluginVersion("2.9.0")) BatchOperator.execute()
package com.alibaba.alink.operator.batch.sink; import org.apache.flink.types.Row; import com.alibaba.alink.operator.stream.StreamOperator; import com.alibaba.alink.operator.stream.source.MemSourceStreamOp; import com.alibaba.alink.operator.stream.sink.RedisStringSinkStreamOp; import com.alibaba.alink.testutil.AlinkTestBase; import org.junit.Test; import java.util.Arrays; import java.util.List; public class RedisStringSinkStreamOpTest extends AlinkTestBase { @Test public void test() throws Exception { String redisIP = "127.0.0.1:6379"; int redisPort = 0; List <Row> df = Arrays.asList( Row.of("football", "1.0"), Row.of("football", "2.0") ); StreamOperator <?> data = new MemSourceStreamOp(df, "id string,val string"); RedisStringSinkStreamOp sink = new RedisStringSinkStreamOp() .setRedisIPs(redisIP) .setKeyCol("id") .setValueCol("val") .setPluginVersion("2.9.0"); data.link(sink); StreamOperator.execute(); } }