Inside Kafka Technology (3) Consumers: High-level API and low-level API consumer rebalancing operations



Reposted from:

Consumer rebalancing operations

The core processing logic of the consumer connector is the rebalancing operation, which bridges what happens before it and what happens after it: initializing the consumer connector only "creates the queues and message streams", while the rebalancing operation "reassigns partitions to the consumer".
The pull threads start pulling messages from a partition only after that partition has been assigned to the consumer. Because reassignment changes the owner of a partition, all consumers must stop their existing pull threads before the partitions are reassigned.
Every partition assigned to a consumer has its owner recorded in ZK, so these ZK nodes must be deleted first. A partition can be reassigned only after both its ZK owner node and its pull thread have been released.
The steps of the rebalancing operation are as follows.
(1) Close the data pull threads, clear the queues and message streams, and commit the offsets.
(2) Release the ownership of the partition and delete the owner relationship between the partition and the consumer in ZK.
(3) Reassign all partitions among the consumers, so that different consumers are assigned different partitions.
(4) Write the consumer owner relationship corresponding to the partition into ZK, and record the ownership information of the partition.
(5) Restart the consumer's pull thread manager to manage the pull thread of each partition.
The order of operations on the pull threads and partition ownership is: stop the pull threads → release partition ownership → add partition ownership → start the pull threads.
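The ordering matters because a partition should never be owned by two consumer threads at once. The minimal Python sketch below models it with a plain dict standing in for the ZK owner nodes; `ToyConsumer` and its methods are hypothetical and are not Kafka's API. Raising an error when a partition is still owned is a simplification; as far as we understand, the real connector backs off and retries the whole rebalance instead.

```python
class ToyConsumer:
    """Hypothetical model of the rebalance ordering (not Kafka's API)."""

    def __init__(self, consumer_id, zk_owners):
        self.consumer_id = consumer_id
        self.zk_owners = zk_owners  # stand-in for owner nodes: {(topic, partition): thread_id}
        self.owned = set()
        self.fetching = False
        self.log = []

    def rebalance(self, new_assignment):
        """new_assignment: {(topic, partition): thread_id} computed for this consumer."""
        # (1) Stop pulling first (queues would be cleared and offsets committed here).
        self.fetching = False
        self.log.append("stop fetchers")
        # (2) Release ownership by deleting this consumer's owner nodes.
        for tp in self.owned:
            self.zk_owners.pop(tp, None)
        self.owned.clear()
        self.log.append("release ownership")
        # (3)+(4) Record ownership of the newly assigned partitions.
        for tp, thread_id in new_assignment.items():
            if tp in self.zk_owners:
                raise RuntimeError(f"{tp} is still owned by {self.zk_owners[tp]}")
            self.zk_owners[tp] = thread_id
            self.owned.add(tp)
        self.log.append("add ownership")
        # (5) Restart pulling only after ownership is recorded.
        self.fetching = True
        self.log.append("start fetchers")


zk = {}
consumer = ToyConsumer("c1", zk)
consumer.rebalance({("t1", 0): "c1-0"})
consumer.rebalance({("t1", 1): "c1-0"})
print(zk)  # {('t1', 1): 'c1-0'}
```

After the second rebalance, the owner node for partition 0 is gone and only partition 1 is recorded, because ownership is always released before it is added.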

Ownership of the partition
Partition ownership is recorded in a ZK node, which means that the "topic-partition" will be consumed by the designated consumer thread; in other words, the partition is assigned to that consumer and the consumer owns the partition. Releasing ownership only requires deleting the ZK node for the partition; rebuilding ownership requires the consumer thread ID in addition to the partition, because the thread ID is written as the node's data.
Besides the ownership mapping between consumers and partitions, ZK also records the member list of each consumer group, the partition list of each topic, and how those partitions are distributed. These partitions provide the data source for consumers.
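For reference, the ZooKeeper directory layout used by the old ZK-based consumer, as described in the "ZooKeeper Directories" section of the Kafka 0.8 documentation, can be sketched as a few Python helpers. The function names are hypothetical; only the path strings are meant to reflect Kafka's layout.

```python
def consumer_ids_path(group: str) -> str:
    # Children of this node are the registered members of the consumer group.
    return f"/consumers/{group}/ids"


def owner_path(group: str, topic: str, partition: int) -> str:
    # Node data = id of the consumer thread that owns this partition.
    # Releasing ownership means deleting this node.
    return f"/consumers/{group}/owners/{topic}/{partition}"


def offset_path(group: str, topic: str, partition: int) -> str:
    # Node data = last committed (consumed) offset of this partition.
    return f"/consumers/{group}/offsets/{topic}/{partition}"


def topic_path(topic: str) -> str:
    # Node data = partition -> replica assignment of the topic.
    return f"/brokers/topics/{topic}"


print(owner_path("group1", "topic1", 0))  # /consumers/group1/owners/topic1/0
```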

Allocate partitions to consumers.
Each consumer must be assigned partitions before it can pull messages, and partitions are reassigned whenever a rebalance occurs. To do the assignment, all partitions and all consumer members must be queried from ZK. Partitions are scoped by topic, and consumers are scoped by consumer group. For the consumer that triggers the rebalance, its consumer group is already known, and so are the topics and partitions it subscribes to. Therefore, fetching from ZK the list of consumers that subscribe to the same topics, together with the partitions of those topics, poses no problem.
As shown in the figure below, consumer 1 subscribes to topic 1 and topic 2, consumer 2 subscribes to topic 1 and topic 3, and consumer 3 subscribes to topic 2 and topic 3. When consumer 1 rebalances, the subscribers of its topics (topic 1 and topic 2) are consumer 1, consumer 2, and consumer 3, so consumer 2 and consumer 3 also rebalance along with it.
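The example can be reproduced with a few lines of Python. This is a simplified model that only mirrors the example: a consumer joins the rebalance if it shares at least one subscribed topic with the triggering consumer. The function name `co_rebalancing` is hypothetical.

```python
def co_rebalancing(trigger, subscriptions):
    """Return the consumers sharing at least one topic with `trigger` (itself included)."""
    topics = subscriptions[trigger]
    return {consumer for consumer, subs in subscriptions.items() if subs & topics}


subscriptions = {
    "consumer1": {"topic1", "topic2"},
    "consumer2": {"topic1", "topic3"},
    "consumer3": {"topic2", "topic3"},
}
print(sorted(co_rebalancing("consumer1", subscriptions)))
# ['consumer1', 'consumer2', 'consumer3']
```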

This example does not explain why consumer 1 rebalances. Its session may have timed out, it may have just joined the consumer group, or the partitions of its subscribed topics (topic 1 and topic 2) may have changed. It is also possible that another consumer rebalanced, which in turn forced consumer 1 to rebalance as well.
The algorithm for assigning all partitions to all consumer threads is: divide the number of partitions by the number of consumer threads; the quotient is the number of partitions each consumer thread receives at minimum. If the division is not exact, the leftover partitions are handed out one at a time to the first few consumer threads.
As shown in the figure below, there are 2 consumers, each with 2 threads, and 5 available partitions in total. Each of the 4 consumer threads gets at least 1 partition (5 / 4 = 1, remainder 1), and the remaining partition goes to the first thread. The final assignment is: P0 → C1_0, P1 → C1_0, P2 → C1_1, P3 → C2_0, P4 → C2_1.
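The division-with-remainder rule can be written as a short function. This is a sketch of the rule described above, not Kafka's source; as we understand it, Kafka's old range assignor applies the same idea per topic, with partitions in numeric order and consumer thread IDs in lexicographic order. The name `range_assign` is hypothetical.

```python
def range_assign(partitions, threads):
    """Give each thread len(partitions) // len(threads) partitions; the first
    len(partitions) % len(threads) threads get one extra partition each."""
    partitions = sorted(partitions)
    threads = sorted(threads)
    per_thread, extra = divmod(len(partitions), len(threads))
    assignment = {}
    start = 0
    for i, thread in enumerate(threads):
        count = per_thread + (1 if i < extra else 0)
        for p in partitions[start:start + count]:
            assignment[p] = thread
        start += count
    return assignment


result = range_assign(range(5), ["C2_1", "C1_0", "C2_0", "C1_1"])
print(", ".join(f"P{p}->{t}" for p, t in sorted(result.items())))
# P0->C1_0, P1->C1_0, P2->C1_1, P3->C2_0, P4->C2_1
```

Sorting both lists first makes the result deterministic, so every consumer in the group computes the same global assignment independently.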

The essential goal of the rebalancing operation is to assign partitions to the current consumer, so that its pull threads know where to pull messages from. The consumption progress of each partition is stored in ZK, so ZK must also be read to obtain the latest offset. The pull threads can start working only after all of this is in place.

Apart from releasing and adding partition ownership and closing and updating the pull threads, which are covered separately, the remaining partition-assignment steps of the rebalance() method are as follows.
(1) Build the consumer's assignment context, which holds the subscribed topic partitions and all consumer thread information.
(2) Run the partition assignment algorithm to compute the mapping from every partition to a consumer thread across all consumers.
(3) From the global result of step (2), extract the partitions and consumer threads that belong to the current consumer.
(4) Read from ZK the latest consumption progress, that is, the offset, of each partition owned by the current consumer.
(5) Construct a PartitionTopicInfo for each such partition and add it to topicRegistry, which represents the consumer's topic registration information.
(6) Update topicRegistry; the pull threads will later use this data structure.
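Steps (3) to (6) can be sketched in Python as follows. Everything here is hypothetical: `build_topic_registry` is not a Kafka method, ZK is replaced by a dict of offsets, and a missing offset defaults to 0 purely for illustration (the real consumer consults its offset-reset configuration instead).

```python
from dataclasses import dataclass
from queue import Queue


@dataclass
class PartitionTopicInfo:
    topic: str
    partition: int
    queue: Queue          # queue of the consumer thread that owns this partition
    consumed_offset: int
    fetched_offset: int


def build_topic_registry(global_assignment, my_threads, zk_offsets, queues):
    """global_assignment: {(topic, partition): thread_id} for the whole group.
    my_threads: thread ids of this consumer; zk_offsets: {(topic, partition): offset};
    queues: {thread_id: Queue}. Returns topic -> partition -> PartitionTopicInfo."""
    registry = {}
    for (topic, partition), thread_id in global_assignment.items():
        if thread_id not in my_threads:          # step (3): keep only our partitions
            continue
        offset = zk_offsets.get((topic, partition), 0)   # step (4): assumed default 0
        info = PartitionTopicInfo(topic, partition, queues[thread_id], offset, offset)
        registry.setdefault(topic, {})[partition] = info  # steps (5)-(6)
    return registry


assignment = {("t1", 0): "C1_0", ("t1", 1): "C1_0", ("t1", 2): "C1_1", ("t1", 3): "C2_0"}
queues = {"C1_0": Queue(), "C1_1": Queue()}
registry = build_topic_registry(assignment, {"C1_0", "C1_1"}, {("t1", 0): 42}, queues)
print(sorted(registry["t1"]))            # [0, 1, 2]
print(registry["t1"][0].fetched_offset)  # 42
```

Both offsets start at the value read from ZK; they diverge only once pulling and consuming begin.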

Create partition information object
The offset read from ZK is used to construct the partition information object (PartitionTopicInfo). Its main contents are: the partition, which is the "target" of the pull thread; the queue, which is the "storage" medium for messages; and the offset, which is the pull "state". The consumer's pull thread pulls data from the "target" starting at the latest "state" and fills it into the "storage" queue.

The offset counter stored in ZK is the partition's last consumed offset, and it is also where pulling resumes. When a consumer sends a fetch request to the server, the fetch offset (fetchedOffset) indicates where to start pulling. After the consumer pulls messages from the server and stores them locally, the consumed offset (consumedOffset) indicates how far they have been consumed.
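The "target / storage / state" split and the two offsets can be illustrated with a small Python class. This is a hypothetical sketch, not Kafka's `PartitionTopicInfo`: here `enqueue` advances `fetched_offset` past the messages just pulled, and `mark_consumed` stores the offset of the next message the application will read.

```python
from dataclasses import dataclass
from queue import Queue


@dataclass
class PartitionTopicInfo:
    topic: str              # "target": which topic-partition to pull
    partition: int
    queue: Queue            # "storage": shared with the message stream
    consumed_offset: int    # "state": next offset the application will read
    fetched_offset: int     # "state": next offset to request from the server

    def enqueue(self, messages):
        """Called by the pull thread with messages starting at fetched_offset."""
        for i, msg in enumerate(messages):
            self.queue.put((self.partition, self.fetched_offset + i, msg))
        self.fetched_offset += len(messages)

    def mark_consumed(self, offset):
        """Called after the application has read the message at `offset`."""
        self.consumed_offset = offset + 1


info = PartitionTopicInfo("t1", 0, Queue(), consumed_offset=10, fetched_offset=10)
info.enqueue(["a", "b", "c"])
_, off, msg = info.queue.get()
info.mark_consumed(off)
print(info.fetched_offset, info.consumed_offset)  # 13 11
```

The gap between the two offsets (here 11 to 12) is exactly the data that has been pulled into the queue but not yet read.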

The following figure summarizes the process of queue creation, data filling, and data consumption. The specific steps are as follows.
(1) Based on the subscription information, the connector creates a mapping between queues and message streams; each queue is also passed to its message stream.
(2) When partitions are assigned to consumers, each partition's latest consumed offset is read from ZK.
(3) Partition information is created from that offset, and the queue is also passed to the partition information object.
(4) The partition information is used by the consumer's pull threads.
(5) The pull thread pulls messages from the partition on the server.
(6) After consuming messages, the consumer updates the latest offset in ZK.
(7) The pull thread fills the pulled messages into the queue.
(8) The message stream takes messages from the queue.
(9) The application iterates over the message stream to obtain messages.
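The producer/consumer shape of steps (5) to (9) can be sketched with a Python thread and a blocking queue. All names are hypothetical: a list stands in for the partition on the server, an integer stands in for the offset read from ZK, and a sentinel object ends the demo (a real message stream simply blocks for more data).

```python
import threading
from queue import Queue

END = object()  # hypothetical sentinel marking the end of this demo


def fetcher(partition_log, start_offset, chunk_queue):
    """Pull thread: read the 'server' partition from start_offset, fill the queue."""
    for offset in range(start_offset, len(partition_log)):
        chunk_queue.put((offset, partition_log[offset]))  # blocks while the queue is full
    chunk_queue.put(END)


def message_stream(chunk_queue):
    """Message stream: the application iterates over it; get() blocks until data arrives."""
    while True:
        item = chunk_queue.get()
        if item is END:
            return
        yield item


partition_log = ["m0", "m1", "m2", "m3"]  # stand-in for a partition on the server
committed_offset = 1                       # stand-in for the offset read from ZK
chunk_queue = Queue(maxsize=2)             # bounded, so the pull thread waits for the app
t = threading.Thread(target=fetcher, args=(partition_log, committed_offset, chunk_queue))
t.start()
received = [msg for _, msg in message_stream(chunk_queue)]
t.join()
print(received)  # ['m1', 'm2', 'm3']
```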

Partition information is tied to queues, and therefore to the consumer client's thread model: one consumer thread can consume multiple partitions, and each consumer thread corresponds to one queue, so a single queue can hold data from multiple partitions. In other words, messages pulled from different partitions may be stored in the same queue.
For example, if the consumer is configured with one thread, there is only one queue; if there are two partitions, that one queue handles both. The figure below (top) shows the data source route of the queue in the partition information, and the figure (bottom) shows the relationship between the partition information and the client thread model.
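A tiny Python illustration of one thread's queue receiving data from two partitions follows. The fetched data is hypothetical; the point is that each entry must carry its partition ID, otherwise the consumer could not tell which partition's offset to advance after reading it.

```python
from queue import Queue

# One consumer thread -> one queue, even though the thread owns two partitions.
thread_queue = Queue()
fetched = {0: ["a0", "a1"], 1: ["b0"]}  # hypothetical data pulled per partition

# Pulls for both partitions write into the SAME queue, tagged with their partition.
for partition, messages in fetched.items():
    for offset, msg in enumerate(messages):
        thread_queue.put((partition, offset, msg))

drained = []
while not thread_queue.empty():
    drained.append(thread_queue.get())
print(drained)  # [(0, 0, 'a0'), (0, 1, 'a1'), (1, 0, 'b0')]
```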

The topicRegistry structure is a two-level nested map: topic → (partition → partition information). topicRegistry holds all partition information assigned to the current consumer and is provided to the pull threads. The partition information is generated on the ZKRebalancerListener side and passed to the pull threads, which actually use it. Note that pull threads are not linked to partitions directly; they are linked through the consumer pull thread manager, which manages all pull threads.

Closing and updating the pull thread manager
Having covered partition ownership and partition assignment in the rebalancing operation, what remains concerns the consumer fetcher threads (ConsumerFetcherThread): closing and updating the consumer fetcher manager (ConsumerFetcherManager, hereinafter the "pull manager"). Before partitions are reassigned, the closeFetchersForQueues() method closes the pull manager, which also closes all the threads it manages.
Besides closing the pull threads, the data structures related to them must also be cleaned up; for example, the queues of the partition information objects must be cleared. In addition, consumers periodically commit offsets to ZK while pulling data, and they commit the offsets of all partitions once more when the pull manager is closed.

After the rebalancing operation, the consumer has been assigned its partitions and can start pulling messages from them through the pull manager. The updateFetcher() method updates the partition information managed by the pull manager; its allPartitionInfos variable is populated from the topicRegistry built during the rebalancing operation.
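The close/update pair can be modeled in Python as below. `ToyFetcherManager` and its method names are hypothetical stand-ins for the behavior described above (closeFetchersForQueues() and updateFetcher()), not Kafka's actual signatures; ZK is again a plain dict of offsets.

```python
from dataclasses import dataclass
from queue import Queue


@dataclass
class PartitionTopicInfo:
    topic: str
    partition: int
    queue: Queue
    consumed_offset: int


class ToyFetcherManager:
    """Hypothetical stand-in for the pull manager (not Kafka's API)."""

    def __init__(self, zk_offsets):
        self.zk_offsets = zk_offsets  # {(topic, partition): committed offset}
        self.partitions = []          # partition infos currently being pulled
        self.running = False

    def close_fetchers_for_queues(self):
        self.running = False                   # stop every pull thread
        for info in self.partitions:
            while not info.queue.empty():      # drop pulled but unconsumed messages
                info.queue.get_nowait()
            # Commit once on close so the next owner resumes from here.
            self.zk_offsets[(info.topic, info.partition)] = info.consumed_offset
        self.partitions = []

    def update_fetcher(self, topic_registry):
        # Flatten topic -> partition -> info, like allPartitionInfos.
        self.partitions = [i for parts in topic_registry.values() for i in parts.values()]
        self.running = True


zk = {}
manager = ToyFetcherManager(zk)
q = Queue()
info = PartitionTopicInfo("t1", 0, q, consumed_offset=7)
manager.update_fetcher({"t1": {0: info}})
q.put("unconsumed message")
manager.close_fetchers_for_queues()
print(zk, q.empty(), manager.running)  # {('t1', 0): 7} True False
```

Clearing the queue on close is what makes the committed consumedOffset safe to resume from: nothing already sitting in the queue is counted as consumed.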

The offset of the partition information object
Let's look at how the offsets of the partition information object are used while pulling messages. When the consumer's pull thread pulls messages for the first time, it uses the offset read from ZK as fetchedOffset to decide where in the partition to start. After the consumer reads messages, it updates the partition's consumedOffset.
The consumer also treats consumedOffset as the partition's consumption progress and periodically commits it to ZK.

The offsets of the partition information object play an important role in the pull thread. The specific steps are as follows.
(1) Submit the consumedOffset offset to ZK when closing the pull thread.
(2) Read the offset in ZK when restarting the pull thread.
(3) Use the offset of ZK as the fetchedOffset at the beginning.
(4) After the client reads the message, it will update the consumedOffset.
(5) After that, each subsequent pull continues from the updated fetchedOffset.
(6) The client also commits offsets periodically; as in (1), it takes consumedOffset and writes it to ZK.
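The lifecycle above can be traced with a small Python model. `ToyPartitionState` is hypothetical and ZK is a dict; the sketch shows that because consumedOffset, not fetchedOffset, is committed, messages that were pulled but not yet consumed are pulled again after a restart (at-least-once delivery).

```python
class ToyPartitionState:
    """Hypothetical model of one partition's offsets across a pull-thread restart."""

    def __init__(self, zk, key):
        self.zk, self.key = zk, key
        # Steps (2)-(3): read the committed offset and use it as the starting point.
        self.fetched_offset = self.consumed_offset = zk.get(key, 0)

    def pull(self, n):
        """Pull n messages (represented by their offsets), advancing fetched_offset."""
        start = self.fetched_offset
        self.fetched_offset += n  # step (5)
        return list(range(start, start + n))

    def consume(self, offset):
        """Step (4): the application has read the message at `offset`."""
        self.consumed_offset = offset + 1

    def commit(self):
        """Steps (1) and (6): write consumed_offset (not fetched_offset) to ZK."""
        self.zk[self.key] = self.consumed_offset


zk = {("t1", 0): 100}
state = ToyPartitionState(zk, ("t1", 0))
batch = state.pull(5)      # offsets 100..104 pulled
for off in batch[:2]:      # application consumes only 100 and 101
    state.consume(off)
state.commit()             # on close: commit 102, not 105
restarted = ToyPartitionState(zk, ("t1", 0))
print(restarted.fetched_offset)  # 102
```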

To summarize, the main work the consumer client performs through the consumer connector is as follows.
(1) Create queues and message streams: the former hold the messages pulled by the consumer, and the latter read those messages.
(2) Register listeners for various events. When an event occurs, all consumer members of the consumer group rebalance.
(3) Rebalancing reassigns partitions to consumers and builds partition information objects that are added to topicRegistry.
(4) The pull thread obtains all the partition information allocated to consumers in the topicRegistry and starts to work.