Detailed explanation of important Kafka concepts and key cluster configuration

Key concepts

broker

A broker is a single Kafka server instance, responsible for receiving, storing and forwarding messages. A Kafka cluster is composed of multiple brokers.

topic

A Kafka topic is a logical concept used to group and classify messages, so that messages belonging to different business flows can be distinguished and processed separately. A topic is similar to an index in Elasticsearch.

partition

A Kafka partition is a physical concept: each partition corresponds to a folder in the file system. Partitions belong to a topic and exist mainly to handle topics with large amounts of data. By splitting a topic's data into partitions, Kafka can process messages in parallel and increase throughput.

A partition in Kafka is similar to a shard in Elasticsearch: a physical split of data that shares a single logical classification.
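As a sketch of how partitions and replicas are declared in practice (the topic name "login-events" and the bootstrap address are placeholders, assuming the standard Java AdminClient), a topic with 3 partitions and replication factor 2 could be created like this:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2:
            // each partition gets a leader and one follower on different brokers
            NewTopic topic = new NewTopic("login-events", 3, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}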

segment

A Kafka partition corresponds to a folder. If you think about it for a moment, you will realize that the messages themselves must be stored in files. What are the files Kafka uses to store messages called?

The answer is the segment.

A segment is a further physical split of a Kafka partition. By configuring the segment size appropriately for the actual machine, combined with Kafka's own index mechanism, read and write operations can be performed faster.
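For illustration, a partition's folder typically contains files like the sketch below: each segment is named after the offset of its first message, with a log file for the message data plus index files (the topic name and offsets here are illustrative):

my-topic-0/
  00000000000000000000.log        # message data starting at offset 0
  00000000000000000000.index      # offset index for this segment
  00000000000000000000.timeindex  # timestamp index for this segment
  00000000000000170410.log        # next segment starts at offset 170410
  00000000000000170410.index
  00000000000000170410.timeindex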

offset

The offset is the message offset within a partition. It counts messages, not bytes.

replica

A replica is a copy of a partition's data. Almost all distributed middleware has the concept of replicas.

Kafka's replicas are per partition, not per topic.

In a Kafka cluster, availability is improved by distributing the replicas of a partition across different brokers. If one broker goes down, the other replicas are still available.

Replicas have two important attributes: LEO and HW.

  1. Log End Offset (LEO): the offset of the next message to be written to the log
  2. High Watermark (HW): the smallest LEO among all replicas

Why is the smallest LEO among all replicas called the high watermark (HW)?

Mainly because Kafka does not allow consumers to consume messages beyond the smallest LEO among all replicas, this offset is called the high watermark (HW).

This is mainly to prevent data inconsistency. For example, suppose the leader's LEO is relatively large and then the leader crashes; another replica with a smaller LEO becomes the leader. Anything consumed beyond the new leader's LEO would then have no corresponding data on the new leader.
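A small worked example with assumed numbers: suppose a partition has three replicas whose LEOs are 8, 6 and 7. Then HW = min(8, 6, 7) = 6, so consumers may only read messages up to offset 5. If the leader (LEO 8) crashes and the replica with LEO 6 becomes the new leader, no consumer has yet seen offsets 6 or 7, so nothing a consumer has already read is lost.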

producer

Message producer: a service that publishes messages to the Kafka cluster.

consumer

Message consumer: a service that consumes messages from the Kafka cluster.

Consumer group

The consumer group is a concept from the high-level consumer API. Each consumer belongs to a consumer group. Within a consumer group, each message can be consumed by only one consumer, but the same message can be consumed by multiple consumer groups.

By setting the consumer group, a message can be consumed by different groups, which is very practical. For example, a login message may be needed both for data statistics and for an activity business. Simply by setting different consumer groups, both can consume the same login message.
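A minimal Java sketch of that login-message scenario (the group names, topic and bootstrap address are placeholders): two consumers with different group.id values each receive the full message stream:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupExample {
    static KafkaConsumer<String, String> newConsumer(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Different groups each get a full copy of the stream;
        // consumers inside the same group share it instead
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("login-events"));
        return consumer;
    }

    public static void main(String[] args) {
        // Both groups independently consume every login message
        KafkaConsumer<String, String> stats = newConsumer("statistics-group");
        KafkaConsumer<String, String> activity = newConsumer("activity-group");
    }
}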

leader and follower

Replicas play two roles: leader and follower.

Among the replicas of the same partition there is only one leader; the rest are followers.

Producers and consumers only interact with the leader; the leader then interacts with the followers.

For example, the producer sends a message to the leader, and the leader forwards the message to the followers. Different responses are returned depending on the producer's acks configuration, which is described in detail later.

controller

The controller is a role at the broker level. The brokers in a Kafka cluster elect a leader among themselves to control partition leader election, failover and other operations. The broker elected as this leader is the controller.

The broker election relies on Zookeeper. All broker nodes try together to register the same ephemeral node on Zookeeper. Only one broker can register it successfully; the others fail. The broker that successfully registers the ephemeral node on Zookeeper becomes the controller, and the other brokers are called broker followers.

The controller listens to the state of all the other brokers. If the controller goes down, its ephemeral node on Zookeeper disappears. All the brokers then try again to register the ephemeral node; because only one can succeed, the broker that registers the ephemeral node on Zookeeper becomes the new controller.

Once a broker goes down, the controller reads the state of all partitions on that broker from Zookeeper and selects a replica from the ISR list as the new partition leader.

If all replicas in the ISR list are also down, a surviving replica is selected as the leader. If all replicas of the partition are down, the new leader is set to -1 and Kafka waits for recovery: either wait for any replica in the ISR to recover and select it as leader, or select the first replica that comes back to life, which is not necessarily in the ISR.

When a broker goes down, the controller also notifies Zookeeper, and Zookeeper notifies the other brokers.

Broker split-brain problem: after the controller has successfully registered with Zookeeper, the default timeout for its communication with Zookeeper is 6 s. That is, if the controller has not sent a heartbeat to Zookeeper within 6 s, Zookeeper considers the controller dead.

Its ephemeral node on Zookeeper is then deleted, the other brokers conclude that the controller is gone and race to register the ephemeral node again, and the broker that succeeds becomes the new controller.

The previous controller then needs to shut down all of its listeners on the various nodes and events. However, when Kafka's read and write traffic is very heavy, the cluster briefly has two controllers, messages arriving from producers cannot be persisted, and a backlog of data builds up.

coordinator

The Group Coordinator is a service; each broker starts one at startup.

The function of the Group Coordinator is to store group-related metadata and to record the offsets of the corresponding partitions into Kafka's internal __consumer_offsets topic.

Before 0.9, Kafka stored partition offsets in Zookeeper (/consumers/{group}/offsets/{topic}/{partition}). Because ZK is not suitable for frequent write operations, since 0.9 partition offsets have been recorded in a built-in topic instead.
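As a hedged illustration (the group and topic names are placeholders): whenever a consumer commits, automatically or explicitly via commitSync() as below, the Group Coordinator persists the committed offsets into the internal __consumer_offsets topic:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "statistics-group");
        props.put("enable.auto.commit", "false"); // commit offsets explicitly instead
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("login-events"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            records.forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
            // commitSync asks the Group Coordinator to persist the consumed
            // offsets into the internal __consumer_offsets topic
            consumer.commitSync();
        }
    }
}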

Important Kafka configuration

Broker related

#Unique identifier of the broker in the cluster; must be non-negative
broker.id=1

#broker server service port
port=9091

#The storage path for Kafka data. Multiple paths are separated by commas, e.g. D:\data11,D:\data12
log.dirs=D:\kafkas\datas\data1

#Addresses of the ZK cluster; multiple addresses are separated by commas, e.g. hostname1:port1,hostname2:port2
zookeeper.connect=localhost:2181

#ZK connection timeout
zookeeper.connection.timeout.ms=6000

#ZK session timeout
zookeeper.session.timeout.ms=6000

#Size limit of the segment log's index file; overridden by the parameters specified when the topic is created
log.index.size.max.bytes=10*1024*1024

#When a segment reaches this size, a new segment file is created; overridden by the parameters specified when the topic is created
log.segment.bytes=1024*1024*1024

# Maximum size of message body accepted by the broker
message.max.bytes=1000012

#Number of threads the broker uses to handle network requests
num.network.threads=3

#Number of threads the broker uses for disk I/O
num.io.threads=8

#Send buffer of socket
socket.send.buffer.bytes=102400
#Receive buffer of socket
socket.receive.buffer.bytes=102400
#Maximum size of a socket request; message.max.bytes must be less than socket.request.max.bytes
socket.request.max.bytes=104857600

#Default number of partitions for a topic; overridden by the parameters specified when the topic is created
num.partitions=1

#Number of partition replicas. The default 1 means no extra replica; 2 means one follower in addition to the leader
default.replication.factor=1

#Whether to allow automatic topic creation. If false, topics must be created manually
auto.create.topics.enable=true

Producer related

# acks=0: do not wait for any acknowledgment; acks=1: only the leader must write the message successfully; acks=all: the leader and all followers in the ISR must write successfully
acks = 1

#Sets the size of the producer's memory buffer, which the producer uses to buffer messages waiting to be sent to the server.
#If the application produces messages faster than they can be sent to the server, the producer runs out of buffer space. The send() call then either blocks or throws an exception
buffer.memory = 10240

# How long send() blocks when buffer.memory is insufficient before throwing an exception
max.block.ms = 3000

# Messages are not compressed by default. Can be set to snappy, gzip or lz4
compression.type = snappy

# retry count
retries = 0

# Retry interval
retry.backoff.ms = 100

# The size of each batch sent to the same partition, 16384 by default
batch.size = 10240

# When messages are produced faster than they are sent, they accumulate up to batch.size;
# linger.ms makes the producer wait up to n milliseconds before sending, to achieve batched sending
linger.ms = 0

# Controls the maximum size of each request the producer sends, 1M by default
max.request.size = 1048576

# Specifies how many requests a producer may send without receiving a response from the server
max.in.flight.requests.per.connection = 1

# tcp buffer size
receive.buffer.bytes = 4096
send.buffer.bytes = 4096

The snappy compression algorithm uses less CPU and offers good performance and compression ratio; the gzip algorithm uses more CPU but provides a higher compression ratio.

max.in.flight.requests.per.connection causes message ordering problems when retries > 0 and max.in.flight.requests.per.connection > 1: if the first batch fails to be written and the second batch is written successfully, the first batch is retried. If the retry then succeeds, the order of the two batches is reversed.

With max.in.flight.requests.per.connection=1, even if a retry occurs, messages are guaranteed to be written in the order they were sent.
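A minimal Java producer sketch wiring together several of the settings above (the bootstrap address and topic name are placeholders): acks=all plus retries, with max.in.flight.requests.per.connection=1 to preserve ordering:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");  // leader and all ISR followers must ack
        props.put(ProducerConfig.RETRIES_CONFIG, 3);   // retry transient failures
        // Only one unacknowledged request per connection, so retries cannot reorder batches
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("login-events", "user-1", "login"));
        }
    }
}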

Consumer related

# broker server list
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094

# Maximum number of records returned per poll
max.poll.records = 500

# true to commit the offset automatically
enable.auto.commit = true

# Auto commit offset period (time interval)
auto.commit.interval.ms = 5000

# If the consumer does not send a heartbeat to the Coordinator within this time, the consumer is considered dead and a rebalance is triggered
session.timeout.ms = 10000

# Interval at which the consumer sends heartbeats to the Coordinator
heartbeat.interval.ms = 2000

# What to do when there is no initial offset. Default: latest
# earliest: automatically reset to the earliest offset
# latest: automatically reset to the latest offset
# none: if there is no previous offset for the consumer group, throw an exception
auto.offset.reset=latest

# The minimum number of bytes pulled at a time, 1 byte by default
fetch.min.bytes=1

# Maximum number of bytes pulled per fetch. Default: 50M
fetch.max.bytes=52428800

# Maximum milliseconds to wait for a fetch. Default: 500
fetch.max.wait.ms=500
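For reference, a hedged sketch of the same settings applied through the Java consumer API (the group name, topic and address are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "statistics-group");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);       // cap records returned per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);    // commit offsets automatically
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // start at the end if no committed offset
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("login-events"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            records.forEach(r -> System.out.println(r.value()));
        }
    }
}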

Replica related

#Maximum time the leader waits for a follower to fetch; after this time, the follower is removed from the ISR (in-sync replicas)
replica.lag.time.max.ms=10000

#If a follower lags more than this many messages behind the leader, it is removed from the ISR. It is recommended to increase this value in environments with few brokers or insufficient network bandwidth
replica.lag.max.messages=4000

#socket timeout between follower and leader
replica.socket.timeout.ms=30*1000

#Socket receive buffer size used when replicating from the leader
replica.socket.receive.buffer.bytes=64*1024

#The maximum size of data obtained by replicas each time
replica.fetch.max.bytes =1024*1024

#The maximum waiting time for communication between replicas and leader. If it fails, it will be retried
replica.fetch.wait.max.ms =500

#Minimum fetch size. If the leader's unsynchronized data is smaller than this value, the fetch blocks until the condition is met
replica.fetch.min.bytes =1

#Number of fetcher threads used to replicate from the leader. Increasing this value increases the follower's I/O parallelism
num.replica.fetchers=1

Log related

#segment file size, which will be overwritten by the specified parameters when topic is created
log.segment.bytes =1024*1024*1024

#Segment rolling time: a new segment is forcibly created even if log.segment.bytes has not been reached; overridden by the topic-level parameter
log.roll.hours =24*7

#Log cleanup policies: delete and compact; mainly used to handle expired data
log.cleanup.policy = delete

#If data is stored longer than this time, it is handled according to the policy set by log.cleanup.policy
log.retention.minutes=6000

#Size limit per partition; total size limit of one topic = number of partitions * log.retention.bytes. -1 means no size limit
log.retention.bytes=-1
    
#Interval at which logs are checked against the retention policies
log.retention.check.interval.ms=50000
    
#Whether to enable log cleaning; true by default
log.cleaner.enable=true

#Number of threads for log cleanup
log.cleaner.threads = 2

#Maximum I/O (bytes per second) during log cleanup
log.cleaner.io.max.bytes.per.second=None

#Cache space for log-cleaning deduplication; the larger, the better
log.cleaner.dedupe.buffer.size=500*1024*1024
    
#I/O block size used in log cleaning; generally does not need to be modified
log.cleaner.io.buffer.size=512*1024

#Hash load factor of the dedupe buffer. The higher the value, the more can be cleaned at once, but the more serious hash conflicts become
log.cleaner.io.buffer.load.factor=0.9

#Interval between checks for logs that need cleaning
log.cleaner.backoff.ms =15000

#Controls how frequently logs are cleaned. The higher the value, the more efficient each cleaning pass, but the more space is wasted in the meantime; overridden by the topic-level parameter
log.cleaner.min.cleanable.ratio=0.5

#Maximum retention time of delete markers in compacted logs; overridden by the parameters specified when the topic is created
log.cleaner.delete.retention.ms =100000

#Size limit of the segment log's index file; overridden by the parameters specified when the topic is created
log.index.size.max.bytes=10*1024*1024

#Byte interval between index entries. The smaller the interval, the denser the index and the faster the lookup, but the more memory it consumes
log.index.interval.bytes =4096

#How many messages are written before flushing to disk
log.flush.interval.messages=9223372036854775807

#After how many milliseconds to flush to disk; if not set, log.flush.scheduler.interval.ms is used
log.flush.interval.ms = null

#Interval for checking whether a flush to disk is needed
log.flush.scheduler.interval.ms =3000

#How long a file is retained after it is removed from the index before being deleted; generally does not need to be modified
log.delete.delay.ms =60000

#Interval for checkpointing the last flush point, used for data recovery
log.flush.offset.checkpoint.interval.ms =60000

The log.cleanup.policy parameter controls log cleanup. The default is delete. You can also set log.cleanup.policy to "delete,compact".

Compact here does not mean compression but consolidation by message key: for different values with the same key, only the latest version is kept.

Pay attention to the difference between compact and compression. Compact is more like the mark-and-sweep arrangement of memory reclamation; compression means data compression. Compression in Kafka applies to message content.
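An illustrative sketch of compaction (keys, values and offsets are made up): for each key only the latest value survives, and retained messages keep their original offsets:

Before compaction:                 After compaction:
offset 0: key=user-1, value=A      offset 2: key=user-2, value=C
offset 1: key=user-1, value=B      offset 3: key=user-1, value=D
offset 2: key=user-2, value=C
offset 3: key=user-1, value=D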

Kafka's log deletion can be based on three criteria:

  1. Time based
  2. Size based
  3. Offset based

Time-based deletion is configured via log.retention.hours, log.retention.minutes and log.retention.ms, with priority from high to low:

  1. log.retention.ms
  2. log.retention.minutes
  3. log.retention.hours

By default, only log.retention.hours is configured, with a value of 168, so the default retention time of log segment files is 7 days.

Size-based deletion is controlled by the log.retention.bytes parameter. The default is -1, meaning no size limit.

If either log.retention.bytes or log.retention.minutes is satisfied, deletion takes place. Both are overridden by the parameters specified when the topic is created.

Every time Kafka cleans up the log, it merges segments; after merging, the size does not exceed the log.segment.bytes configuration, 1 GB by default.
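The topic-level overrides mentioned above can also be changed after creation. A sketch using the Java AdminClient's incrementalAlterConfigs (available since Kafka 2.3; the topic name and retention value are placeholders):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "login-events");
            // The topic-level retention.ms overrides the broker-level log.retention.* settings
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("retention.ms", "86400000"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, Collections.singleton(op))).all().get();
        }
    }
}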

Controller related

#Whether to allow controlled shutdown of a broker. If set to true, before the broker shuts down, all leaders on it are transferred to other brokers
controlled.shutdown.enable=false

#Number of retries for a controlled shutdown
controlled.shutdown.max.retries=3

#Interval between controlled shutdown retries
controlled.shutdown.retry.backoff.ms=5000
    
#Socket timeout for communication between the controller and brokers
controller.socket.timeout.ms =30000

#Message queue size for controller-to-broker channels
controller.message.queue.size=10

Setting controlled.shutdown.enable=true is mainly for graceful shutdown:

  1. It speeds up restarts
  2. It makes leader switchover faster, reducing each partition's unavailable time to a few milliseconds
