编译kafka3.0.0和2.8.0源代码

编译kafka3.0.0和2.8.0源代码

这个过程历时5天期间遇到了无数的坑点,特此记录

版本和环境

ide: idea2021

系统:windows10

kafka版本:最初我想在本地运行kafka3.0.0版本,但是最终因为很多的问题,我的本地kafka版本定格在了2.8.0。下文中也会阐述kafka3.0版本在windows10版本上的bug

  • git方式:https://github.com/apache/kafka
  • 下载地址:https://github.com/apache/kafka/tags

scala:我下载了很多的版本,最初我使用scala3.1.0版本,但是这个版本的scala版本显然太高了,目前kafka在kafka3.0.0才开始淘汰scala2.12,我最终选择了scala2.13.6版本,小版本的不一致是可以被允许的。

  • 下载地址:https://github.com/lampepfl/dotty/releases

java 11

gradle :kafka3.0.0的gradle版本是7.1.1,但是我在使用gradle7以上的版本build kafka2.8版本时遇到了build fail的情况,所以这个版本我选择了6.8

  • 下载地址:https://gradle.org/releases/

zookeeper:zookeeper的版本对于这个的影响不大,所以我使用的是最新的3.7.0

  • 下载地址:https://zookeeper.apache.org/releases.html
  • zookeeper不建议安装在本地windows系统上,我安装在了我的云服务器上,部署了一个伪集群。这里网上的教程很多就不在这里赘述

流程

  • 在idea上安装scala插件:在插件市场上搜索scala

在这里插入图片描述

  • 对下载好的kafka源代码cmd打开,执行gradle idea命令,我尝试过网上的换源方法,只能说一句,没什么用,如果没有vpn就等吧,我下载了2小时50分钟。

  • 用idea打开kafka项目

  • 配置本地gradle版本,如果不配置他会给你自动下载一个gradle

在这里插入图片描述

  • 配置scala,有的idea可能不需要这一步

在这里插入图片描述

  • 等待gradle build项目:为什么要单独拿出来呢,因为这个等待时间也太长了,要下载很多的jar包

  • 打开终端,执行gradle build --exclude-task test,一定要排除test,因为有的test跑不过可能会build失败

  • 配置zookeeper依赖版本:项目中使用的zookeeper jar包并不是3.7.0所以要去gradle目录下的dependencies.gradle中修改zookeeper版本

在这里插入图片描述

  • 修改config目录下的server.peoperties文件

    • log.dirs=E:\kafka-logs
      
    • zookeeper.connect=106.52.87.219:2181,106.52.87.219:3181,106.52.87.219:4181
      
    • 只需要修改这两个配置

  • 把log4.properties文件放到:

在这里插入图片描述

  • 修改log4j.properties文件,因为我们只需要在控制台打印日志,所以修改一下这个文件,只保留控制台输出这里,剩下的注释掉就好了

在这里插入图片描述

  • 修改build.gradle,把这两行注释掉,因为不注释掉,就打印不出来日志。手动也引入不了slf4j-log4j12这个包,太坑了。

在这里插入图片描述

  • 配置启动

在这里插入图片描述

坑!!!!

  • [2022-01-08 15:24:45,283] ERROR Failed to write meta.properties due to (kafka.server.BrokerMetadataCheckpoint)
    java.nio.file.AccessDeniedException: E:kafka-logs
    

    这是最大的一个坑,这个问题只会出现在kafka3.0.0中,kafka对于windows的兼容性不好,降级到2.8版本以后就没有这个问题了,我通过debug猜测可能是nio他开辟buffer的时候的bug,例如我的保存目录是E:kafka-logs他的buffer只有23byte,可能是这里的原因,和meta.properties的权限没有半毛钱关系。

  • 在gradle idea的时候报错bad gateway 502,网上一堆换源的,我换了也不行,最后发现就是网不好。(我不知道为什么很多时候我换了源没有起作用)

  • 没有日志输出,还会报错说没有slf4j-log4j12这个包里面的类,这个问题在上述的流程中已经解决了

  • 当你替换了kafka版本以后,切记要把zookeeper初始化一下,就是把data目录和logs目录里面的东西删了,data目录只保留myid文件,然后再启动。光清空data目录,我再启动发现启动不起来。

  • 这条本来想写在最后的,但是防止有人不看完,只有在完成了第一次消费以后才会出现_consumer_offset这个topic
    在这里插入图片描述

附上生产,消费demo

	<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
// 生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaTest {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);


        KafkaProducer<String, String> producer =
                new KafkaProducer<>(properties);
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, "hello, Kafka!");
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
// 消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConTest {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", brokerList);
        //设置消费组的名称,具体的释义可以参见第3章
        properties.put("group.id", groupId);
        //创建一个消费者客户端实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList(topic));
        //循环消费消息
        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>