Flink 用户电商行为分析项目

Flink 用户电商行为分析

1. 实时统计分析

1. 1 热门商品统计
  • 需求描述:每隔5分钟 实时展示1小时内的该网站的热门商品的TopN

  • 展示的数据形式:

    时间窗口信息:

    NO 1:商品ID+浏览次数1

    NO 2:商品ID+浏览次数2

    NO 3:商品ID+浏览次数3

  • 实现思路:

      1. 因为最终要输出窗口信息+商品ID,所以keyBy后需要全窗口函数,这样才能拿到窗口时间+key
      2. 而且需要浏览次数,所以需要增量聚合函数:keyBy后每来一条数据就增量聚合一条,拿到浏览次数
      3. 以上1、2步骤后只能拿到单个商品的浏览次数,所以为了拿到1小时内所有商品的排名,需要再根据窗口截止时间keyBy,使用ProcessFunction把窗口内的商品保存到ListState中,定时器到达窗口截止时间后排序输出ListState中的数据
  • 代码


/**
 * 做什么 :统计一小时内热门商品 5分钟更新一次结果
 * 怎么做:
 * 1.既然输出1小时内商品信息,即输出历史数据,且每隔5分钟触发一次 即到达窗口结束的时候触发一次
 * 输出5分钟内保存的状态信息
 * 输出: 窗口结束时间 商品ID 热门数
 * <p>
 * 2 那么就要统计数出商品结束时间 商品ID 热门数
 * 热门数:增量聚合函数
 * 结束时间+商品ID:全窗口
 * <p>
 * 输出结果:
 * 窗口结束时间:2017-11-26 12:20:00.0
 * 窗口内容:
 * NO 1: 商品ID = 2338453 热门度 = 27
 * NO 2: 商品ID = 812879 热门度 = 18
 * NO 3: 商品ID = 4443059 热门度 = 18
 * NO 4: 商品ID = 3810981 热门度 = 14
 * NO 5: 商品ID = 2364679 热门度 = 14
 */
public class HotItemsPractise {

    /**
     * Builds and runs the job: reads the behaviour CSV, keeps only "pv" records, counts views per
     * item in a 1-hour window sliding every 5 minutes, then ranks the items of each window.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Environment: event time, single parallel instance.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv");

        // 2. Parse each CSV line into an ItemBean and keep page views only.
        DataStream<ItemBean> filterStream = inputStream.map(line -> {
            String[] split = line.split(",");
            return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
        }).filter(item -> "pv".equals(item.getBehavior()));

        // 3. Per-item view count for every sliding window.
        DataStream<ItemViewCount> windowsResult = filterStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ItemBean>() {
            @Override
            public long extractAscendingTimestamp(ItemBean element) {
                // The record timestamp is multiplied by 1000 to obtain milliseconds.
                return element.getTimestamp() * 1000L;
            }
        }).keyBy("itemId")
                .timeWindow(Time.hours(1), Time.minutes(5))
                .aggregate(new MyAggreateCount(), new MyAllWindowsView());

        // 4. Group the per-item counts by window end and emit the top N of each window.
        SingleOutputStreamOperator<String> windowEnd = windowsResult
                .keyBy("windowEnd")
                .process(new ItemHotTopN(5));
        windowEnd.print();

        env.execute("HotItemsPractise");
    }

    /**
     * Incremental aggregate that counts the records of one item inside one window.
     */
    public static class MyAggreateCount implements AggregateFunction<ItemBean, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(ItemBean value, Long accumulator) {
            return accumulator + 1L;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        /**
         * Combines two partial counts. The previous implementation returned {@code null}, which
         * would break any window assigner that merges accumulators.
         */
        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    /**
     * Window function that wraps the pre-aggregated count together with the key (item id) and the
     * window end timestamp into an {@code ItemViewCount}.
     */
    public static class MyAllWindowsView implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

        /**
         * @param tuple  key of the window; field 0 is read as the item id
         * @param window window the count belongs to
         * @param input  holds the single pre-aggregated count
         * @param out    collector receiving the resulting {@code ItemViewCount}
         * @throws Exception never thrown by this implementation itself
         */
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
            long itemId = tuple.getField(0);
            long count = input.iterator().next();
            long windowEnd = window.getEnd();

            out.collect(new ItemViewCount(itemId, windowEnd, count));
        }
    }

    /**
     * Buffers all per-item counts that share one window end and, once the timer for that window
     * fires, emits a formatted ranking of the {@code topN} most viewed items.
     */
    public static class ItemHotTopN extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
        private transient ListState<ItemViewCount> itemViewCountListState;
        private final int topN;

        /**
         * @param topN maximum number of items printed per window
         */
        public ItemHotTopN(int topN) {
            this.topN = topN;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("itemViewCount", ItemViewCount.class));
        }

        @Override
        public void processElement(ItemViewCount itemViewCount, Context ctx, Collector<String> out) throws Exception {
            itemViewCountListState.add(itemViewCount);
            // Fire 1 ms after the window end; registering the same timestamp again is harmless.
            ctx.timerService().registerEventTimeTimer(itemViewCount.getWindowEnd() + 1L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Copy the buffered counts out of state, then release the state for this window.
            ArrayList<ItemViewCount> ranked = Lists.newArrayList(itemViewCountListState.get().iterator());
            itemViewCountListState.clear();

            // Descending by count; Long.compare avoids the overflow of int subtraction.
            ranked.sort((left, right) -> Long.compare(right.getCount(), left.getCount()));

            StringBuilder report = new StringBuilder();
            report.append("===================================\n");
            // The timer was registered at windowEnd + 1, so subtract 1 to print the real window end.
            report.append("窗口结束时间:").append(new Timestamp(timestamp - 1).toString()).append("\n");

            int limit = Math.min(topN, ranked.size());
            for (int i = 0; i < limit; i++) {
                ItemViewCount current = ranked.get(i);
                report.append("NO ")
                        .append(i + 1)
                        .append(": 商品ID = ")
                        .append(current.getItemId())
                        .append(" 热门度 = ")
                        .append(current.getCount())
                        .append("\n");
            }
            report.append("===================================\n");
            out.collect(report.toString());

            // Slows the console output down for demonstration; this blocks the operator thread.
            Thread.sleep(1000L);
        }

    }
}

1. 2 热门页面统计
  • 需求 :每隔5分钟输出一小时内浏览的热门页面
  • 输出结果展示:

窗口结束时间:2015-05-18 13:08:50.0

NO 1: 页面URL = /blog/tags/puppet?flav=rss20 热门度 = 11
NO 2: 页面URL = /projects/xdotool/xdotool.xhtml 热门度 = 5
NO 3: 页面URL = /projects/xdotool/ 热门度 = 4
NO 4: 页面URL = /?flav=rss20 热门度 = 4
NO 5: 页面URL = /robots.txt 热门度 = 4

  • 实现思路:和上一个不同的是 该数据源中的数据的时间非增量

    • 怎么保证乱序数据不丢
      • 1.所以要设置watermark与数据源之间的乱序程度
      • 2.设置一定的窗口延迟关闭时间 在初始的时间窗口到了 先聚合数据 后续再来属于该窗口的数据 来一条计算一条输出一条
      • 3.再有迟到的数据 则直接扔到侧输出流中
    • 怎么保证后续迟到的数据 来一条覆盖前面的数据
      • 1 先开窗增量聚合 再全窗口聚合 再根据窗口截止时间分组
      • 2 根据时间的截止窗口开窗key by 收集窗口截止时间内的所有数据 排序输出
      • 3 如果后续再来了延迟数据 需要更新之前的结果。所以把之前的数据存在MapState中 key为 页面url value为该页面的浏览次数
  • 代码

    /**
     *
     * -                   _ooOoo_
     * -                  o8888888o
     * -                  88" . "88
     * -                  (| -_- |)
     * -                   O = /O
     * -               ____/`---'____
     * -             .   ' \| |// `.
     * -              / \||| : |||// 
     * -            / _||||| -:- |||||- 
     * -              | | \ - /// | |
     * -            | _| ''---/'' | |
     * -              .-__ `-` ___/-. /
     * -          ___`. .' /--.-- `. . __
     * -       ."" '< `.____<|>_/___.' >'"".
     * -      | | : `- `.;` _ /`;.`/ - ` : | |
     * -          `-. _ __ /__ _/ .-` / /
     * ======`-.____`-.________/___.-`____.-'======
     * `=---='
     * .............................................
     * 佛祖保佑             永无BUG
     * <p>
     * 需求
     * 每5分钟输出一次1小时之内排名前5的页面
     *  小时统计一次结果 ,即开窗是一小时 收集1小时内的统计结果,按照窗口结束时间输出窗口内的结果。窗口的滑动步长设置为5min
     */
    
    public class HotPages {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            executionEnvironment.setParallelism(1);
    
            DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/apache.log");
    
            SimpleDateFormat simpleFormatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
    
            OutputTag<ApacheLogEvent> lateTag = new OutputTag<ApacheLogEvent>("late_date") {
            };
    
            DataStream<PageViewCount> streamPageViewCount = stringDataStreamSource.map(line -> {
                String[] s = line.split(" ");
                // 日期转时间戳
                Long timestamp = simpleFormatter.parse(s[3]).getTime();
                return new ApacheLogEvent(s[0], s[1], timestamp, s[5], s[6]);
            }).filter(date -> "GET".equals(date.getMethod()))
                    .filter(data -> {
                        // 过滤处css  js png ico 结尾的
                        String regex = "((?!\.(css|js|png|ico|jpg)$).)*$";
                        return Pattern.matches(regex, data.getUrl());
                    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.seconds(1)) {
                        @Override
                        public long extractTimestamp(ApacheLogEvent apacheLogEvent) {
    
                            return apacheLogEvent.getTimestamp();
                        }
                    }).keyBy("url")
                    .timeWindow(Time.minutes(10), Time.seconds(5))
                    .allowedLateness(Time.minutes(1))
                    .sideOutputLateData(lateTag)
                    .aggregate(new HotPageIncreaseAgg(), new HotPageAllAgg());
    
            SingleOutputStreamOperator<String> windowEnd = streamPageViewCount
                    .keyBy("windowEnd")
                    .process(new MyProcessFunction(5));
            // 控制台输出
            windowEnd.print("data");
            windowEnd.getSideOutput(lateTag).print("late_date");
            executionEnvironment.execute();
    
        }
    
    
        public static class HotPageIncreaseAgg implements AggregateFunction<ApacheLogEvent, Long, Long> {
    
    
            @Override
            public Long createAccumulator() {
                return 0L;
            }
    
            @Override
            public Long add(ApacheLogEvent value, Long accumulator) {
                return accumulator + 1;
            }
    
            @Override
            public Long getResult(Long accumulator) {
                return accumulator;
            }
    
            @Override
            public Long merge(Long a, Long b) {
                return a + b;
            }
        }
    
        public static class HotPageAllAgg implements WindowFunction<Long, PageViewCount, Tuple, TimeWindow> {
    
    
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
    
                String url = tuple.getField(0);
                Long count = input.iterator().next();
                long windowEnd = window.getEnd();
    
                out.collect(new PageViewCount(url, windowEnd, count));
            }
        }
    
        public static class MyProcessFunction extends KeyedProcessFunction<Tuple, PageViewCount, String> {
            private Integer topSize;
    
            MapState<String, Long> hotPageCount;
    
            public MyProcessFunction(Integer topSize) {
                this.topSize = topSize;
            }
    
            @Override
            public void open(Configuration parameters) throws Exception {
    
                  hotPageCount = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("hot_page_count", String.class, Long.class));
            }
    
            /**
             * 如果有迟到数据 需要覆盖就的数据
             * 那么定义一个map 加入相同的key 会被覆盖
             * 如果时间超过1分钟 那就清除状态
             */
            @Override
            public void processElement(PageViewCount pageViewCount, Context ctx, Collector<String> out) throws Exception {
                // map 类型 如果key相同就更新
                     hotPageCount.put(pageViewCount.getUrl(),pageViewCount.getCount());
                    ctx.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd()+1);
            }
    
    
            /**
             * 输出map中的结果
             * 定时器触发的时间:  watermark >= 定时时间
             * @param timestamp
             * @param ctx
             * @param out
             * @throws Exception
             */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws
                    Exception {
                Long currentKey = ctx.getCurrentKey().getField(0);
                // 判断是否到了窗口关闭清理的时间, 如果是 直接清空状态
               if (timestamp == currentKey + 60 * 1000L) {
                   hotPageCount.clear();
                    return;
                }
                ArrayList<Map.Entry<String, Long>> pageViewCounts = Lists.newArrayList(hotPageCount.entries());
    
                pageViewCounts.sort(new Comparator<Map.Entry<String, Long>>() {
                    @Override
                    public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                        if(o1.getValue() > o2.getValue())
                            return -1;
                        else if(o1.getValue() < o2.getValue())
                            return 1;
                        else
                            return 0;
                    }
                });
                StringBuilder stringBuilder = new StringBuilder();
    
                stringBuilder.append("===================================n");
                stringBuilder.append("窗口结束时间:").append(new Timestamp(timestamp - 1)).append("n");
                for (int i = 0; i < Math.min(topSize, pageViewCounts.size()); i++) {
                    Map.Entry<String, Long> stringLongEntry = pageViewCounts.get(i);
                    stringBuilder.append("NO ").append(i + 1).append(":")
                            .append(" 页面URL = ").append(stringLongEntry.getKey())
                            .append(" 热门度 = ").append(stringLongEntry.getValue())
                            .append("n");
                }
                stringBuilder.append("===============================nn");
    
                // 控制输出频率
               Thread.sleep(1000L);
    
                out.collect(stringBuilder.toString());
    
            }
        }
    
    
    }
    
    
    
1. 3 网站uv统计
  • 需求:实时输出每小时内网站的uv

  • 输出格式: 窗口的截止时间+窗口的独立访问人数

  • 实现思路:

    • 1.设置滚动窗口为1小时 每来一条数据就要触发计算 那么就需要自定义触发器,

    • 2 . 触发器的方法是每条数据都去触发后续的统计逻辑 ,userId去重,去重逻辑就是每条数据根据userId去redis中查询,如果有那么丢弃 如果没有则count+1。

      1. 每条数据来了 解析出userId,根据自定义的hash函数计算出在位图中的位置,查询该位置的值:如果为1 说明该用户已经统计过,直接丢弃;如果为0 则设置为1,取出窗口截止时间对应的访问数+1 后输出,并将更新后的count存储到redis中

      存储格式:

      count :哈希结构 存储格式 “uv_page_count”,<窗口的截止时间,访问数>

      userId :位图(key为窗口的截止时间,offset为userId的hash值)

      hash函数:上一位的计算结果*seed+userId当前位的Ascii,最后与(capacity-1)按位与

  • 代码



/**
 * @author :LiangFangWei
 * @date: 2021-12-21 15:55
 * -                   _ooOoo_
 * -                  o8888888o
 * -                  88" . "88
 * -                  (| -_- |)
 * -                   O = /O
 * -               ____/`---'____
 * -             .   ' \| |// `.
 * -              / \||| : |||// 
 * -            / _||||| -:- |||||- 
 * -              | | \ - /// | |
 * -            | _| ''---/'' | |
 * -              .-__ `-` ___/-. /
 * -          ___`. .' /--.-- `. . __
 * -       ."" '< `.____<|>_/___.' >'"".
 * -      | | : `- `.;` _ /`;.`/ - ` : | |
 * -          `-. _ __ /__ _/ .-` / /
 * ======`-.____`-.________/___.-`____.-'======
 * .............................................
 * -          佛祖保佑             永无BUG
 * <p>
 * 需求:实时输出统计每个小时内的的uv。每个小时内用户去重数实时输出
 * 思路:
 * <p>
 * 再一小时的时间窗口内,每来一条数据 触发计算 。
 * 计算逻辑:
 * 1.取当前数据去redis的位图中查有没有
 * 查询的key为          时间窗口的结束时间
 * 查询的offset为       userID的hash值
 * 2. 如果没有给查询的位置 置为1
 * <p>
 * 取判读redis的位图中有没有
 * 如果有丢弃 如果没有 count+1 将新的值存到redis中
 */


public class HotUVWithBloomFilter {

    /**
     * Builds and runs the job: reads the behaviour CSV, keeps "pv" records and evaluates every
     * record of a 1-hour tumbling window immediately against a Redis bitmap to maintain the
     * number of distinct users of that window.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Environment: event time, single parallel instance.
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);

        // 2. Parse the input, assign timestamps and keep page views only.
        DataStreamSource<String> inputStream = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv");
        SingleOutputStreamOperator<ItemBean> filterData = inputStream.map(line -> {
            String[] split = line.split(",");
            return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ItemBean>() {
            @Override
            public long extractAscendingTimestamp(ItemBean element) {
                return element.getTimestamp() * 1000L;
            }
        }).filter(itemBean -> "pv".equals(itemBean.getBehavior()));

        SingleOutputStreamOperator<PageViewCount> streamOperator = filterData
                // 3. One-hour tumbling window over the whole stream.
                .timeWindowAll(Time.hours(1))
                // 4. Fire for every record instead of waiting for the window to end.
                .trigger(new UVTriigger())
                // 5. Look the user up in the Redis bitmap and update the count when unseen.
                .process(new UVProcessFunction());
        streamOperator.print();
        executionEnvironment.execute();
    }

    /**
     * Trigger that fires and purges the window on every incoming element and ignores
     * processing-time and event-time timers.
     */
    public static class UVTriigger extends Trigger<ItemBean, TimeWindow> {
        @Override
        public TriggerResult onElement(ItemBean element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            // The trigger keeps no state and registers no timers, so there is nothing to clear.
        }
    }

    /**
     * De-duplicates users per window with a Redis bitmap (key = window end, offset = hash of the
     * user id) and keeps the running distinct count in the Redis hash {@code uv_page_count}.
     */
    public static class UVProcessFunction extends ProcessAllWindowFunction<ItemBean, PageViewCount, TimeWindow> {
        private transient Jedis jedis;
        private String pageCountKey = "uv_page_count";
        private transient BloomFilter bloomFilter;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = new Jedis("localhost", 6379);
            bloomFilter = new BloomFilter(1 << 29);
        }

        /**
         * Releases the Redis connection opened in {@link #open}; previously it was never closed.
         */
        @Override
        public void close() throws Exception {
            if (jedis != null) {
                jedis.close();
            }
        }

        /**
         * Checks the first element of the fired window against the bitmap; when its bit is not
         * yet set, sets it, increments the stored count and emits the new count.
         *
         * @param context  gives access to the window
         * @param elements contents of the fired window; only the first element is read
         * @param out      collector receiving the updated count
         * @throws Exception if the Redis access fails
         */
        @Override
        public void process(Context context, Iterable<ItemBean> elements, Collector<PageViewCount> out) throws Exception {
            Long windowEndMillis = context.window().getEnd();
            String windowEnd = windowEndMillis.toString();
            ItemBean itemBean = elements.iterator().next();
            Long userId = itemBean.getUserId();
            long offset = bloomFilter.hash(userId.toString(), 61);

            Boolean isExist = jedis.getbit(windowEnd, offset);
            if (isExist) {
                // Bit already set for this window: nothing to count, nothing to emit.
                return;
            }

            jedis.setbit(windowEnd, offset, true);

            // The count lives in a Redis hash: field = window end, value = distinct users so far.
            Long uvCount = 0L;
            String uvCountString = jedis.hget(pageCountKey, windowEnd);
            if (StringUtils.isNoneBlank(uvCountString)) {
                uvCount = Long.valueOf(uvCountString);
            }
            jedis.hset(pageCountKey, windowEnd, String.valueOf(uvCount + 1));
            out.collect(new PageViewCount("uv", windowEndMillis, uvCount + 1));
        }
    }

    /**
     * Single-hash helper that maps a string to a bit offset in {@code [0, capacity)}.
     */
    public static class BloomFilter {
        // Must be a power of two so that result & (capacity - 1) equals result mod capacity.
        private final long capacity;

        /**
         * @param capacity number of addressable bits; must be a positive power of two
         * @throws IllegalArgumentException if {@code capacity} is not a positive power of two
         */
        public BloomFilter(long capacity) {
            if (capacity <= 0 || (capacity & (capacity - 1)) != 0) {
                throw new IllegalArgumentException("capacity must be a positive power of two: " + capacity);
            }
            this.capacity = capacity;
        }

        /**
         * Polynomial hash over the characters of {@code userId}, reduced to the capacity.
         *
         * @param userId string to hash
         * @param seed   multiplier applied to the running result before each character is added
         * @return offset in {@code [0, capacity)}
         */
        public long hash(String userId, int seed) {
            long result = 0L;
            for (int i = 0; i < userId.length(); i++) {
                result = result * seed + userId.charAt(i);
            }

            // The mask also keeps the offset non-negative if the running value overflowed.
            return result & (capacity - 1);
        }

    }
}


2. 业务流程以及风险控制

2. 1 页面广告黑名单过滤
  • 需求: 输出每个省份每个广告的点击数。统计周期是一个小时 输出间隔是5分钟。要求如果当天某人对某个广告点击次数超过3次 则将该用户输出到侧输出流中 。如果当天内用户再次点击该广告 则计为无效 不做统计

  • 输出格式:

    • blacklist-user> BlackAdUerInfo(uerId=937166, adId=1715, count=click over 3times.)
      blacklist-user> BlackAdUerInfo(uerId=161501, adId=36156, count=click over 3times.)

      —>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:25:00.0, count=2)

      —>> AdOutputInfo(province=guangdong, windowEnd=2017-11-26 09:25:00.0, count=5)
      —>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:25:00.0, count=2)
      —>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:30:00.0, count=2)
      —>> AdOutputInfo(province=guangdong, windowEnd=2017-11-26 09:30:00.0, count=5)
      —>> AdOutputInfo(province=shanghai, windowEnd=2017-11-26 09:30:00.0, count=2)

  • 统计逻辑:

    • 怎么过滤异常数据:根据userId+adId keyBy分组 再使用process,该分组每来一条数据 判断是否到达设置的点击数:如果没有到达则次数+1,并且输出该条记录;如果已经到达则将该用户的userId加入到黑名单(输出到侧输出流中)。同时注册第二天凌晨的定时器 定时器触发时清空该用户点击次数的状态
    • 后面广告数的统计 就和前面统计方式如出一辙了
  • 代码



/**
 * -                   _ooOoo_
 * -                  o8888888o
 * -                  88" . "88
 * -                  (| -_- |)
 * -                   O = /O
 * -               ____/`---'____
 * -             .   ' \| |// `.
 * -              / \||| : |||// 
 * -            / _||||| -:- |||||- 
 * -              | | \ - /// | |
 * -            | _| ''---/'' | |
 * -              .-__ `-` ___/-. /
 * -          ___`. .' /--.-- `. . __
 * -       ."" '< `.____<|>_/___.' >'"".
 * -      | | : `- `.;` _ /`;.`/ - ` : | |
 * -          `-. _ __ /__ _/ .-` / /
 * ======`-.____`-.________/___.-`____.-'======
 * .............................................
 * -          佛祖保佑             永无BUG
 *
 * @author :LiangFangWei
 * @date: 2021-12-23 18:58
 * @description: 统计每个省份的每个广告的点击次数, 如果某个用户当天对广告的点击超过次数 输出作为一个流输出
 * <p>
 * 思路:
 * 1.最终输出形式
 * (省,窗口截止时间,总数)
 * 2. 创建增量聚合可以拿到总数 全窗口函数可以拿到窗口截止时间 和key
 * 3. 异常数据进行过滤,如果用户在某天对同一广告的点击次数如果超过一定次数 则单独作为流输出
 * 3.1 就要保存某个用户对某个广告的点击次数的状态,如果超过100次 并加入黑名单 如果在黑名单中直接返回 什么也不处理和统计
 * 3.2 如果没有超过 那么次数+1 输出数据
 */


public class AdStatisticsByProvince {

    /** Side output carrying users that reached the click bound; shared by producer and consumer. */
    private static final OutputTag<BlackAdUerInfo> BLACKLIST_TAG = new OutputTag<BlackAdUerInfo>("blacklist") {
    };

    /**
     * Builds and runs the job: reads the ad click log, filters out clicks of users that reached
     * the per-ad click bound, counts the remaining clicks per province in a 1-hour window sliding
     * every 5 minutes, and prints both the counts and the blacklisted users.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);
        DataStream<String> inputStream = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/AdClickLog.csv");
        DataStream<AdvertInfo> processStream1 = inputStream.map(line -> {
            String[] split = line.split(",");
            return new AdvertInfo(split[0], split[1], split[2], Long.parseLong(split[4]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdvertInfo>() {
            @Override
            public long extractAscendingTimestamp(AdvertInfo element) {
                return element.getTimeStramp() * 1000L;
            }
        });

        // Drop the clicks of (user, ad) pairs that reached the bound.
        SingleOutputStreamOperator<AdvertInfo> fliterBlackStream = processStream1
                .keyBy("userId", "adId")
                .process(new BlackUserProcess(3));

        DataStream<AdOutputInfo> resultStream = fliterBlackStream
                .keyBy("province")
                .timeWindow(Time.hours(1), Time.minutes(5))
                .aggregate(new IncreaseAggreateEle(), new AllAggreateCount());
        fliterBlackStream.getSideOutput(BLACKLIST_TAG).print("blacklist-user");

        resultStream.print("--->");

        executionEnvironment.execute();
    }


    /**
     * Forwards clicks until a (user, ad) key has reached {@code bound} forwarded clicks; the first
     * click after that is reported once on the blacklist side output and all later clicks are
     * dropped. Both states are cleared by a timer at the next local midnight.
     */
    public static class BlackUserProcess extends KeyedProcessFunction<Tuple, AdvertInfo, AdvertInfo> {
        private static final long ONE_DAY_MS = 24 * 60 * 60 * 1000L;
        /** Offset of the local time zone from UTC; the original code hard-codes 8 hours. */
        private static final long ZONE_OFFSET_MS = 8 * 60 * 60 * 1000L;

        private transient ValueState<Long> adClickCount;
        private transient ValueState<Boolean> isBlackUser;

        private final int bound;

        /**
         * @param bound number of clicks per (user, ad) that are still forwarded
         */
        public BlackUserProcess(int bound) {
            this.bound = bound;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            adClickCount = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ad_click_count", Long.class, 0L));
            isBlackUser = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is_black_user", Boolean.class, false));
        }

        /**
         * @param value click event of the current (user, ad) key
         * @param ctx   context used for timers and the side output
         * @param out   collector receiving clicks that are still counted
         * @throws Exception if state access fails
         */
        @Override
        public void processElement(AdvertInfo value, Context ctx, Collector<AdvertInfo> out) throws Exception {
            Long userIdClickCount = adClickCount.value();

            // Schedule the daily reset. The time is read from the processing-time clock, so the
            // timer must be a processing-time timer as well. Shifting by the zone offset before
            // the division yields the next local midnight; the previous formula landed in the
            // past between 00:00 and 08:00 local time and also mis-parenthesised the division.
            long now = ctx.timerService().currentProcessingTime();
            long nextLocalMidnight = ((now + ZONE_OFFSET_MS) / ONE_DAY_MS + 1) * ONE_DAY_MS - ZONE_OFFSET_MS;
            ctx.timerService().registerProcessingTimeTimer(nextLocalMidnight);

            if (userIdClickCount >= bound) {
                // Report the key once, on the first click beyond the bound.
                if (!isBlackUser.value()) {
                    isBlackUser.update(true);
                    ctx.output(BLACKLIST_TAG,
                            new BlackAdUerInfo(value.getUserId(), value.getAdId(), "click over " + userIdClickCount + "times."));
                }
                // Blacklisted keys are neither counted nor forwarded.
                return;
            }

            // Below the bound: count the click and pass it on.
            adClickCount.update(userIdClickCount + 1);
            out.collect(value);
        }


        /** Daily reset: forget the click count and the blacklist flag of the current key. */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdvertInfo> out) throws Exception {
            adClickCount.clear();
            isBlackUser.clear();
        }
    }


    /**
     * Incremental aggregate that counts the clicks of one province inside one window.
     */
    public static class IncreaseAggreateEle implements AggregateFunction<AdvertInfo, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(AdvertInfo value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }


    /**
     * Window function that combines the pre-aggregated count with the key (province) and the
     * formatted window end into an {@code AdOutputInfo}.
     */
    public static class AllAggreateCount implements WindowFunction<Long, AdOutputInfo, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<AdOutputInfo> out) throws Exception {
            String province = tuple.getField(0).toString();
            String windowEnd = new Timestamp(window.getEnd()).toString();
            out.collect(new AdOutputInfo(province, windowEnd, input.iterator().next()));
        }
    }
}


2. 2 恶意登陆监控
  • 需求:检测出两秒内连续登陆失败2次的用户

  • 实现思路:CEP编程 定义2秒内连续登陆失败两次的规则,并将该规则应用到流上 筛选出匹配规则的数据



/**
 * -                   _ooOoo_
 * -                  o8888888o
 * -                  88" . "88
 * -                  (| -_- |)
 * -                   O = /O
 * -               ____/`---'____
 * -             .   ' \| |// `.
 * -              / \||| : |||// 
 * -            / _||||| -:- |||||- 
 * -              | | \ - /// | |
 * -            | _| ''---/'' | |
 * -              .-__ `-` ___/-. /
 * -          ___`. .' /--.-- `. . __
 * -       ."" '< `.____<|>_/___.' >'"".
 * -      | | : `- `.;` _ /`;.`/ - ` : | |
 * -          `-. _ __ /__ _/ .-` / /
 * ======`-.____`-.________/___.-`____.-'======
 * .............................................
 * -          佛祖保佑             永无BUG
 *
 * @author :LiangFangWei
 * @date: 2021-12-26 17:11
 * @description: 连续登陆失败检测 输出两秒内登陆失败3次的记录
 */


public class LoginCheck {

    /**
     * Builds and runs the job: reads the login log and uses a CEP pattern to report every user
     * with three strictly consecutive "fail" events within five seconds.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        // 1. Environment: event time, single parallel instance.
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


        DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/LoginLog.csv");

        // 2. Parse the lines and key the stream by user. Keying by status, as before, put the
        //    failures of all users into one group, so failures of different users could be
        //    combined into one match.
        KeyedStream<LoginInfo, String> keyedStream = stringDataStreamSource.map(line -> {
            String[] split = line.split(",");
            return new LoginInfo(split[0], split[2], Long.parseLong(split[3]));
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginInfo>(Time.seconds(3)) {

            @Override
            public long extractTimestamp(LoginInfo element) {
                return element.getTimeStamp() * 1000L;
            }
        }).keyBy(LoginInfo::getUserId);


        // 3. Define the rule: three "fail" events, strictly contiguous, within five seconds.
        Pattern<LoginInfo, LoginInfo> failPattern = Pattern.<LoginInfo>begin("loginFailEvent").where(new SimpleCondition<LoginInfo>() {
            @Override
            public boolean filter(LoginInfo value) throws Exception {
                return "fail".equals(value.getStatus());
            }
        }).times(3).consecutive().within(Time.seconds(5));

        // 3.1 Apply the rule to the keyed stream.
        PatternStream<LoginInfo> patternStream = CEP.pattern(keyedStream, failPattern);

        // 3.2 Turn every match into a LoginFailInfo.
        SingleOutputStreamOperator<LoginFailInfo> selectStream = patternStream.select(new PatternSelectFunction<LoginInfo, LoginFailInfo>() {

            /*
             * The map holds the events that matched the rule, keyed by pattern name.
             */
            @Override
            public LoginFailInfo select(Map<String, List<LoginInfo>> pattern) throws Exception {
                List<LoginInfo> loginFailEvent = pattern.get("loginFailEvent");
                LoginInfo firstFail = loginFailEvent.get(0);
                LoginInfo lastFail = loginFailEvent.get(loginFailEvent.size() - 1);
                String userId = firstFail.getUserId();
                Timestamp firstFailTimeStamp = new Timestamp(firstFail.getTimeStamp() * 1000L);
                Timestamp lastFailTimeStamp = new Timestamp(lastFail.getTimeStamp() * 1000L);
                return new LoginFailInfo(userId, firstFailTimeStamp.toString(), lastFailTimeStamp.toString(), "连续" + loginFailEvent.size() + "登陆失败");

            }
        });
        selectStream.print();
        executionEnvironment.execute();

    }


}

2. 3 订单支付失效监控
  • 需求:实时检测15分钟内下单但没有支付的订单

  • 实现逻辑:

    • 定义CEP规则: 15分钟内有下单和支付的规则。
    • 匹配到流上
    • 从匹配的流上筛选出匹配的数据,并从map中解析出延迟数据和非延数据
      • 匹配的数据会封装到map中
      • 没匹配上的数据也会输出到map中
  • 代码



/**
 * -                   _ooOoo_
 * -                  o8888888o
 * -                  88" . "88
 * -                  (| -_- |)
 * -                   O = /O
 * -               ____/`---'____
 * -             .   ' \| |// `.
 * -              / \||| : |||// 
 * -            / _||||| -:- |||||- 
 * -              | | \ - /// | |
 * -            | _| ''---/'' | |
 * -              .-__ `-` ___/-. /
 * -          ___`. .' /--.-- `. . __
 * -       ."" '< `.____<|>_/___.' >'"".
 * -      | | : `- `.;` _ /`;.`/ - ` : | |
 * -          `-. _ __ /__ _/ .-` / /
 * ======`-.____`-.________/___.-`____.-'======
 * .............................................
 * -          佛祖保佑             永无BUG
 *
 * @description:检测15分钟内没有支付的订单
 */


public class OrderCheck {
    private static final Logger logger = LoggerFactory.getLogger(OrderCheck.class);

    /**
     * Builds and runs the order-timeout detection job.
     * <p>
     * Order events are read from a CSV file, keyed by order id, and matched against a
     * "create followed by pay within 15 minutes" CEP pattern. Completed matches are
     * printed on the main stream; partial matches that timed out are printed from a
     * side output.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        // 1. Environment: parallelism 1, event-time semantics.
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> rawLines = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv");
        // Each line is split on "," into four columns; the fourth is parsed as the event timestamp.
        SingleOutputStreamOperator<OrderInfo> orderEvents = rawLines.map(line -> {
            String[] split = line.split(",");
            return new OrderInfo(split[0], split[1], split[2], Long.parseLong(split[3]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo>() {

            @Override
            public long extractAscendingTimestamp(OrderInfo element) {
                // NOTE(review): presumably converts seconds to milliseconds; confirm the unit in the input file.
                return element.getTimeStamp() * 1000L;
            }
        });

        // 2. Pattern: a "create" event followed (not necessarily immediately) by a "pay"
        //    event, both within 15 minutes.
        Pattern<OrderInfo, OrderInfo> orderPayPattern = Pattern.<OrderInfo>begin("create").where(new SimpleCondition<OrderInfo>() {
            @Override
            public boolean filter(OrderInfo value) throws Exception {
                return "create".equals(value.getStatus());
            }
        }).followedBy("pay").where(new SimpleCondition<OrderInfo>() {

            @Override
            public boolean filter(OrderInfo value) throws Exception {
                return "pay".equals(value.getStatus());
            }
        }).within(Time.minutes(15));

        // 3. Apply the pattern per order id.
        PatternStream<OrderInfo> orderStream = CEP.pattern(orderEvents.keyBy("orderId"), orderPayPattern);
        // Anonymous subclass so the tag keeps its generic element type.
        OutputTag<OrderTimeoutInfo> outputTag = new OutputTag<OrderTimeoutInfo>("timeoutStream") {
        };

        // 4. Completed matches go to the main stream, timed-out partial matches to the side output.
        SingleOutputStreamOperator<OrderTimeoutInfo> resultStream = orderStream.select(outputTag, new OrderTimeoutSelect(), new OrderPaySelect());
        resultStream.print("payed normally");
        resultStream.getSideOutput(outputTag).print("timeout");

        executionEnvironment.execute("order timeout detect job");
    }

    /**
     * Converts a timed-out partial match into an {@link OrderTimeoutInfo}.
     * <p>
     * Passed to {@code PatternStream.select} as the timeout function, so its results are
     * emitted to the side output identified by the tag given in {@code main}.
     */
    public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderInfo, OrderTimeoutInfo> {

        /**
         * @param pattern          events captured so far, keyed by pattern stage name; only "create" is read
         * @param timeoutTimestamp timestamp at which the partial match timed out
         * @return the order id of the "create" event together with "timeout" + the timeout timestamp
         */
        @Override
        public OrderTimeoutInfo timeout(Map<String, List<OrderInfo>> pattern, long timeoutTimestamp) throws Exception {
            // A timeout is an expected business outcome, so it is logged as a warning with
            // a message that describes what actually happened.
            logger.warn("order timed out before payment: partialMatch={}, timeoutTimestamp={}", pattern, timeoutTimestamp);
            OrderInfo createEvent = pattern.get("create").get(0);

            return new OrderTimeoutInfo(createEvent.getOrderId(), "timeout" + timeoutTimestamp);
        }
    }

    /**
     * Converts a completed create-then-pay match into an {@link OrderTimeoutInfo}
     * tagged with "pay".
     */
    public static class OrderPaySelect implements PatternSelectFunction<OrderInfo, OrderTimeoutInfo> {

        /**
         * @param pattern matched events keyed by pattern stage name; only "pay" is read
         * @return the order id of the "pay" event together with the literal "pay"
         */
        @Override
        public OrderTimeoutInfo select(Map<String, List<OrderInfo>> pattern) throws Exception {
            OrderInfo payEvent = pattern.get("pay").get(0);

            return new OrderTimeoutInfo(payEvent.getOrderId(), "pay");
        }
    }

}

2. 4 支付实时对账
  • 双流join

  • 代码

    
    
    /**
     * -                   _ooOoo_
     * -                  o8888888o
     * -                  88" . "88
     * -                  (| -_- |)
     * -                   O = /O
     * -               ____/`---'____
     * -             .   ' \| |// `.
     * -              / \||| : |||// 
     * -            / _||||| -:- |||||- 
     * -              | | \ - /// | |
     * -            | _| ''---/'' | |
     * -              .-__ `-` ___/-. /
     * -          ___`. .' /--.-- `. . __
     * -       ."" '< `.____<|>_/___.' >'"".
     * -      | | : `- `.;` _ /`;.`/ - ` : | |
     * -          `-. _ __ /__ _/ .-` / /
     * ======`-.____`-.________/___.-`____.-'======
     * .............................................
     * -          佛祖保佑             永无BUG
     * @author :LiangFangWei
     * @description: 检测订单是否到账
     */
    
    
    public class OrderPay {
        /** Side output for pay events for which no receipt arrived before the timer fired. */
        private final static OutputTag<OrderInfo> unmatchedPays = new OutputTag<OrderInfo>("unmatchedPays") {
        };

        /** Side output for receipts for which no pay event arrived before the timer fired. */
        private final static OutputTag<Receipt> unmatchedReceipts = new OutputTag<Receipt>("unmatchedReceipts") {
        };

        /**
         * Builds and runs the pay/receipt reconciliation job: two keyed streams are
         * connected on "payId" and joined by {@link DoubleStreamJoinProcess}. Matched
         * pairs are printed on the main stream; unmatched elements of either side are
         * printed from their side outputs.
         *
         * @param args unused
         * @throws Exception if the job cannot be built or executed
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // 1. Pay (order) events: four comma-separated columns, the fourth parsed as the timestamp.
            DataStreamSource<String> inputSteam1 = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv");
            SingleOutputStreamOperator<OrderInfo> orderStream = inputSteam1.map(line -> {
                String[] split = line.split(",");
                return new OrderInfo(split[0], split[1], split[2], Long.parseLong(split[3]));
            }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo>() {

                @Override
                public long extractAscendingTimestamp(OrderInfo element) {
                    return element.getTimeStamp() * 1000L;
                }
            });

            // 2. Receipt events: three comma-separated columns, the third parsed as the timestamp.
            // NOTE(review): this reads the same OrderLog.csv as the pay stream, yet the parsing
            // expects a different column layout (split[2] as a long). It looks like a copy-paste
            // slip and probably should point at a separate receipt log; confirm the file name.
            DataStreamSource<String> inputStream2 = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv");
            SingleOutputStreamOperator<Receipt> receiptStream = inputStream2.map(line -> {
                String[] split = line.split(",");
                return new Receipt(split[0], split[1], Long.parseLong(split[2]));
            }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Receipt>() {

                @Override
                public long extractAscendingTimestamp(Receipt element) {
                    return element.getTimeStamp() * 1000L;
                }
            });

            // 3. Join the two streams on "payId".
            SingleOutputStreamOperator<Tuple2<OrderInfo, Receipt>> resultStream = orderStream.keyBy("payId").connect(receiptStream.keyBy("payId")).process(new DoubleStreamJoinProcess());

            // 4. Print matched pairs and both kinds of unmatched elements.
            resultStream.print("matched");
            resultStream.getSideOutput(unmatchedPays).print("unmatchedPays");
            resultStream.getSideOutput(unmatchedReceipts).print("unmatchedReceipts");

            // Without execute() the pipeline above is only declared and never runs.
            env.execute("order pay receipt match job");
        }


        /**
         * Joins pay events with receipts per key. Whichever side arrives first is kept in
         * keyed state and an event-time timer is registered 5000 ms after its (scaled)
         * timestamp. If the other side arrives, the pair is emitted and the buffered
         * element is cleared; if the timer fires first, whatever is still buffered is
         * sent to the corresponding "unmatched" side output.
         */
        public static class DoubleStreamJoinProcess extends CoProcessFunction<OrderInfo, Receipt, Tuple2<OrderInfo, Receipt>> {

            /** Pay event waiting for its receipt (per key). */
            ValueState<OrderInfo> payState;
            /** Receipt waiting for its pay event (per key). */
            ValueState<Receipt> receiptState;

            /** Obtains the two keyed value states from the runtime context. */
            @Override
            public void open(Configuration parameters) throws Exception {
                payState = getRuntimeContext().getState(new ValueStateDescriptor<OrderInfo>("pay", OrderInfo.class));
                receiptState = getRuntimeContext().getState(new ValueStateDescriptor<Receipt>("receipt", Receipt.class));
            }

            /**
             * Handles a pay event: emits the pair if a receipt is already buffered,
             * otherwise buffers the pay event and registers a timer.
             */
            @Override
            public void processElement1(OrderInfo orderInfo, Context ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
                Receipt receipt = receiptState.value();
                if (receipt != null) {
                    // The receipt arrived first: emit the pair and drop the buffered receipt.
                    out.collect(new Tuple2<>(orderInfo, receipt));
                    receiptState.clear();
                } else {
                    payState.update(orderInfo);
                    ctx.timerService().registerEventTimeTimer(orderInfo.getTimeStamp() * 1000L + 5000L);
                }
            }

            /**
             * Handles a receipt: emits the pair if a pay event is already buffered,
             * otherwise buffers the receipt and registers a timer.
             */
            @Override
            public void processElement2(Receipt receipt, Context ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
                OrderInfo orderInfo = payState.value();
                if (orderInfo != null) {
                    // The pay event arrived first: emit the pair and drop the buffered pay event.
                    out.collect(new Tuple2<>(orderInfo, receipt));
                    payState.clear();
                } else {
                    receiptState.update(receipt);
                    ctx.timerService().registerEventTimeTimer(receipt.getTimeStamp() * 1000L + 5000L);
                }
            }

            /**
             * Flushes whatever is still buffered for the current key to the matching
             * side output, then clears both states.
             */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
                OrderInfo pendingPay = payState.value();
                if (pendingPay != null) {
                    ctx.output(unmatchedPays, pendingPay);
                }
                Receipt pendingReceipt = receiptState.value();
                if (pendingReceipt != null) {
                    ctx.output(unmatchedReceipts, pendingReceipt);
                }
                payState.clear();
                receiptState.clear();

                super.onTimer(timestamp, ctx, out);
            }
        }

    }
    
    

3. 项目地址

项目地址欢迎大家来踩踩踩

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>