Java 类名:com.alibaba.alink.operator.stream.dataproc.LookupHBaseStreamOp
Python 类名:LookupHBaseStreamOp
LookupHBaseStreamOp ,将HBase中的数据取出。
读HBase Plugin版。plugin版本为1.2.12。
读数据时,指定HBase的zookeeper地址,表名称,列簇名称。指定rowkey列(可以是多列)和要读取的数据列和格式(可以写多列)。
在使用时,需要先下载插件,详情请看https://www.yuque.com/pinshu/alink_guide/czg4cx
名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 |
---|---|---|---|---|---|---|
familyName | 簇值 | 簇值 | String | ✓ | ||
outputSchemaStr | Schema | Schema。格式为“colname coltype[, colname2, coltype2[, …]]”,例如“f0 string, f1 bigint, f2 double” | String | ✓ | ||
pluginVersion | 插件版本号 | 插件版本号 | String | ✓ | ||
rowKeyCols | rowkey所在列 | rowkey所在列 | String[] | ✓ | ||
tableName | HBase表名称 | HBase表名称 | String | ✓ | ||
zookeeperQuorum | Zookeeper quorum | Zookeeper quorum 地址 | String | ✓ | ||
reservedCols | 算法保留列名 | 算法保留列 | String[] | null | ||
timeout | HBase RPC 超时时间 | HBase RPC 超时时间,单位毫秒 | Integer | 1000 |
** 以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!**
df = pd.DataFrame([ ["1"], ["2"] ]) data = StreamOperator.fromDataframe(df, schemaStr='userid string') lookupHBaseStreamOp = LookupHBaseStreamOp()\ .setZookeeperQuorum("localhost:2181")\ .setTableName("user")\ .setRowKeyCols("userid")\ .setFamilyName("color")\ .setPluginVersion("1.2.12")\ .setOutputSchemaStr("red long,black double,green int") lookupHBaseStreamOp.linkFrom(data).print() StreamOperator.execute()
import org.apache.flink.types.Row; import com.alibaba.alink.operator.stream.StreamOperator; import com.alibaba.alink.operator.stream.dataproc.LookupHBaseStreamOp; import com.alibaba.alink.operator.stream.source.MemSourceStreamOp; import com.alibaba.alink.testutil.AlinkTestBase; import org.junit.Test; import java.util.Arrays; import java.util.List; public class HBaseTest { @Test public void testReadStream() throws Exception { List <Row> datas = Arrays.asList( Row.of("1"), Row.of("2") ); StreamOperator op = new MemSourceStreamOp(datas, "userid string"); LookupHBaseStreamOp lookupHBaseStreamOp = new LookupHBaseStreamOp() .setZookeeperQuorum("localhost:2181") .setTableName("user") .setRowKeyCols("userid") .setFamilyName("color") .setPluginVersion("1.2.12") .setOutputSchemaStr("red long,black double,green int"); lookupHBaseStreamOp.linkFrom(op).print(); StreamOperator.execute(); } }