Java 类名:com.alibaba.alink.operator.stream.source.KafkaSourceStreamOp
Python 类名:KafkaSourceStreamOp
读Kafka版。Kafka是由Apache软件基金会开发的一个开源流处理平台。详情
请参阅:https://kafka.apache.org/
名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 |
---|---|---|---|---|---|---|
bootstrapServers | bootstrapServers | bootstrapServers | String | ✓ | ||
groupId | groupId | groupId | String | ✓ | ||
startupMode | startupMode | startupMode | String | ✓ | “EARLIEST”, “GROUP_OFFSETS”, “LATEST”, “TIMESTAMP” | |
properties | 用户自定义Kafka参数 | 用户自定义Kafka参数,形如: “prop1= val1, prop2 = val2” | String | null | ||
startTime | 起始时间 | 起始时间。默认从当前时刻开始读。 | String | null | ||
topic | topic名称 | topic名称 | String | null | ||
topicPattern | topic pattern | topic pattern | String | null |
** 以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!**
data = KafkaSourceStreamOp() \ .setBootstrapServers("localhost:9092") \ .setTopic("iris") \ .setStartupMode("EARLIEST") \ .setGroupId("alink_group") data.print() StreamOperator.execute()
import com.alibaba.alink.operator.stream.StreamOperator; import com.alibaba.alink.operator.stream.source.KafkaSourceStreamOp; import org.junit.Test; public class KafkaSourceStreamOpTest { @Test public void testKafkaSourceStreamOp() throws Exception { StreamOperator <?> data = new KafkaSourceStreamOp() .setBootstrapServers("localhost:9092") .setTopic("iris") .setStartupMode("EARLIEST") .setGroupId("alink_group"); data.print(); StreamOperator.execute(); } }