Kafka+PowerJob实现延时消息、定时消息,动态控制消息发送时间

前言:因为公司需要一个kafka延时消息的组件服务,看了下市面上的实现kafka延时消息的实现,感觉都比较复杂难理解,自己就去研究了下使用其他中间件进行解决,于是有了这篇分享文章

实现技术:SpringBoot+kafka+powerjob(最新的分布式任务调度产品)

思路:powerjob是一款非常不错的java分布式任务调度产品,配合这个产品来定时调度我们的kafka的producer从而实现了延迟消息、定时消息

本项目之后的扩展:

1、甚至能在可视化界面动态控制消息的发送时间

2、可在消息发送后动态指定接下来业务的走向(例如发送消息后需要干某件些其他事情,也可以在这里动态控制每个事情的执行顺序)

前提:kafka环境已经装好

大家可以去官方了解下 地址:PowerJob

项目我自己改了官方的后 也开源了:https://gitee.com/yangjial/powerjob.git

整体结构:

 

第一步:启动调度任务服务 

1.1修改配置文件、

依赖一个MySql5.7以上的数据库,在application-daily.properties配置中改(根据自己的配置文件)随便创建一个新库即可

1.2启动 PowerJobServerApplication

启动成功后访问http://localhost:7700/  会出现如下界面

 1.3 创建执行应用(创建执行者)

应用名称:一般用项目名称

密码:自定义就好

 创建好之后登录进入

以上powerjob服务就启动成功了

第二步:启动任务执行者(kafka服务)

2.1 结构

 2.2 先看配置文件application.yml

kafka的配置我就不讲了,大家根据自己要求配置

主要看powerjob的配置 ,

1、工作端口默认2777

2、接入的应用名称和密码 就是第一步我们在界面上面注册的应用名称 一定要相同

3、调度服务的地址就是刚刚第一步启动服务的地址

2.3 看PowerJobUtil工具类

这里引入了powerjob的openApi  文档: OpenAPI · 语雀

saveJob方法:创建任务或者更改任务,每个任务都有ID如果传入ID就是修改任务,ID为空就是创建任务,具体参数可以看注释

重要参数说明:

1、StartTime:延时最重要的是时间参数,powerjob支持cron表达式,将要延时的时间转换成cron格式后调用API即可创建定时任务

2、params:这是任务参数,也就是业务参数,在本章节就是消息内容

3、processorInfo:执行者全类名,指定执行者类(本项目中也就是KafkaProducer的类路径)

4、jobId:任务id,如果为空就是创建任务

package com.kafka.cloud.util;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import tech.powerjob.client.PowerJobClient;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.ResultDTO;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * powerJob的api封装成一个工具类,供外部通过调用生产任务
 * @author YangBoos
 * @date 2021-11-22
 */
@Configuration
public class PowerJobUtil {

    protected final Logger logger = LoggerFactory.getLogger(PowerJobUtil.class);

    @Autowired
    private PowerJobClient ohMyClient;

    @Bean
    public PowerJobUtil getPowerJobUtil() {
        return new PowerJobUtil();
    }
    
    /**
     * 创建一个单核任务
     *
     * @param StartTime     任务开始时间
     * @param params        任务参数
     * @param processorInfo 回调得类全类名
     * @param jobId         任务id:如果为空就是创建任务
     * @return 返回结果
     * @throws Exception
     */
    public ResultDTO<Long> saveJob(Date StartTime, String params, String processorInfo, String jobId, String jobName, String jobDescription) throws Exception {
        logger.info("saveJob .......{},{},{},{}", StartTime, params, processorInfo, jobId);

        SaveJobInfoRequest request = new SaveJobInfoRequest();
        if (StringUtils.isNotEmpty(jobId)) {
            request.setId(Long.valueOf(jobId));
        }
        //任务名称
        request.setJobName(jobName);
        //任务描述
        request.setJobDescription(jobDescription);
        //任务参数,Processor#process方法入参TaskContext对象的jobParams字段
        request.setJobParams(params);
        //时间表达式类型,枚举值
        request.setTimeExpressionType(TimeExpressionType.CRON);
        //时间表达式,填写类型由timeExpressionType决定,比如CRON需要填写CRON表达式
        request.setTimeExpression(getCron(StartTime));
        //执行类型,枚举值
        request.setExecuteType(ExecuteType.STANDALONE);
        //处理器类型,枚举值
        request.setProcessorType(ProcessorType.BUILT_IN);
        //处理器参数,填写类型由processorType决定,如Java处理器需要填写全限定类名,如:com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo
        request.setProcessorInfo(processorInfo);
        //最大实例数,该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例)
        request.setMaxInstanceNum(1);
        //单机线程并发数,表示该实例执行过程中每个Worker使用的线程数量
        request.setConcurrency(1);
        //任务实例运行时间限制,0代表无任何限制,超时会被打断并判定为执行失败
        request.setInstanceTimeLimit(0L);
        //任务实例重试次数,整个任务失败时重试,代价大,不推荐使用
        request.setMaxInstanceNum(0);
        //Task重试次数,每个子Task失败后单独重试,代价小,推荐使用
        request.setTaskRetryNum(2);
        //最小可用CPU核心数,CPU可用核心数小于该值的Worker将不会执行该任务,0代表无任何限制
        request.setMinCpuCores(0);
        //最小内存大小(GB),可用内存小于该值的Worker将不会执行该任务,0代表无任何限制
        request.setMinMemorySpace(0);
        //最小磁盘大小(GB),可用磁盘空间小于该值的Worker将不会执行该任务,0代表无任何限制
        request.setMinDiskSpace(0);
        //指定机器执行,设置该参数后只有列表中的机器允许执行该任务,空代表不指定机器
        request.setDesignatedWorkers(null);
        //最大执行机器数量,限定调动执行的机器数量,0代表无限制
        request.setMaxWorkerCount(1);
        //是否启用该任务,未启用的任务不会被调度
        request.setEnable(true);

        ResultDTO<Long> resultDTO = ohMyClient.saveJob(request);
        return resultDTO;
    }

    /**
     * 禁用某个任务
     *
     * @param jobId
     * @return
     * @throws Exception
     */
    public ResultDTO<Void> disableJob(Long jobId) {
        logger.info("disableJob .......{}", jobId);
        try {
            TimeUnit.MINUTES.sleep(5);
            return ohMyClient.disableJob(jobId);
        } catch (Exception e) {
            logger.error("disableJob  error.......{},{}", e, jobId);
        }
        return null;
    }

    /**
     * 删除某个任务
     *
     * @param jobId
     * @return
     * @throws Exception
     */
    public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
        return ohMyClient.deleteJob(jobId);
    }

    /**
     * 通过输入指定日期时间生成cron表达式
     *
     * @param date
     * @return cron表达式
     */
    public String getCron(Date date) {
        SimpleDateFormat timeSdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息产生时间:"+timeSdf.format(new Date()));
        System.out.println("预计消费时间:"+timeSdf.format(date));
        String dateFormat = "ss mm HH dd MM ? yyyy-yyyy";
        SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
        String formatTimeStr = null;
        if (date != null) {
            formatTimeStr = sdf.format(date);
        }
        System.out.println("任务时间得CRON:" + formatTimeStr);
        return formatTimeStr;
    }
}

2.4 看具体执行者类KafkaProducer(kafka生产类)

这里是具体执行业务的地方,当定时到点就会执行process方法,其中参数context可获取所有任务体内容 包括之前提到的参数也就是消息,在这个方法中发送kafka消息即可

package com.kafka.cloud.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;

/**
 *  kafka的生产,也是调度任务的消费者
 * @author YangBoos
 * @date 2021-11-22
 */
@Component
public class KafkaProducer  implements BasicProcessor {


    private final static String TOPIC_NAME = "test01"; //topic的名称

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 任务执行者,具体业务执行方法,也就是我们发送消息的方法
     * @param context 任务体 其中包括非常多的属性,可看官方文档解释
     * @return
     */
    @Override
    public ProcessResult process(TaskContext context) {
        String params = context.getJobParams();
        //发送一个简单的消息
        kafkaTemplate.send(TOPIC_NAME  , params);
        return new ProcessResult(true, context.getJobId() + " process successfully.");
    }

}

2.5 看消费者 KafkaConsumer类

这里就是普通的消费 ,将时间消费消息时间打印

package com.kafka.cloud.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * kafka消费者
 * @author YangBoos
 * @date 2021-11-22
 */
@Component
public class KafkaConsumer {

    //kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
    @KafkaListener(topics = "test01", groupId = "zhTestGroup")
    public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        SimpleDateFormat timeSdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息实际消费时间:"+timeSdf.format(new Date()));
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //手动提交offset
        ack.acknowledge();
    }
}

2.6 看接口调用类 调用任务生成 KafkaController

通过调用找个接口 生成一个1分钟后执行的任务,此任务的执行内容就是发送一个kafka消息

参数:

1、指定任务执行时间

2、执行者具体类路径

3、任务ID  为null表示创建任务,如果修改任务就些具体任务ID,可以动态改变任务执行时间等各种功能

4、任务标题

5、任务说明

package com.kafka.cloud.controller;

import com.kafka.cloud.util.PowerJobUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import tech.powerjob.common.response.ResultDTO;

import java.util.Calendar;
import java.util.Date;


/**
 * 测试接口
 * @author YangBoos
 * @date 2021-11-22
 */
@RestController
public class KafkaController {
    /**
     * 调度任务
     */
    @Autowired
    private PowerJobUtil powerJobUtil;


    @RequestMapping(value = "/send")
    public String send() {
        //获取1分钟后时间(一分钟后才进行消息发送)
        Calendar beforeTime = Calendar.getInstance();
        beforeTime.add(Calendar.MINUTE, +1);
        Date beforeDate = beforeTime.getTime();
        ResultDTO<Long> resultDTO = null;
        try {
            //生成任务
            resultDTO = powerJobUtil.saveJob(beforeDate, "这是一个延迟1分钟的消息",
                    "com.kafka.cloud.producer.KafkaProducer",
                    null,"kafka定时任务",
                    "kafka消息定时-开始任务");
            if (resultDTO.isSuccess()) {
                return "创建任务成功";
            } else {
                return "创建任务失败,请联系管理员";
            }
        } catch (Exception e) {
            e.printStackTrace();
            return "创建任务异常:"+e.getMessage();
        }
    }
}

2.7 启动执行者服务(kafka服务)PowerjobKafkaApplication

2.8 测试调用

启动成功后,调用 http://localhost:8080/send 可以看见创建任务成功

 2.9 重新回到第一步的可视化界面刷新

首页可以看见具体连入的应用,可以看见任务也多了一个

看到左侧的任务,可以看见我们刚刚接口创建的任务和具体任务执行时间

 

点击编辑也可以 查看所有详情内容

甚至可以动态去修改这个任务的所有参数,达到动态改变消息的发送时间

, 

也可以查看这个任务的执行记录 日志,操作重试等等

 

2.10 看到控制台

 可以看到任务已经创建 并且打印了一分钟后的时间

 等待一分钟后 ,任务准时执行 并发送消息 实时消费消息

 到此 kafka延时消息就完成了

任务如果已经过了时间可以在界面上看到状态已停止 

此项目可以扩展很多功能,消息发送后 建议调用API将任务删除,不用保留无用资源.

扩展:powerjob还可以可视化的指定工作流,可在消息发送后执行其他顺序任务

 

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>