Kafka3.0源码学习

kafka官网:https://kafka.apache.org/downloads

一、生产者源码

1、初始化

生产者 main 线程初始化,点击 main()方法中的 KafkaProducer()

KafkaProducer(ProducerConfig config,
          Serializer<K> keySerializer,
          Serializer<V> valueSerializer,
          ProducerMetadata metadata,
          KafkaClient kafkaClient,
          ProducerInterceptors<K, V> interceptors,
          Time time) {
try {
    this.producerConfig = config;
    this.time = time;

    // 获取事务id
    String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);

    // 获取客户端id
    this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

    ......
    // 监控kafka运行情况
    JmxReporter jmxReporter = new JmxReporter();
    jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
    reporters.add(jmxReporter);
    MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
            config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
    this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
    // 获取分区器
    this.partitioner = config.getConfiguredInstance(
            ProducerConfig.PARTITIONER_CLASS_CONFIG,
            Partitioner.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
    long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
    // key和value的序列化
    if (keySerializer == null) {
        this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                 Serializer.class);
        this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
    }
    ......
    // 拦截器处理(拦截器可以有多个)
    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
    ......
    // 单条日志大小 默认1m
    this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
    // 缓冲区大小 默认32m
    this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
    // 压缩,默认是none
    this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
    int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

    this.apiVersions = new ApiVersions();
    this.transactionManager = configureTransactionState(config, logContext);
    // 缓冲区对象 默认是32m
    // 批次大小 默认16k
    // 压缩方式,默认是none
    // liner.ms 默认是0
    //  内存池
    this.accumulator = new RecordAccumulator(logContext,
            config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
            this.compressionType,
            lingerMs(config),
            retryBackoffMs,
            deliveryTimeoutMs,
            metrics,
            PRODUCER_METRIC_GROUP_NAME,
            time,
            apiVersions,
            transactionManager,
            new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

    // 连接上kafka集群地址
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
            config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
            config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
    // 获取元数据
    if (metadata != null) {
        this.metadata = metadata;
    } else {
        this.metadata = new ProducerMetadata(retryBackoffMs,
                config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                logContext,
                clusterResourceListeners,
                Time.SYSTEM);
        this.metadata.bootstrap(addresses);
    }
    this.errors = this.metrics.sensor("errors");

    this.sender = newSender(logContext, kafkaClient, this.metadata);
    String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
    // 把sender线程放到后台
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    // 启动sender线程
    this.ioThread.start();
    config.logUnused();
    AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
    log.debug("Kafka producer started");
}
......
}

生产者 sender 线程初始化,KafkaProducer.java中点击 newSender()方法,查看发送线程初始化

Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
    // 缓存请求的个数 默认是5个
    int maxInflightRequests = configureInflightRequests(producerConfig);
    // 请求超时时间,默认30s
    int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
    ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
    Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);

    // 创建一个客户端对象
    // clientId  客户端id
    // maxInflightRequests  缓存请求的个数 默认是5个
    // RECONNECT_BACKOFF_MS_CONFIG 重试时间
    // RECONNECT_BACKOFF_MAX_MS_CONFIG 总的重试时间
    // 发送缓冲区大小send.buffer.bytes  默认128kb
    // 接收数据缓存 receive.buffer.bytes 默认是32kb
    KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
            new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                    this.metrics, time, "producer", channelBuilder, logContext),
            metadata,
            clientId,
            maxInflightRequests,
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
            producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
            producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
            requestTimeoutMs,
            producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
            producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
            time,
            true,
            apiVersions,
            throttleTimeSensor,
            logContext);
    // 0 :生产者发送过来,不需要应答;  1 :leader收到,应答;  -1 :leader和isr队列里面所有的都收到了应答
    short acks = configureAcks(producerConfig, log);
    // 创建sender线程
    return new Sender(logContext,
            client,
            metadata,
            this.accumulator,
            maxInflightRequests == 1,
            producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            acks,
            producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
            metricsRegistry.senderMetrics,
            time,
            requestTimeoutMs,
            producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
            this.transactionManager,
            apiVersions);
}

Sender 对象被放到了一个线程中启动,所有需要点击 newSender()方法中的 Sender,并找到 sender 对象中的 run()方法

@Override
public void run() {
  ......
  while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
      try {
      // sender 线程从缓冲区准备拉取数据,刚启动拉不到数据
          runOnce();
      } catch (Exception e) {
          log.error("Uncaught error in kafka producer I/O thread: ", e);
      }
  }
......
}

2、发送数据到缓冲区

2.1 发送总体流程

从send()方法进入

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            // 拦截器对数据进行加工
            interceptRecord = interceptor.onSend(interceptRecord);
          ......
    return interceptRecord;
}


    //从拦截器处理中返回,点击 doSend()方法
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            // 获取元数据
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // 序列化相关操作
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        ......
        // 分区操作
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        // 保证数据大小能够传输(序列化后的  压缩后的)
        ensureValidRecordSize(serializedSize);

        ......
        // accumulator缓存  追加数据  result是是否添加成功的结果
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        ......
        // 批次大小已经满了 获取有一个新批次创建
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            // 唤醒发送线程
            this.sender.wakeup();
        }
        return result.future;
        ......
}

2.2 分区选择

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // 如果指定分区,按照指定分区配置
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

//点击 partition,跳转到 Partitioner 接口,选择默认的分区器 DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    // 没有指定key
    if (keyBytes == null) {
        // 按照粘性分区处理
        return stickyPartitionCache.partition(topic, cluster);
    }
    // 如果指定key,按照key的hashcode值 对分区数求模
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

2.3 发送消息大小校验

private void ensureValidRecordSize(int size) {
    // 单条信息最大值 maxRequestSize 1m
    if (size > maxRequestSize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
                ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
    // totalMemorySize  缓存大小 默认32m
    if (size > totalMemorySize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                ProducerConfig.BUFFER_MEMORY_CONFIG +
                " configuration.");
}

2.4 内存池

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch,
                                 long nowMs) throws InterruptedException {
    ......
    try {
        // check if we have an in-progress batch
        // 获取或者创建一个队列(按照每个主题的分区)
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // 尝试向队列里面添加数据(正常添加不成功)
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null)
                return appendResult;
        }

        // we don't have an in-progress record batch try to allocate a new batch
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }

        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        // this.batchSize 默认16k    数据大小17k
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
        // 申请内存  内存池分配内存  双端队列
        buffer = free.allocate(size, maxTimeToBlock);

        // Update the current time in case the buffer allocation blocked above.
        nowMs = time.milliseconds();
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
            // 封装内存buffer
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            // 再次封装(得到真正的批次大小)
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, nowMs));
            // 向队列的末尾添加批次
            dq.addLast(batch);
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

3、sender 线程发送数据

void runOnce() {
    // 事务相关操作
    if (transactionManager != null) {
        try {
            transactionManager.maybeResolveSequences();

            // do not continue sending if the transaction manager is in a failed state
            if (transactionManager.hasFatalError()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, time.milliseconds());
                return;
            }

            // Check whether we need a new producerId. If so, we will enqueue an InitProducerId
            // request which will be sent below
            transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

            if (maybeSendAndPollTransactionalRequest()) {
                return;
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request", e);
            transactionManager.authenticationFailed(e);
        }
    }

    long currentTimeMs = time.milliseconds();
    // 发送数据
    long pollTimeout = sendProducerData(currentTimeMs);
    // 获取发送结果
    client.poll(pollTimeout, currentTimeMs);
}


private long sendProducerData(long now) {
    // 获取元数据
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // 1、判断32m缓存是否准备好
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // 如果 Leader 信息不知道,是不能发送数据的
    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    ......

    // create produce requests
    // 发送每个节点数据,进行封装,这样一个分区的就可以打包一起发送
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    ......

    // 发送请求
    sendProduceRequests(batches, now);
    return pollTimeout;
}


// 是否准备发送
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        Deque<ProducerBatch> deque = entry.getValue();
        synchronized (deque) {
            // When producing to a large number of partitions, this path is hot and deques are often empty.
            // We check whether a batch exists first to avoid the more expensive checks whenever possible.
            ProducerBatch batch = deque.peekFirst();
            if (batch != null) {
                TopicPartition part = entry.getKey();
                Node leader = cluster.leaderFor(part);
                if (leader == null) {
                    // This is a partition for which leader is not known, but messages are available to send.
                    // Note that entries are currently not removed from batches when deque is empty.
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part)) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    // 如果不是第一次拉取,  且等待时间小于重试时间 默认100ms ,backingOff=true
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    // 如果backingOff是true 取retryBackoffMs; 如果不是第一次拉取取lingerMs,默认0
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    // 批次大小满足发送条件
                    boolean full = deque.size() > 1 || batch.isFull();
                    // 如果超时,也要发送
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
                    // full linger.ms
                    boolean sendable = full
                        || expired
                        || exhausted
                        || closed
                        || flushInProgress()
                        || transactionCompleting;
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

二、消费者源码

1、初始化

点击 main()方法中的 KafkaConsumer ()

KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
        // 消费组平衡
        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
                GroupRebalanceConfig.ProtocolType.CONSUMER);
        // 获取消费者组id
        this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
        // 客户端id
        this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);

        ......
        
        // 客户端请求服务端等待时间request.timeout.ms 默认是30s
        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
        this.time = Time.SYSTEM;
        this.metrics = buildMetrics(config, time, clientId);
        // 重试时间 100
        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

        // 拦截器
        List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
        this.interceptors = new ConsumerInterceptors<>(interceptorList);
        // key和value 的反序列化
        if (keyDeserializer == null) {
            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
        } else {
            config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
            this.keyDeserializer = keyDeserializer;
        }
        if (valueDeserializer == null) {
            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
        } else {
            config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
            this.valueDeserializer = valueDeserializer;
        }
        // offset从什么位置开始消费 默认,latest
        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
        this.subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer,
                valueDeserializer, metrics.reporters(), interceptorList);
        // 元数据
        // retryBackoffMs 重试时间
        // 是否允许访问系统主题 exclude.internal.topics  默认是true,表示不允许
        // 是否允许自动创建topic  allow.auto.create.topics 默认是true
        this.metadata = new ConsumerMetadata(retryBackoffMs,
                config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                !config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
                config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
                subscriptions, logContext, clusterResourceListeners);
        // 连接kafka集群
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
        this.metadata.bootstrap(addresses);
        String metricGrpPrefix = "consumer";

        FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
        this.isolationLevel = IsolationLevel.valueOf(
                config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
        int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

        ApiVersions apiVersions = new ApiVersions();
        // 创建客户端对象
        // 连接重试时间 默认50ms
        // 最大连接重试时间 默认1s
        // 发送缓存 默认128kb
        // 接收缓存  默认64kb
        // 客户端请求服务端等待时间request.timeout.ms 默认是30s
        NetworkClient netClient = new NetworkClient(
                new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
                this.metadata,
                clientId,
                100, // a fixed large enough value will suffice for max in-flight requests
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                config.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
                config.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
                time,
                true,
                apiVersions,
                throttleTimeSensor,
                logContext);
        // 消费者客户端
        // 客户端请求服务端等待时间request.timeout.ms 默认是30s
        this.client = new ConsumerNetworkClient(
                logContext,
                netClient,
                metadata,
                time,
                retryBackoffMs,
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation
        // 消费者分区分配策略
        this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
                config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
                config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
        );

        // no coordinator will be constructed for the default (null) group id
        //  为消费者组准备的
        // auto.commit.interval.ms  自动提交offset时间 默认5s
        this.coordinator = !groupId.isPresent() ? null :
            new ConsumerCoordinator(groupRebalanceConfig,
                    logContext,
                    this.client,
                    assignors,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricGrpPrefix,
                    this.time,
                    enableAutoCommit,
                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                    this.interceptors,
                    config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
        // 配置抓数据的参数
        // fetch.min.bytes 默认最少一次抓取1个字节
        // fetch.max.bytes 默认最多一次抓取50m
        // fetch.max.wait.ms 抓取等待最大时间 500ms
        // max.partition.fetch.bytes 默认是1m
        // max.poll.records  默认一次处理500条
        this.fetcher = new Fetcher<>(
                logContext,
                this.client,
                config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
                this.keyDeserializer,
                this.valueDeserializer,
                this.metadata,
                this.subscriptions,
                metrics,
                metricsRegistry,
                this.time,
                this.retryBackoffMs,
                this.requestTimeoutMs,
                isolationLevel,
                apiVersions);

        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, metricGrpPrefix);

        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
        log.debug("Kafka consumer initialized");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
        // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
        if (this.log != null) {
            close(0, true);
        }
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}

2、消费者订阅主题

点击自己编写的 CustomConsumer.java 中的 subscribe ()方法

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    acquireAndEnsureOpen();
    try {
        maybeThrowInvalidGroupIdException();
        // 要订阅的主题如果为null ,直接抛异常
        if (topics == null)
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        // 要订阅的主题如果为空
        if (topics.isEmpty()) {
            // treat subscribing to empty topic list as the same as unsubscribing
            this.unsubscribe();
        } else {
            // 正常的处理操作
            for (String topic : topics) {
                // 如果为空  抛异常
                if (Utils.isBlank(topic))
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }

            throwIfNoAssignorsConfigured();
            fetcher.clearBufferedDataForUnassignedTopics(topics);
            log.info("Subscribed to topic(s): {}", Utils.join(topics, ", "));
            //  订阅主题(判断你是否需要更新订阅的主题;  主题了一个监听器listener)
            if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
                // 更新订阅信息
                metadata.requestUpdateForNewTopics();
        }
    } finally {
        release();
    }
}

public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    // 注册负载均衡监听器
    registerRebalanceListener(listener);
    // 按照主题自动订阅模式
    setSubscriptionType(SubscriptionType.AUTO_TOPICS);
    // 判断是否需要更改订阅的主题
    return changeSubscription(topics);
}


private boolean changeSubscription(Set<String> topicsToSubscribe) {
    // 如果传入的topics 和以前订阅的主题一致,那就不需要更改对应订阅的主题
    if (subscription.equals(topicsToSubscribe))
        return false;

    subscription = topicsToSubscribe;
    return true;
}

3、消费者拉取和处理数据

3.1 消费总体流程

点击自己编写的 CustomConsumer.java 中的 poll ()方法

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
  acquireAndEnsureOpen();
  try {
      this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

      if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
          throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
      }

      do {
          client.maybeTriggerWakeup();

          if (includeMetadataInTimeout) {
              // 1、消费者或者消费者组的初始化
              // try to update assignment metadata BUT do not need to block on the timer for join group
              updateAssignmentMetadataIfNeeded(timer, false);
          } else {
              while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                  log.warn("Still waiting for metadata");
              }
          }
          // 2 抓取数据
          final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
          if (!records.isEmpty()) {
              // before returning the fetched records, we can send off the next round of fetches
              // and avoid block waiting for their responses to enable pipelining while the user
              // is handling the fetched records.
              //
              // NOTE: since the consumed position has already been updated, we must not allow
              // wakeups or any other errors to be triggered prior to returning the fetched records.
              if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                  client.transmitSends();
              }
              // 3 拦截器处理数据
              return this.interceptors.onConsume(new ConsumerRecords<>(records));
          }
      } while (timer.notExpired());

      return ConsumerRecords.empty();
  } finally {
      release();
      this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
  }
}

3.2 消费者/消费者组初始化

public boolean poll(Timer timer, boolean waitForJoinGroup) {
    maybeUpdateSubscriptionMetadata();

    invokeCompletedOffsetCommitCallbacks();

    if (subscriptions.hasAutoAssignedPartitions()) {
        // 如果没有指定分区分配策略  直接抛异常
        if (protocol == null) {
            throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
                " to empty while trying to subscribe for group protocol to auto assign partitions");
        }
        // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
        // group proactively due to application inactivity even if (say) the coordinator cannot be found.
        // 3s心跳
        pollHeartbeat(timer.currentTimeMs());
        // 判断Coordinator 是否准备好了
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        ......

    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}

3.3 拉取数据

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
    long pollTimeout = coordinator == null ? timer.remainingMs() :
            Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

    // if data is available already, return it immediately
    // 第一次拉取不到数据
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }

    // send any new fetches (won't resend pending fetches)
    // 开始拉取数据,里面放了一个监听函数,
    fetcher.sendFetches();

    // We do not want to be stuck blocking in poll if we are missing some positions
    // since the offset lookup may be backing off after a failure

    // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
    // updateAssignmentMetadataIfNeeded before this method.
    if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    log.trace("Polling for fetches with timeout {}", pollTimeout);

    Timer pollTimer = time.timer(pollTimeout);
    client.poll(pollTimer, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasAvailableFetches();
    });
    timer.update(pollTimer.currentTimeMs());

    return fetcher.fetchedRecords();
}

// 首先抓取数据为空,然后发送请求监听并将数据放入队列,最后再抓取数据,拦截器处理数据

3.4 消费者 Offset 提交

三、服务端源码

生产者消费者源码使用java编写,而服务端源码使用scala编写

程序入口在core→src→main→scala→Kafka→kafka.scala

def main(args: Array[String]): Unit = {
  try {
    // 获取相关参数
    val serverProps = getPropsFromArgs(args)
    // 创建服务
    val server = buildServer(serverProps)

    try {
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }

    // attach shutdown handler to catch terminating signals as well as normal termination
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown()
      catch {
        case _: Throwable =>
          fatal("Halting Kafka.")
          // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
          Exit.halt(1)
      }
    })

    // 启动服务
    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }

    server.awaitShutdown()
  }
  catch {
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  Exit.exit(0)
}

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇

)">
下一篇>>