Flink E-commerce User Behavior Analysis Project
1. Real-Time Statistical Analysis
1. 1 Hot Items Statistics
- Requirement: every 5 minutes, display in real time the TopN hot items on the site over the past hour.
- Output format:
  Window info:
  NO 1: item ID + view count 1
  NO 2: item ID + view count 2
  NO 3: item ID + view count 3
- Implementation idea (see the sketch after this list):
  - The final output needs the window info plus the item ID, so after keyBy a full-window function is required; that is the only way to get both the window time and the key.
  - The view count is also needed, so an incremental aggregate function is used: after keyBy, each arriving record is folded into the count immediately.
  - Steps 1 and 2 only yield the count of a single item. To rank everything within the hour, keyBy the window end time and use a process function: buffer the window's items in ListState, and when a timer fires at the window end, sort and emit the ListState contents.
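Condensed, the two-stage topology described above looks like this (a sketch using the classes from the full listing below; itemStream stands for the parsed and filtered stream built in step 2 of the listing):

// Stage 1: per-item count with window metadata attached
DataStream<ItemViewCount> windowCounts = itemStream
        .keyBy("itemId")                                // one partition per item
        .timeWindow(Time.hours(1), Time.minutes(5))     // 1-hour window sliding every 5 minutes
        .aggregate(new MyAggreateCount(), new MyAllWindowsView());
// Stage 2: regroup by window and rank
windowCounts
        .keyBy("windowEnd")                             // all items of one window together
        .process(new ItemHotTopN(5));                   // buffer in ListState, sort when the timer fires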
Code
/**
* What: compute the hot items of the past hour, refreshing the result every 5 minutes.
* How:
* 1. Since we output one hour of item data (accumulated history) and trigger every
* 5 minutes, i.e. whenever a window closes, we emit the state buffered for that window.
* Output: window end time, item ID, popularity
* <p>
* 2. So we must produce the window end time, the item ID and the popularity:
* popularity: incremental aggregate function
* end time + item ID: full-window function
* <p>
* Sample output:
* Window end time: 2017-11-26 12:20:00.0
* Window contents:
* NO 1: item ID = 2338453 popularity = 27
* NO 2: item ID = 812879 popularity = 18
* NO 3: item ID = 4443059 popularity = 18
* NO 4: item ID = 3810981 popularity = 14
* NO 5: item ID = 2364679 popularity = 14
*/
public class HotItemsPractise {
public static void main(String[] args) throws Exception {
// 1. Set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv");
// 2. Prepare the data source
DataStream<ItemBean> filterStream = inputStream.map(line -> {
String[] split = line.split(",");
return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
}).filter(item -> "pv".equals(item.getBehavior()));
// 3. Aggregate the view count of each item per window
DataStream<ItemViewCount> windowsResult = filterStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ItemBean>() {
@Override
public long extractAscendingTimestamp(ItemBean element) {
return element.getTimestamp() * 1000L;
}
}).keyBy("itemId")
.timeWindow(Time.hours(1), Time.minutes(5))
.aggregate(new MyAggreateCount(), new MyAllWindowsView());
// 4. Collect and rank the results of each one-hour window
SingleOutputStreamOperator<String> windowEnd = windowsResult
.keyBy("windowEnd")
.process(new ItemHotTopN(5));
windowEnd.print();
env.execute("HotItemsPractise");
}
/**
* Incremental aggregate function: counts the views of one item within the window.
*/
public static class MyAggreateCount implements AggregateFunction<ItemBean, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(ItemBean value, Long accumulator) {
return accumulator + 1L;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
// merge is only called for merging windows (e.g. session windows); summing is the safe implementation
return a + b;
}
}
/**
* Full-window function: receives the pre-aggregated count plus the key, and attaches
* the window end time and the item ID.
*/
public static class MyAllWindowsView implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
/**
* @param tuple  the key (the itemId)
* @param window the time window; its end timestamp is attached to the output
* @param input  the single pre-aggregated count from MyAggreateCount
* @param out    collector for the ItemViewCount result
* @throws Exception
*/
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
long windowEnd = window.getEnd();
long count = input.iterator().next();
long itemId = tuple.getField(0);
out.collect(new ItemViewCount(itemId, windowEnd, count));
}
}
/**
* Buffers one window's items, then sorts and emits the TopN when the timer
* fires at the window end.
*/
public static class ItemHotTopN extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
ListState<ItemViewCount> itemViewCountListState;
private int topN;
public ItemHotTopN(int topN) {
this.topN = topN;
}
@Override
public void open(Configuration parameters) throws Exception {
itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("itemViewCount", ItemViewCount.class));
}
@Override
public void processElement(ItemViewCount itemViewCount, Context ctx, Collector<String> out) throws Exception {
itemViewCountListState.add(itemViewCount);
ctx.timerService().registerEventTimeTimer(itemViewCount.getWindowEnd() + 1L);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// copy the ListState contents into an ArrayList for sorting
ArrayList<ItemViewCount> arraylist = Lists.newArrayList(itemViewCountListState.get().iterator());
arraylist.sort(new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return o2.getCount().intValue() - o1.getCount().intValue();
}
});
StringBuilder resultStringBuilder = new StringBuilder();
resultStringBuilder.append("===================================" + "\n");
// the timer fired at windowEnd + 1, so subtract 1 to show the window end itself
resultStringBuilder.append("Window end time: ").append(new Timestamp(timestamp - 1)).append("\n");
for (int i = 0; i < Math.min(topN, arraylist.size()); i++) {
resultStringBuilder
.append("NO ")
.append(i + 1)
.append(": item ID = ")
.append(arraylist.get(i).getItemId())
.append(" popularity = ")
.append(arraylist.get(i).getCount())
.append("\n");
}
resultStringBuilder.append("===================================\n");
out.collect(resultStringBuilder.toString());
// throttle the output so the console stays readable
Thread.sleep(1000L);
}
}
}
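A side note: the listing uses APIs that newer Flink versions deprecate (setStreamTimeCharacteristic, timeWindow, string-based keyBy). On Flink 1.12+ the same pipeline would look roughly like the untested sketch below; with a typed key selector the window function's key type becomes Long rather than Tuple, so MyAllWindowsView would need a matching variant:

// Equivalent setup on Flink 1.12+ (sketch): event time is the default,
// watermarks come from WatermarkStrategy, windows from explicit assigners.
DataStream<ItemBean> items = env
        .readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv")
        .map(line -> {
            String[] split = line.split(",");
            return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]),
                    Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
        })
        .filter(item -> "pv".equals(item.getBehavior()))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<ItemBean>forMonotonousTimestamps()
                        .withTimestampAssigner((event, ts) -> event.getTimestamp() * 1000L));
items.keyBy(ItemBean::getItemId)
        .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
        .aggregate(new MyAggreateCount());   // pair with a window function keyed by Long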
1. 2 Hot Pages Statistics
- Requirement: every 5 minutes, output the hot pages viewed within the last hour.
- Sample output:
  Window end time: 2015-05-18 13:08:50.0
  NO 1: page URL = /blog/tags/puppet?flav=rss20 popularity = 11
  NO 2: page URL = /projects/xdotool/xdotool.xhtml popularity = 5
  NO 3: page URL = /projects/xdotool/ popularity = 4
  NO 4: page URL = /?flav=rss20 popularity = 4
  NO 5: page URL = /robots.txt popularity = 4
- Implementation idea: unlike the previous case, the timestamps in this source are not monotonically increasing.
  - How to keep out-of-order data from being lost (three layers, see the sketch after this list):
    - 1. Set the watermark delay to the expected disorder of the source.
    - 2. Allow the window to close late: when the initial window end is reached, emit the aggregate, and for each later record that still belongs to the window, update and emit again, one record at a time.
    - 3. Records arriving even later than that go to a side output.
  - How to make a late record overwrite the earlier result:
    - 1. First aggregate incrementally in the window, then apply the full-window function, then key by window end time.
    - 2. KeyBy the window end time, collect all results belonging to that window, sort and emit.
    - 3. If a late record arrives afterwards, the earlier result must be updated, so the results are kept in MapState with the page URL as key and the count as value: putting the same key again overwrites the old entry.
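The three layers map one-to-one onto the window definition. The excerpt below isolates that configuration from the full listing that follows (dataStream stands for the parsed ApacheLogEvent stream):

// Layer 1: the watermark runs 1 second behind the max timestamp seen so far
dataStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.seconds(1)) {
            @Override
            public long extractTimestamp(ApacheLogEvent event) {
                return event.getTimestamp();
            }
        })
        .keyBy("url")
        .timeWindow(Time.minutes(10), Time.seconds(5))
        .allowedLateness(Time.minutes(1))    // Layer 2: window stays updatable for 1 more minute
        .sideOutputLateData(lateTag)         // Layer 3: anything later goes to the side output
        .aggregate(new HotPageIncreaseAgg(), new HotPageAllAgg());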
Code
/**
 * Requirement: every 5 minutes, output the top 5 pages viewed within the last hour,
 * i.e. open a sliding window, collect the per-page counts, and emit a ranking keyed
 * by the window end time. (Note: the window in the code below is actually 10 minutes,
 * sliding every 5 seconds.)
 */
public class HotPages {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);
        DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/apache.log");
        SimpleDateFormat simpleFormatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
        OutputTag<ApacheLogEvent> lateTag = new OutputTag<ApacheLogEvent>("late_date") {
        };
        SingleOutputStreamOperator<PageViewCount> streamPageViewCount = stringDataStreamSource.map(line -> {
            String[] s = line.split(" ");
            // convert the log date into an epoch timestamp
            Long timestamp = simpleFormatter.parse(s[3]).getTime();
            return new ApacheLogEvent(s[0], s[1], timestamp, s[5], s[6]);
        }).filter(date -> "GET".equals(date.getMethod()))
                .filter(data -> {
                    // drop requests for static resources ending in css/js/png/ico/jpg
                    String regex = "((?!\\.(css|js|png|ico|jpg)$).)*$";
                    return Pattern.matches(regex, data.getUrl());
                }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(ApacheLogEvent apacheLogEvent) {
                        return apacheLogEvent.getTimestamp();
                    }
                }).keyBy("url")
                .timeWindow(Time.minutes(10), Time.seconds(5))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(lateTag)
                .aggregate(new HotPageIncreaseAgg(), new HotPageAllAgg());
        SingleOutputStreamOperator<String> windowEnd = streamPageViewCount
                .keyBy("windowEnd")
                .process(new MyProcessFunction(5));
        // print to the console; the late-data side output belongs to the window operator
        windowEnd.print("data");
        streamPageViewCount.getSideOutput(lateTag).print("late_date");
        executionEnvironment.execute();
    }

    public static class HotPageIncreaseAgg implements AggregateFunction<ApacheLogEvent, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(ApacheLogEvent value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    public static class HotPageAllAgg implements WindowFunction<Long, PageViewCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
            String url = tuple.getField(0);
            Long count = input.iterator().next();
            long windowEnd = window.getEnd();
            out.collect(new PageViewCount(url, windowEnd, count));
        }
    }

    public static class MyProcessFunction extends KeyedProcessFunction<Tuple, PageViewCount, String> {
        private Integer topSize;
        MapState<String, Long> hotPageCount;

        public MyProcessFunction(Integer topSize) {
            this.topSize = topSize;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            hotPageCount = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("hot_page_count", String.class, Long.class));
        }

        /**
         * Late data must overwrite the old result, so the counts are kept in MapState:
         * putting an existing key again replaces its value. Once the allowed lateness
         * (1 minute) has passed, the state is cleared.
         */
        @Override
        public void processElement(PageViewCount pageViewCount, Context ctx, Collector<String> out) throws Exception {
            // MapState: putting an existing key updates its value
            hotPageCount.put(pageViewCount.getUrl(), pageViewCount.getCount());
            ctx.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd() + 1);
            // second timer: fires after the allowed lateness to clean up this window's state
            ctx.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd() + 60 * 1000L);
        }

        /**
         * Emits the MapState contents. A timer fires once watermark >= timer timestamp.
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            Long currentKey = ctx.getCurrentKey().getField(0);
            // if the allowed-lateness period is over, just clear the state
            if (timestamp == currentKey + 60 * 1000L) {
                hotPageCount.clear();
                return;
            }
            ArrayList<Map.Entry<String, Long>> pageViewCounts = Lists.newArrayList(hotPageCount.entries());
            pageViewCounts.sort(new Comparator<Map.Entry<String, Long>>() {
                @Override
                public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                    if (o1.getValue() > o2.getValue()) return -1;
                    else if (o1.getValue() < o2.getValue()) return 1;
                    else return 0;
                }
            });
            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append("===================================\n");
            stringBuilder.append("Window end time: ").append(new Timestamp(timestamp - 1)).append("\n");
            for (int i = 0; i < Math.min(topSize, pageViewCounts.size()); i++) {
                Map.Entry<String, Long> stringLongEntry = pageViewCounts.get(i);
                stringBuilder.append("NO ").append(i + 1).append(":")
                        .append(" page URL = ").append(stringLongEntry.getKey())
                        .append(" popularity = ").append(stringLongEntry.getValue())
                        .append("\n");
            }
            stringBuilder.append("===================================\n\n");
            // throttle the output so the console stays readable
            Thread.sleep(1000L);
            out.collect(stringBuilder.toString());
        }
    }
}
1. 3 Site UV Statistics
- Requirement: output the site's UV (unique visitors) per hour, in real time.
- Output format: window end time + number of unique visitors in the window.
- Implementation idea:
  - 1. Use a 1-hour tumbling window, but trigger the computation on every record, which requires a custom trigger.
  - 2. The trigger fires the downstream logic for each record. Deduplication of userIds is done in Redis: look the userId up, and if present discard the record, otherwise count + 1.
  - For each record: parse out the userId, compute its bitmap position with a custom hash function, and read that bit. If the bit is 0, set it to 1, read the window's current count, add 1, emit the result, and write the updated count back to Redis; if the bit is already 1, the user was counted before and the record is discarded.
  - Storage layout (see the sketch after this list):
    - count: a Redis hash (key "uv_page_count" in the listing), mapping window end time -> visitor count
    - userId: a Redis bitmap, one bitmap per window end time
    - hash function: result = result * seed + character code of the current character, accumulated over the userId
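A quick sketch of the hash and the masking trick (hypothetical sample userId; seed 61 and capacity 1 << 29 are the values used in the listing below). The mask equals a modulo only because the capacity is a power of two:

long capacity = 1L << 29;                  // power of two, as in the BloomFilter below
long hash = 0L;
for (char c : "543462".toCharArray()) {    // sample userId (hypothetical)
    hash = hash * 61 + c;                  // seed * previous result + char code
}
long offset = hash & (capacity - 1);       // == hash % capacity, but a single AND
// offset is then used as the bit position: GETBIT <windowEnd> <offset> in Redis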
Code
/**
* @author :LiangFangWei
* @date: 2021-12-21 15:55
* <p>
* Requirement: output each hour's UV in real time, i.e. the deduplicated user count
* within the hour, updated as records arrive.
* Idea:
* <p>
* Within a 1-hour window, every arriving record triggers the computation:
* 1. Look the record up in the Redis bitmap:
*    key:    the window end time
*    offset: the hash of the userId
* 2. If the bit is not set, set it to 1, increment the count and store it back
*    to Redis; if it is already set, discard the record.
*/
public class HotUVWithBloomFilter {
public static void main(String[] args) throws Exception {
// 1. Set up the environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.setParallelism(1);
// 2. Prepare the data
DataStreamSource<String> inputStream = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv");
SingleOutputStreamOperator<ItemBean> filterData = inputStream.map(line -> {
String[] split = line.split(",");
return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ItemBean>() {
@Override
public long extractAscendingTimestamp(ItemBean element) {
return element.getTimestamp() * 1000L;
}
}).filter(itemBean -> "pv".equals(itemBean.getBehavior()));
// 3. 1-hour tumbling window
SingleOutputStreamOperator<PageViewCount> streamOperator = filterData
.timeWindowAll(Time.hours(1))
// 4. Custom trigger: fire the computation for every record instead of waiting for the window to close
.trigger(new UVTrigger())
// 5. Processing logic: look the current userId up in the Redis bitmap,
//    and if it is absent, set its bit and increment the window's count
.process(new UVProcessFunction());
streamOperator.print();
executionEnvironment.execute();
}
/**
* Static inner class, so no separate class file is needed.
*/
public static class UVTrigger extends Trigger<ItemBean, TimeWindow> {
@Override
public TriggerResult onElement(ItemBean element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
// evaluate the window function for every record, then purge the buffered element
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
}
}
public static class UVProcessFunction extends ProcessAllWindowFunction<ItemBean, PageViewCount, TimeWindow> {
private Jedis jedis;
private String pageCountKey = "uv_page_count";
private BloomFilter bloomFilter;
@Override
public void open(Configuration parameters) throws Exception {
jedis = new Jedis("localhost", 6379);
bloomFilter = new BloomFilter(1 << 29);
}
/**
* Looks each arriving record up in Redis.
*
* @param context  window context, used to read the window end time
* @param elements the single element buffered for this firing
* @param out      collector for PageViewCount results
* @throws Exception
*/
@Override
public void process(Context context, Iterable<ItemBean> elements, Collector<PageViewCount> out) throws Exception {
Long windowEnd1 = context.window().getEnd();
String windowEnd = windowEnd1.toString();
// the trigger fires per record, so the iterable holds exactly one element
ItemBean itemBean = elements.iterator().next();
Long userId = itemBean.getUserId();
// compute the user's bitmap offset; the bitmap key is the window end time
long offset = bloomFilter.hash(userId.toString(), 61);
Boolean isExist = jedis.getbit(windowEnd, offset);
if (!isExist) {
jedis.setbit(windowEnd, offset, true);
// increment the count; counts live in a Redis hash keyed by window end time
Long uvCount = 0L; // default for a window's first user
String uvCountString = jedis.hget(pageCountKey, windowEnd);
if (StringUtils.isNoneBlank(uvCountString)) {
uvCount = Long.valueOf(uvCountString);
}
jedis.hset(pageCountKey, windowEnd, String.valueOf(uvCount + 1));
out.collect(new PageViewCount("uv", windowEnd1, uvCount + 1));
}
}
}
public static class BloomFilter {
// capacity must be a power of two: only then is result & (capacity - 1) equal to result % capacity
private long capacity;
public BloomFilter(long capacity) {
this.capacity = capacity;
}
public long hash(String userId, int seed) {
long result = 0L;
for (int i = 0; i < userId.length(); i++) {
result = result * seed + userId.charAt(i);
}
return result & (capacity - 1);
}
}
}
2. Business Flow and Risk Control
2. 1 Ad Blacklist Filtering
- Requirement: output the click count of each ad in each province, over a 1-hour window sliding every 5 minutes. If a user clicks the same ad more than 3 times within one day, emit that user to a side output (blacklist); any further clicks on that ad by that user that day are treated as invalid and excluded from the statistics.
- Sample output:
  blacklist-user> BlackAdUerInfo(uerId=937166, adId=1715, count=click over 3times.)
  blacklist-user> BlackAdUerInfo(uerId=161501, adId=36156, count=click over 3times.)
  --->> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:25:00.0, count=2)
  --->> AdOutputInfo(province=guangdong, windowEnd=2017-11-26 09:25:00.0, count=5)
  --->> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:25:00.0, count=2)
  --->> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:30:00.0, count=2)
  --->> AdOutputInfo(province=guangdong, windowEnd=2017-11-26 09:30:00.0, count=5)
  --->> AdOutputInfo(province=shanghai, windowEnd=2017-11-26 09:30:00.0, count=2)
- Logic:
  - Filtering the abnormal clicks: keyBy userId + adId, then use a process function. For each record in the partition, check whether the click count has reached the threshold; if not, increment it and emit the record. If it has, add the user to the blacklist (side output) once, and register a timer for midnight of the next day that clears the user's click-count state (see the sketch after this list).
  - The per-province counting that follows works exactly like the earlier statistics.
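The subtle part is the daily cleanup timer. A hedged sketch of the arithmetic (ctx is the KeyedProcessFunction context from the listing below); the day length must stay parenthesized, since the integer division is meant to truncate to whole days:

long now = ctx.timerService().currentProcessingTime();
long dayMillis = 24 * 60 * 60 * 1000L;
// start of the next UTC day, shifted back 8 hours for UTC+8 local midnight
long nextMidnight = (now / dayMillis + 1) * dayMillis - 8 * 60 * 60 * 1000L;
ctx.timerService().registerProcessingTimeTimer(nextMidnight);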
Code
/**
* @author :LiangFangWei
* @date: 2021-12-23 18:58
* @description: Count the clicks on each ad per province. If a user's daily clicks on
* an ad exceed the threshold, emit that user to a separate (side-output) stream.
* <p>
* Idea:
* 1. Final output shape: (province, window end time, count)
* 2. An incremental aggregate function produces the count; the full-window function
* attaches the window end time and the key.
* 3. Abnormal data is filtered first: a user who exceeds the daily click threshold
* for an ad is emitted to a separate stream.
* 3.1 Keep the user's click count for the ad in state; on reaching the threshold,
* add the user to the blacklist. Blacklisted clicks are dropped without counting.
* 3.2 Below the threshold: increment the count and emit the record.
*/
public class AdStatisticsByProvince {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.setParallelism(1);
DataStream<String> inputStream = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/AdClickLog.csv");
DataStream<AdvertInfo> processStream1 = inputStream.map(line -> {
String[] split = line.split(",");
return new AdvertInfo(split[0], split[1], split[2], Long.parseLong(split[4]));
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdvertInfo>() {
@Override
public long extractAscendingTimestamp(AdvertInfo element) {
return element.getTimeStramp() * 1000L;
}
});
// filter out the abnormal click records
SingleOutputStreamOperator<AdvertInfo> filterBlackStream = processStream1
.keyBy("userId", "adId")
.process(new BlackUserProcess(3));
DataStream<AdOutputInfo> resultStream = filterBlackStream
.keyBy("province")
.timeWindow(Time.hours(1), Time.minutes(5))
.aggregate(new IncreaseAggreateEle(), new AllAggreateCount());
filterBlackStream.getSideOutput(new OutputTag<BlackAdUerInfo>("blacklist"){}).print("blacklist-user");
resultStream.print("--->");
executionEnvironment.execute();
}
/**
* Filters out abnormal click data (per-user, per-ad blacklisting).
*/
public static class BlackUserProcess extends KeyedProcessFunction<Tuple, AdvertInfo, AdvertInfo> {
ValueState<Long> adClickCount;
ValueState<Boolean> isBlackUser;
private int bound;
public BlackUserProcess(int bound) {
this.bound = bound;
}
@Override
public void open(Configuration parameters) throws Exception {
adClickCount = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ad_click_count", Long.class, 0L));
isBlackUser = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is_black_user", Boolean.class, false));
}
/**
* @param value the ad click record
* @param ctx   context, used for timers and the side output
* @param out   collector for records that pass the filter
* @throws Exception
*/
@Override
public void processElement(AdvertInfo value, Context ctx, Collector<AdvertInfo> out) throws Exception {
// 1. Read the click count; the state is kept for one day only
Long userIdClickCount = adClickCount.value();
// On the first click, register a timer for 0:00 the next day (UTC+8) that clears the state.
// Integer division truncates to whole days, so the day length must stay parenthesized.
if (userIdClickCount == 0) {
long timestamp = ctx.timerService().currentProcessingTime();
long dayMillis = 24 * 60 * 60 * 1000L;
long clearTime = (timestamp / dayMillis + 1) * dayMillis - 8 * 60 * 60 * 1000L;
ctx.timerService().registerProcessingTimeTimer(clearTime);
}
// 2. The threshold has been reached
if (userIdClickCount >= bound) {
// 2.1 Not yet blacklisted
if (!isBlackUser.value()) {
// add the user to the blacklist and emit to the side output
isBlackUser.update(true);
ctx.output(new OutputTag<BlackAdUerInfo>("blacklist") {
},
new BlackAdUerInfo(value.getUserId(), value.getAdId(), "click over " + userIdClickCount + "times."));
}
// 2.2 Already blacklisted: drop the record
return;
}
// 3. Below the threshold: update the state and emit the record
adClickCount.update(userIdClickCount + 1);
out.collect(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdvertInfo> out) throws Exception {
// midnight: reset the daily click count and the blacklist flag
adClickCount.clear();
isBlackUser.clear();
}
}
public static class IncreaseAggreateEle implements AggregateFunction<AdvertInfo, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(AdvertInfo value, Long accumulator) {
return accumulator+1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a+b;
}
}
public static class AllAggreateCount implements WindowFunction<Long, AdOutputInfo, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<AdOutputInfo> out) throws Exception {
Timestamp formateDate = new Timestamp(window.getEnd());
out.collect(new AdOutputInfo(tuple.getField(0).toString(),formateDate.toString(),input.iterator().next()));
}
}
}
2. 2 Malicious Login Monitoring
- Requirement: detect users with consecutive login failures within a short period (stated as 2 failures within 2 seconds; the listing below flags 3 consecutive failures within 5 seconds).
- Implementation idea: CEP. Define the consecutive-login-failure pattern, apply it to the keyed stream, and select the matched events from the resulting pattern stream (see the sketch after this list).
/**
* @author :LiangFangWei
* @date: 2021-12-26 17:11
* @description: Consecutive login-failure detection: emit records of users who fail
* to log in 3 times in a row within 5 seconds (as configured in the pattern below).
*/
public class LoginCheck {
public static void main(String[] args) throws Exception {
// 1. Set up the environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/LoginLog.csv");
// 2. Map lines into objects
KeyedStream<LoginInfo, Tuple> keyedStream = stringDataStreamSource.map(line -> {
String[] split = line.split(",");
return new LoginInfo(split[0], split[2], Long.parseLong(split[3]));
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginInfo>(Time.seconds(3)) {
@Override
public long extractTimestamp(LoginInfo element) {
return element.getTimeStamp() * 1000L;
}
}).keyBy("userId"); // key per user, so failures of different users are never chained together
// 3. Define the pattern
// 3.1 Create the pattern: consecutive login failures
Pattern<LoginInfo, LoginInfo> failPattern = Pattern.<LoginInfo>begin("loginFailEvent").where(new SimpleCondition<LoginInfo>() {
@Override
public boolean filter(LoginInfo value) throws Exception {
return "fail".equals(value.getStatus());
}
// three consecutive login failures; consecutive() enforces strict contiguity
}).times(3).consecutive().within(Time.seconds(5));
// 3.2 Apply the pattern to the keyed stream
PatternStream<LoginInfo> pattern = CEP.pattern(keyedStream, failPattern);
// 3.3 Select the matched data
SingleOutputStreamOperator<LoginFailInfo> selectStream = pattern.select(new PatternSelectFunction<LoginInfo, LoginFailInfo>() {
/*
* The map holds the matched events, keyed by pattern name.
*/
@Override
public LoginFailInfo select(Map<String, List<LoginInfo>> pattern) throws Exception {
List<LoginInfo> loginFailEvent = pattern.get("loginFailEvent");
LoginInfo firstFail = loginFailEvent.get(0);
String userId = firstFail.getUserId();
LoginInfo lastFail = loginFailEvent.get(loginFailEvent.size() - 1);
Timestamp firstFailTimeStamp = new Timestamp(firstFail.getTimeStamp() * 1000L);
Timestamp lastFailTimeStamp = new Timestamp(lastFail.getTimeStamp() * 1000L);
return new LoginFailInfo(userId, firstFailTimeStamp.toString(), lastFailTimeStamp.toString(), loginFailEvent.size() + " consecutive login failures");
}
});
selectStream.print();
executionEnvironment.execute();
}
}
2. 3 Order Payment Timeout Monitoring
- Requirement: detect, in real time, orders that are created but not paid within 15 minutes.
- Implementation logic (see the sketch after this list):
  - Define a CEP pattern: an order-create event followed by a pay event within 15 minutes.
  - Apply the pattern to the stream.
  - Select from the pattern stream, extracting both the timed-out and the normally paid orders:
    - complete matches are handed to the PatternSelectFunction as a map of pattern name -> matched events
    - timed-out partial matches are handed to the PatternTimeoutFunction in the same map form
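Schematically, a single select() call wires up both outcomes (a sketch; patternStream and the two function classes are from the listing below):

OutputTag<OrderTimeoutInfo> timeoutTag = new OutputTag<OrderTimeoutInfo>("timeoutStream") {};
SingleOutputStreamOperator<OrderTimeoutInfo> result = patternStream.select(
        timeoutTag,
        new OrderTimeoutSelect(),   // receives timed-out partial matches + the timeout timestamp
        new OrderPaySelect());      // receives complete matches
result.print("payed normally");
result.getSideOutput(timeoutTag).print("timeout");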
Code
/**
* @description: Detect orders that are created but not paid within 15 minutes.
*/
public class OrderCheck {
private static final Logger logger = LoggerFactory.getLogger(OrderCheck.class);
public static void main(String[] args) throws Exception {
// 1. Set up the environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv");
SingleOutputStreamOperator<OrderInfo> orderEventStream = stringDataStreamSource.map(line -> {
String[] split = line.split(",");
return new OrderInfo(split[0], split[1], split[2], Long.parseLong(split[3]));
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo>() {
@Override
public long extractAscendingTimestamp(OrderInfo element) {
return element.getTimeStamp()*1000L;
}
});
// 2. Define the pattern: a create event followed by a pay event within 15 minutes
Pattern<OrderInfo, OrderInfo> orderPayPattern = Pattern.<OrderInfo>begin("create").where(new SimpleCondition<OrderInfo>() {
@Override
public boolean filter(OrderInfo value) throws Exception {
return "create".equals(value.getStatus());
}
}).followedBy("pay").where(new SimpleCondition<OrderInfo>() {
@Override
public boolean filter(OrderInfo value) throws Exception {
return "pay".equals(value.getStatus());
}
}).within(Time.minutes(15));
// 3. Apply the pattern to the stream keyed by order id
PatternStream<OrderInfo> orderStream = CEP.pattern(orderEventStream.keyBy("orderId"), orderPayPattern);
OutputTag<OrderTimeoutInfo> outputTag = new OutputTag<OrderTimeoutInfo>("timeoutStream") {
};
// 4. Select and emit both the matched and the timed-out events
SingleOutputStreamOperator<OrderTimeoutInfo> resultStream = orderStream.select(outputTag, new OrderTimeoutSelect(), new OrderPaySelect());
resultStream.print("payed normally");
resultStream.getSideOutput(outputTag).print("timeout");
executionEnvironment.execute("order timeout detect job");
}
/**
* When does a timeout fire: when the pattern is not completed within the declared
* time window. Timed-out events are emitted to the side output.
*/
public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderInfo,OrderTimeoutInfo>{
@Override
public OrderTimeoutInfo timeout(Map<String, List<OrderInfo>> pattern, long timeoutTimestamp) throws Exception {
logger.warn("order pattern timed out: {}", pattern);
OrderInfo createEvent = pattern.get("create").get(0);
return new OrderTimeoutInfo(createEvent.getOrderId(), "timeout " + timeoutTimestamp);
}
}
public static class OrderPaySelect implements PatternSelectFunction<OrderInfo, OrderTimeoutInfo> {
@Override
public OrderTimeoutInfo select(Map<String, List<OrderInfo>> pattern) throws Exception {
OrderInfo payEvent = pattern.get("pay").get(0);
return new OrderTimeoutInfo(payEvent.getOrderId(), "pay");
}
}
}
}
2. 4 Real-Time Payment Reconciliation
- Dual-stream join: connect the payment stream and the receipt stream keyed on the same id. A CoProcessFunction caches whichever side arrives first in ValueState, emits the matched pair once the other side shows up, and registers a timer so that records that never find a match are routed to side outputs (see the sketch below).
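Schematically (a sketch; the streams and the process function are from the listing below):

orderStream.keyBy("payId")                      // key both streams on the same id
        .connect(payStream.keyBy("payId"))
        .process(new DoubleStreamJoinProcess()) // caches the first-arriving side in state
        .print("matched");                      // unmatched records leave via side outputs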
Code
/**
 * @author :LiangFangWei
 * @description: Real-time reconciliation: check that every payment has a matching receipt.
 */
public class OrderPay {
    private final static OutputTag<OrderInfo> unmatchedPays = new OutputTag<OrderInfo>("unmatchedPays") {
    };
    private final static OutputTag<Receipt> unmatchedReceipts = new OutputTag<Receipt>("unmatchedReceipts") {
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 1. Payment events
        DataStreamSource<String> inputSteam1 = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv");
        SingleOutputStreamOperator<OrderInfo> orderStream = inputSteam1.map(line -> {
            String[] split = line.split(",");
            return new OrderInfo(split[0], split[1], split[2], Long.parseLong(split[3]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo>() {
            @Override
            public long extractAscendingTimestamp(OrderInfo element) {
                return element.getTimeStamp() * 1000L;
            }
        });
        // 2. Receipt events (the original read OrderLog.csv here, whose lines cannot be
        //    parsed as receipts; the dataset's receipt log is assumed to be ReceiptLog.csv)
        DataStreamSource<String> inputStream2 = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/ReceiptLog.csv");
        SingleOutputStreamOperator<Receipt> payStream = inputStream2.map(line -> {
            String[] split = line.split(",");
            return new Receipt(split[0], split[1], Long.parseLong(split[2]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Receipt>() {
            @Override
            public long extractAscendingTimestamp(Receipt element) {
                return element.getTimeStamp() * 1000L;
            }
        });
        // 3. Dual-stream join on the pay id
        SingleOutputStreamOperator<Tuple2<OrderInfo, Receipt>> resultStream = orderStream
                .keyBy("payId")
                .connect(payStream.keyBy("payId"))
                .process(new DoubleStreamJoinProcess());
        // 4. Matched pairs go to the main stream; unmatched records go to side outputs
        resultStream.print("matched");
        resultStream.getSideOutput(unmatchedPays).print("unmatchedPays");
        resultStream.getSideOutput(unmatchedReceipts).print("unmatchedReceipts");
        env.execute();
    }

    public static class DoubleStreamJoinProcess extends CoProcessFunction<OrderInfo, Receipt, Tuple2<OrderInfo, Receipt>> {
        ValueState<OrderInfo> payState;
        ValueState<Receipt> receiptState;

        @Override
        public void open(Configuration parameters) throws Exception {
            payState = getRuntimeContext().getState(new ValueStateDescriptor<OrderInfo>("pay", OrderInfo.class));
            receiptState = getRuntimeContext().getState(new ValueStateDescriptor<Receipt>("receipt", Receipt.class));
        }

        @Override
        public void processElement1(OrderInfo orderInfo, Context ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
            // check whether the matching receipt (stream 2) has already arrived
            Receipt receipt = receiptState.value();
            if (receipt != null) {
                out.collect(new Tuple2<>(orderInfo, receipt));
                receiptState.clear();
            } else {
                // cache the payment and wait up to 5 seconds for the receipt
                payState.update(orderInfo);
                ctx.timerService().registerEventTimeTimer(orderInfo.getTimeStamp() * 1000L + 5000L);
            }
        }

        @Override
        public void processElement2(Receipt receipt, Context ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
            // check whether the matching payment (stream 1) has already arrived
            OrderInfo orderInfo = payState.value();
            if (orderInfo != null) {
                out.collect(new Tuple2<>(orderInfo, receipt));
                payState.clear();
            } else {
                receiptState.update(receipt);
                ctx.timerService().registerEventTimeTimer(receipt.getTimeStamp() * 1000L + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
            // the timer fires only if one side is still waiting: emit it to the side output
            if (payState.value() != null) {
                ctx.output(unmatchedPays, payState.value());
            }
            if (receiptState.value() != null) {
                ctx.output(unmatchedReceipts, receiptState.value());
            }
            payState.clear();
            receiptState.clear();
        }
    }
}