类别

版本

使用Kafka连接器扩展

简介

扩展连接RapidMiner与一个卡夫卡服务器允许从主题中读取消息并将数据写入Kafka主题。

安装Kafka连接器扩展

安装扩展,去扩展菜单,打开RapidMiner市场(更新和扩展),并搜寻卡夫卡连接器.有关更多细节,请参见添加扩展

连接到Kafka服务器

Kafka连接器使用RapidMiner的连接框架。要连接到Kafka服务器,请创建一个新的Kafka连接对象在存储库中。这允许集中管理连接并重用操作符之间的连接。扩展支持以下安全选项:

  • 没有一个
  • SASL平原
  • SASL急停
  • SSL双向

此外,SASL Plain和SASL SCRAM可以配置为使用加密的SSL通道,可以使用公共证书,也可以使用本地提供的SSL密钥和信任库文件。

没有认证

要连接到不提供额外安全功能的Kafka服务器,提供可以到达的主机地址和端口号就足够了。

SASL平原

使用SASL(简单身份验证和安全层)Plain作为身份验证连接到Kafka服务器,需要用户名和密码。

如果加密设置为none时,将不加密传输凭据。如果服务端提供加密证书(SSL),请设置加密“是的”。

SASL急停

要连接到使用SASL SCRAM(盐碱化挑战响应身份验证机制)作为身份验证的Kafka服务器,需要用户名和密码以及用于哈希的正确SCRAM版本。

如果加密设置为none时,将不加密传输凭据。如果服务端提供加密证书(SSL),请设置加密“是的”。

SSL双向

当与Kafka服务器的通信通过SSL安全通道而不需要额外的身份验证时,选择此选项。

使用Keystore文件

在自创建证书的情况下,需要提供保存此信息的必要文件。

密钥存储库File holder是存储公私密钥和证书的存储库信任存储库文件保存受信任的证书。由于此信息的敏感性,两个文件都有自己的密码。

例子

你可以找到运算符读卡夫卡的话题而且卡夫卡写主题通过搜索卡夫卡在操作面板中。

对于这两种操作,您可以通过连接存储的连接对象到连接输入端口来指定所使用的Kafka服务器。不需要其他凭据,因为它们都存储在Connection对象中。通过交换连接,您可以轻松地在不同的服务器之间切换。

这两个操作符都可以选择提供服务器上已经可用的主题列表。在寻找要订阅的特定主题或向已存在的主题发送新消息时,这非常有帮助。

从现有的Kafka Topic中读取

读卡夫卡的话题operator是有可能从Kafka服务器检索消息的。

有两种不同的访问方法,称为补偿策略

与参数补偿策略设置为最早的从此主题最早可用的消息开始检索过去的消息。这在用过去的数据训练机器学习模型或检查最近的事件时很有用。可以查询选定数量的消息或检索该主题的所有过去消息。

如果补偿策略被设置为最新的操作员等待收集新的传入消息。的收集策略是不是要等到一个固定的数量没有新消息到达或已经过了一定的时间。如同退步,即使等到一个固定的数字,也有一个时间参数,以防止无限期等待。此策略可用于收集新事件,例如用于监视事件或使用训练过的模型对事件进行评分。

在这两种情况下,结果示例集将每个消息作为示例行包含

故障排除

根据服务器和连接速度的不同,读请求可能会超时,在这种情况下将返回一个空示例集。特别是对于远程服务,增加超时设置以获得可靠的结果可能会有所帮助,这样不会过多增加流程执行时间。

向Kafka主题写入数据

卡夫卡写操作符允许从RapidMiner发送数据到指定的Kafka主题。这可以在一个大的批次中完成批量发送选择的选项,或在指定的消息间隔内。

还可以选择两种不同的消息格式。与消息格式设置为JSON (JavaScript对象表示法),每个示例被转换为一个JSON消息,该消息包含作为键设置的示例的属性名。

消息格式字符串只发送不带属性名和指定分隔符标记(默认为“;”)的原始数据。