[新扩展]Kafka连接器
总结
这个扩展包含与Apache Kafka消息代理一起工作的操作符。
它增加了两个操作符来与特定的Kafka主题交互:
从主题中读取消息,要么是批处理中的旧消息,要么是收集新发布的消息
将示例集中的数据作为新消息写入Kafka集群上的主题
两家运营商都以批处理方式收集数据,并且与后台执行、AI Hub部署或Real-Time-Scoring代理结合使用效果最佳。
为了简化对现有Kafka服务器的访问,该扩展支持一个新的连接对象来存储和共享连接细节。用例示例
- 从一个主题阅读过去的消息
使用“Read Kafka Topic”操作符和轮询策略设置为最早的,您可以检索存储在给定主题下的过去消息。这可以用于分析历史数据,并可能在历史数据上构建模型。 - 从一个主题中读取新消息
使用“Read Kafka Topic”操作符和轮询策略设置为最新的,可以从主题检索新发布的消息。可以选择等待,直到检索到指定数量的消息,或者等待一段固定的时间,并返回在此期间发布的所有消息。这可以在部署中用于将学习到的模型应用于新数据或检查特定事件。 - 向一个主题写消息
使用“Write Kafka Topic”操作符,任何示例集数据都可以转换为消息并推送到给定的主题。每个示例或行都被转换为消息(作为简单字符串或JSON格式)并发送到消息代理。有一个选项可以批量发送所有消息,也可以在指定的时间间隔内发送所有消息。例如,这可以用于将得分或转换的数据发布回Kafka流。 - 对历史数据进行培训,并对新消息进行评分
通过读写组合,可以很容易地构建一个模型,该模型对存储在集群上的过去消息进行训练,然后定期检索新消息并将模型应用于它们。得分数据甚至可以再次发布到一个新的主题中。
免责声明
这是扩展的早期版本,因此要注意缺少的功能。反馈是非常感谢。
扩展本身仅作为消息消费者或生产者。Kafka集群没有附带这个扩展,但是简单的读写操作也不需要它。
标记:
2