Java 类名:com.alibaba.alink.operator.stream.dataproc.LookupRedisRowStreamOp
Python 类名:LookupRedisRowStreamOp
支持数据查找功能,支持多个key的查找,并将查找后的结果中的value列添加到待查询数据后面。
需要和RedisRowSinkBatchOp或RedisRowSinkStreamOp组件配合使用,该组件用来保存数据。
名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 |
---|---|---|---|---|---|---|
outputSchemaStr | Schema | Schema。格式为“colname coltype[, colname2, coltype2[, …]]”,例如“f0 string, f1 bigint, f2 double” | String | ✓ | ||
pluginVersion | 插件版本号 | 插件版本号 | String | ✓ | ||
selectedCols | 选择的列名 | 计算列对应的列名列表 | String[] | ✓ | ||
clusterMode | 集群模式 | 是集群模式还是单机模式 | Boolean | false | ||
databaseIndex | 数据库索引号 | 数据库索引号 | Long | |||
pipelineSize | 流水线大小 | Redis 发送命令流水线的大小 | Integer | 1 | ||
redisIPs | Redis IP | Redis 集群的 IP/端口 | String[] | |||
redisPassword | Redis 密码 | Redis 服务器密码 | String | |||
reservedCols | 算法保留列名 | 算法保留列 | String[] | null | ||
timeout | 超时 | 关闭连接的超时时间 | Integer |
** 以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!**
df = pd.DataFrame([ ["id001", 123, 45.6, "str"] ]) inOp = BatchOperator.fromDataframe(df, schemaStr='id string, col0 bigint, col1 double, col2 string') redisIP = "*:*" RedisRowSinkBatchOp()\ .setRedisIPs([redisIP])\ .setKeyCols(["id"])\ .setPluginVersion("2.9.0")\ .setValueCols(["col0", "col1", "col2"])\ .linkFrom(inOp) BatchOperator.execute() df2 = pd.DataFrame([ ["id001"] ]) needToLookup = StreamOperator.fromDataframe(df2, schemaStr="id string") LookupRedisRowStreamOp()\ .setRedisIPs([redisIP])\ .setPluginVersion("2.9.0")\ .setSelectedCols(["id"])\ .setOutputSchemaStr("col0 bigint, col1 double, col2 string")\ .linkFrom(needToLookup)\ .print() StreamOperator.execute()
import com.alibaba.alink.common.AlinkGlobalConfiguration; import com.alibaba.alink.operator.batch.BatchOperator; import com.alibaba.alink.operator.stream.StreamOperator; import com.alibaba.alink.operator.stream.dataproc.LookupRedisRowStreamOp; import com.alibaba.alink.operator.batch.sink.RedisRowSinkBatchOp; import com.alibaba.alink.operator.stream.source.MemSourceStreamOp; import com.alibaba.alink.testutil.AlinkTestBase; import org.junit.Test; import java.util.Collections; public class LookupRedisRowStreamOpTest extends AlinkTestBase { @Test public void map() throws Exception { String redisIP = "*:*"; MemSourceBatchOp memSourceBatchOp = new MemSourceBatchOp( Collections.singletonList(Row.of("id001", 123L, 45.6, "str")), "id string, col0 bigint, col1 double, col2 string" ); new RedisRowSinkBatchOp() .setRedisIPs(redisIP) .setKeyCols("id") .setPluginVersion("2.9.0") .setValueCols("col0", "col1", "col2") .linkFrom(memSourceBatchOp); BatchOperator.execute(); MemSourceStreamOp needToLookup = new MemSourceStreamOp( Collections.singletonList(Row.of("id001")), "id string" ); new LookupRedisRowStreamOp() .setRedisIPs(redisIP) .setPluginVersion("2.9.0") .setSelectedCols("id") .setOutputSchemaStr("col0 bigint, col1 double, col2 string") .linkFrom(needToLookup) .lazyPrint(10); StreamOperator.execute(); } }
id | col0 | col1 | col2 |
---|---|---|---|
id001 | 123 | 45.6000 | str |