Java Flink与kafka实现实时告警功能过程
2023-01-18 10:08:16 来源:易采站长站 作者:
目录引出问题demo设计环境搭建flink程序代码项目演示告警系统架构引出问题项目使用告警系统的逻辑是将实时数据保存到本地数据库再使用定时任务做判断,然后产生告警数据。这种方式存在告警的延时实在是太高...
目录
引出问题demo设计
环境搭建
flink程序代码
项目演示
告警系统架构
引出问题
项目使用告警系统的逻辑是将实时数据保存到本地数据库再使用定时任务做判断,然后产生告警数据。这种方式存在告警的延时实在是太高了。数据从产生到保存,从保存到判断都会存在时间差,按照保存数据定时5分钟一次,定时任务5分钟一次。最高会产生10分钟的误差,这种告警就没什么意义了。
demo设计
为了简单的还原业务场景,做了简单的demo假设
实现一个对于学生成绩评价的实时处理程序
数学成绩,基准范围是90-140,超出告警
物理成绩,基准范围是60-95,超出告警
环境搭建
使用Windows环境演示
准备工作
1、安装jdk
2、安装zookeeper
解压压缩包
zoo_sample.cfg将它重命名为zoo.cfg
修改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data
配置环境变量
3、安装kafka
解压压缩包
修改config/server.properties
log.dirs=D://tools//kafka_2.11-2.1.0//log
flink程序代码
pom
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-Java</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.13.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.10.0</version> </dependency>
主程序
public class StreamAlertDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3); Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); DataStreamSource<String> inputDataStream = env.addSource(kafkaConsumer); DataStream<String> resultStream = inputDataStream.flatMap(new AlertFlatMapper()); resultStream.print().setParallelism(4); resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties)); env.execute(); } }
主程序,配置告警规则后期可以使用推送或者拉去方式获取数据
public class RuleMap { private RuleMap(){} public final static Map<String,List<AlertRule>> initialRuleMap; private static List<AlertRule> ruleList = new ArrayList<>(); private static List<String> ruleStringList = new ArrayList<>(Arrays.asList( "{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}", "{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}", "{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}", "{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}")); static { for (String i : ruleStringList) { ruleList.add(JSON.parseobject(i, AlertRule.class)); } initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget)); } }
AlertFlatMapper,处理告警逻辑
public class AlertFlatMapper implements FlatMapFunction<String, String> { @Override public void flatMap(String inVal, Collector<String> out) throws Exception { Achievement user = JSON.parseObject(inVal, Achievement.class); Map<String, List<AlertRule>> initialRuleMap = RuleMap.initialRuleMap; List<AlertInfo> resList = new ArrayList<>(); List<AlertRule> mathRule = initialRuleMap.get("MathVal"); for (AlertRule rule : mathRule) { if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) { resList.add(new AlertInfo(user.getName(), rule.getDescInfo())); } } List<AlertRule> physicsRule = initialRuleMap.get("PhysicsVal"); for (AlertRule rule : physicsRule) { if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) { resList.add(new AlertInfo(user.getName(), rule.getDescInfo())); } } String result = JSON.toJSONString(resList); out.collect(result); } private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) { switch (type) { case 0: return actVal < targetVal; case 1: return actVal.equals(targetVal); case 2: return actVal > targetVal; default: return false; } } }
三个实体类
@Data @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class Achievement implements Serializable { private static final long serialVersionUID = -1L; private String name; private Integer mathVal; private Integer physicsVal; } @Data @AllArgsConstructor @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class AlertInfo implements Serializable { private static final long serialVersionUID = -1L; private String name; private String descInfo; } @Data @AllArgsConstructor @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class AlertRule implements Serializable { private static final long serialVersionUID = -1L; private String target; //0小于 1等于 2大于 private Integer type; private Integer criticalVal; private String descInfo; }
项目演示
创建kafka生产者 test
.\bin\windows\kafka-console-producer.BAT --broker-list localhost:9092 --topic test
创建kafka消费者 demo
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning
启动flink应用
给topic test发送消息
{"name":"liu","MathVal":45,"PhysicsVal":76}
消费topic demo
告警系统架构
到此这篇关于Java Flink与kafka实现实时告警功能过程的文章就介绍到这了,更多相关Java Flink与kafka实时告警内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!
如有侵权,请发邮件到 [email protected]
最新图文推荐
相关文章
-
Spring Cloud 整合Apache-SkyWalking实现链路跟踪的方法
什么是SkyWalking 查看官网https://skywalking.apache.org/ 分布式系统的应用程序性能监视工具,专为微服务、云原生架构和基于容器(Docker、K8s、Mesos)架构而设计。 安装 进入下载页面https://2020-06-18
-
成功解决IDEA2020 Plugins 连不上、打不开的方法
IntelliJ IDEA 2020.1 插件中心一直打不开,鉴于有部分同学反馈设置http proxy不能解决,所以可按以下顺序检查 一、设置 http proxy—勾上Auto-detect proxy setting,参照下图,加上地址 http://127.0.02020-06-25
-
IDEA2020 1.1中Plugins加载不出来的问题及解决方法
进入File-Setting 如图,取消勾选,点击确认后重启,点击了以后等一会就可以正常显示 ps:下面看下解决IDEA 2020.1.1 找不到程序包和符号 问题描述 IDEA 2020.1.1 maven项目build的时候报错,找2020-06-28
-
springboot + rabbitmq 如何实现消息确认机制(踩坑经验)
本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷2020-07-01
-
JetBrains IntelliJ IDEA 2020安装与使用教程详解
对于JetBrains IntelliJ IDEA 2020的认识 IntelliJ IDEA 2020是一款JAVA编程软件,捷克IntelliJ公司研发推出。该软件提供了一个非常强大的JAVA集成开发环境,不仅添加了对Records的完整代码洞察支持,2020-06-28