使用流媒体扩展
的流媒体扩展允许构建流分析进程,并将它们部署到Apache Flink或Spark流集群上。该设计在一个特殊的嵌套操作符中遵循已知的基于操作符的方法流巢.
数据流使用Apache Kafka主题作为流处理的数据源和接收器。要了解如何建立Kafka连接,请参阅卡夫卡连接器扩展。
安装流媒体扩展
安装扩展,去扩展菜单,打开市场(更新和扩展),并搜寻流扩展.有关更多细节,请参见添加扩展.
连接到流集群
扩展需要连接到流集群,在那里可以部署流程。为此,连接使用RapidMiner的连接框架。这允许集中管理连接并重用操作符之间的连接。这些进程是独立于技术的,因此可以在Flink或Spark集群上执行相同的进程,但可以更改连接对象以外的任何内容。
连接到Flink
创建一个工作的Flink连接对象在存储库中,必须添加必要的属性。它们是主机和端口信息、应用的并行级别和远程指示板的地址。
连接到火花
创建一个工作的Spark连接对象,必须添加必要的属性。的火花设置TAB处理基本连接,需要主机地址和端口号以及到远程仪表板的链接。
的火花属性选项卡和HDFS选项卡可以保存集群的特定属性,这取决于所使用的spark版本和服务器设置。
的HDFS设置选项卡可选地使用HDFS文件系统的URL和路径。
构建流流程
任何流流程的起点都是流巢操作符。在此流程中,配置流流程,然后将其内容部署到通过连接输入端口定义的流集群上。
在流媒体巢中,只有来自流媒体扩展的操作符将被转换为Flink或Spark操作,并部署在流媒体服务器上。其他操作如Multiply可以帮助组织工作流。Kafka连接对象指向输入和输出数据流,RapidMiner模型可以与流上的应用模型结合使用下面).
构建ETL流程
这个过程将来自两个独立Kafka主题的两个传入数据流合并,然后筛选数据中的特定键。然后,结果被写回一个新的Kafka主题,例如,它们可以用于训练一个模型。
示例用例:
- 两个传入的Kafka流是来自两个独立生产工厂监视服务的事件。在数以百万计的传入事件中,只有那些标记为“警告”的事件对早期预警系统是重要的。
- 流也可以是来自两个网站的点击事件,并且只需要分析带有“取消”键的事件,例如触发用户留存事件。
下面的过程显示了来自不同资产的两个数据流如何首先由一个合并到一起联盟操作符。结果流被过滤,只包含带有“warning”标记的事件。过滤后的结果被写回一个新的Kafka主题。
在流上应用模型
基于上面的例子,第二个例子展示了如何在流集群上训练和应用任何RapidMiner模型。
第一步是检索数据并根据历史数据训练模型。例如,读卡夫卡运营商的卡夫卡连接器扩展可以用于从工厂监控中检索带有“警告”标签的过去事件。一个k - means聚类模型可以检测出告警的子组,帮助自动区分不同类型的问题。如果有标记数据可用,监督学习模型也可以训练;例如,预测警报的严重级别的模型。
现在,训练过的模型被放在流工作流中。然后将该模型应用于过滤后的报警事件流,并将预测结果推到另一个主题。
监控流程
流媒体扩展增加了一个新的面板到RapidMiner Studio,“流媒体仪表盘”。它可以添加到下面的用户界面视图->显示面板->流媒体仪表板.
它列出了所有已部署的流流程,并允许监视和管理它们。流式Nest操作符的执行在流式仪表板(称为工作流)中创建一个条目。工作流的名称、状态、定义RapidMiner流程的位置和开始时间都列在仪表板上。
此外,还列出了所有已部署的流作业。单个作业以及整个工作流都可以通过仪表板停止。工作流的条目可以通过使用相应的按钮从仪表板中删除。
流媒体仪表板还允许打开特定平台(Flink或Spark)的远程仪表板。
在流上应用RapidMiner模型
为了使用Apply Model on Stream操作符,需要对流集群进行一些更改。流引擎(Flink或Spark)需要一个额外的插件,这样它就知道如何处理RapidMiner模型,当然,需要RapidMiner执行引擎来实际运行模型。
在集群上安装RapidMiner Studio
可以找到RapidMiner Studio的安装指南在这里.如果特殊型号(例如从深度学习扩展)的使用,扩展* . jar文件是需要的,以及需要放在.RapidMiner /扩展
集群上的文件夹。扩展文件可以从RapidMiner市场下载或从.RapidMiner
本地安装的。
安装RapidMiner插件
RapidMiner插件文件随扩展名一起提供,可以在这个路径下的RapidMiner Home文件夹中找到/ workspace / rmx_streaming .RapidMiner /扩展
在从市场安装流扩展后。
这个文件夹里有两个文件叫做rapidminer-as-plugin-common.jar而且rapidminer-as-plugin.jar,最后是扩展的版本号。
把rapidminer-as-plugin-common.jar文件到/ lib
您的集群安装文件夹(例如/ opt / flink / lib /
).
准备rapidminer-as-plugin-common.jar-创建一个空的自由文件夹中。的内容复制到此文件夹自由文件夹从新的RapidMiner安装(没有jdbc而且插件子文件夹)。-复制rapidminer-as-plugin.jar也在这个文件夹里。——创建一个plugin.properties归档并写入以下内容:
plugin.class = com.rapidminer.extension.streaming.raap.wrapper。RapidMinerPlugIn插件。id=rm-as-plugin Plugin .version=9.9.0 Plugin .description=RapidMiner as Plugin Plugin。许可证= Apache Licents 2.0
方法创建zip-archive自由文件夹和插件。属性,并命名它rm-as-plugin.zip(确保名称是正确的,因为代码将查找这个文件)。
然后将这个zip文件放入一个新文件夹中% RM_INSTALLATION % / lib / rm-as-pluginRapidMiner安装(所以它是一个子文件夹,从* . jar复制的文件)。
之后应该重新启动集群实例,以便Flink或Spark可以加载新的插件。
执行一个RapidMiner模型
现在,在流上应用模型运算符可用。两个必需的参数是RapidMiner安装文件夹的位置(其中rm-as-plugin)和RapidMiner Home文件夹,其中存储扩展名和用户数据。
在典型的linux集群上,路径是这样的/ opt / rapidminer-9-9-0
而且/home/$用户名/。RapidMiner