Kafka Official Document Notes


Kafka can now be used not only as a message queue but also as a platform for stream processing.
Its main functions are as follows:

  1. Publish and subscribe: streams of records serve as a publish-subscribe messaging system.
  2. Processing: streams of data can be processed efficiently and in real time as events occur.
  3. Storage: streams of data are stored safely in a distributed, replicated system.

Kafka is used to build real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant, and fast.

That covers the home page of the official site. Here's an overview of Kafka itself.

Overview of Kafka

Kafka is somewhat similar to MQTT or AMQ, which we've covered before; its main role is message middleware.
Message middleware, in plain terms: the shop owner makes meat buns for me to eat. I can eat one per second, but the owner makes five per second, so the four I can't finish each second go into a bowl. The bowl is the middleware; one bowl is the middleware, and a second bowl is the middleware scaled out.


Kafka's architecture has the following main roles:

  1. producer: the producer, i.e. the one making the meat buns
  2. consumer: the consumer, i.e. me eating the meat buns
  3. broker: Kafka's own container, i.e. the bowl holding the meat buns
  4. topic: after you order, the owner gives you a ticket; that ticket is the label under which your meat buns are grouped

Kafka is usually used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably move data between systems or applications
  • Building real-time streaming applications that transform or react to streams of data

In addition, Kafka has three characteristics:

  • Kafka runs as a cluster on one or more servers that can span multiple data centers.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

Now for the introductory examples.

Goal 1: Deploy and use a single node, single Broker

Start Kafka first. You can also start it in the background (for example with the -daemon flag, as sketched below); it's up to you.

[centos01@linux01 config]$ kafka-server-start.sh \
$KAFKA_HOME/config/server.properties
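
To run it in the background instead, the same start script accepts a -daemon flag; a minimal sketch:

[centos01@linux01 config]$ kafka-server-start.sh -daemon \
$KAFKA_HOME/config/server.properties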

Second, create a topic:

[centos01@linux01 config]$ kafka-topics.sh \
--create \
--zookeeper linux01:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test

You can also list the existing topics:

[centos01@linux01 config]$ kafka-topics.sh \
--list \
--zookeeper linux01:2181

Then send a message to the topic

[centos01@linux01 config]$ kafka-console-producer.sh \
--broker-list linux01:9092 \
--topic test

Whatever you type here is sent straight to the broker.
You can use a consumer to watch the messages sent by the producer (--from-beginning means consume from the start of the topic; without it, only messages sent after the consumer starts are shown):

[centos01@linux01 config]$ kafka-console-consumer.sh \
--zookeeper linux01:2181 \
--topic test \
--from-beginning

You can also view the details of all topics:

[centos01@linux01 config]$ kafka-topics.sh \
--describe \
--zookeeper linux01:2181

Or the details of a single topic:

[centos01@linux01 config]$ kafka-topics.sh \
--describe \
--zookeeper linux01:2181 \
--topic test

Goal 2: Deploy and use single-node multi-Broker

Following the example on the official site, we create three brokers on a single node. First, copy the server.properties file.
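
A sketch of the copy step (assuming we are in $KAFKA_HOME/config):

[centos01@linux01 config]$ cp server.properties server1.properties
[centos01@linux01 config]$ cp server.properties server2.properties
[centos01@linux01 config]$ cp server.properties server3.properties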

Then change the listener port, broker.id, and log directory in each copy. The key parts of server1.properties look like this:

broker.id=1

listeners=PLAINTEXT://:9093

log.dirs=/tmp/kafka-logs-1
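
The other two copies follow the same pattern; for example (the ports and paths below are assumptions, match them to your own setup):

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2

broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/tmp/kafka-logs-3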

After all three files are configured, start the three brokers in the background:

[centos01@linux01 config]$ kafka-server-start.sh server1.properties &
[centos01@linux01 config]$ kafka-server-start.sh server2.properties &
[centos01@linux01 config]$ kafka-server-start.sh server3.properties &

Then, the usual routine to create a topic, but this time we specify one partition with three replicas:

[centos01@linux01 ~]$ kafka-topics.sh \
--create   \
--zookeeper linux01:2181  \
--replication-factor 3  \
--partitions 1  \
--topic test2 
Created topic "test2".

Creation succeeded.
Then take a look at the details of the topic we just created.
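
The notes don't show the exact command here; reusing the --describe form from Goal 1, it would look like:

[centos01@linux01 ~]$ kafka-topics.sh \
--describe \
--zookeeper linux01:2181 \
--topic test2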

Then, as before, create our message producer

[centos01@linux01 ~]$ kafka-console-producer.sh \
--broker-list linux01:9093,linux01:9094,linux01:9095 \
--topic test2

Consumer

kafka-console-consumer.sh --zookeeper linux01:2181 --topic test2

The test method is the same as above.
If you're interested, take a look at the log directories that were created: the .index files are offset indexes, and the .log files hold the actual message data.
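
A quick way to peek at them (assuming the log.dirs configured above and partition 0 of test2; segment files are named after their starting offset, e.g. 00000000000000000000.log and 00000000000000000000.index):

[centos01@linux01 ~]$ ls /tmp/kafka-logs-1/test2-0/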

Goal 3: Kafka API programming - Producer end development

Next, we use IDEA to program against the distributed message queue Kafka; the focus is writing a producer and a consumer.

  1. Add the Kafka dependency yourself:
<dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>0.11.0.2</version>
</dependency>
  2. Configure the Kafka-related ZooKeeper address, topic, and broker-list (sketched below)
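
The notes don't show this configuration class, but the code below references it as KafKaProperties. A minimal sketch consistent with that usage (all values are assumptions based on the shell examples above):

public class KafKaProperties {

    // Assumed values; adjust to your environment
    public static final String ZK = "linux01:2181";            // zookeeper.connect (used in Goal 4)
    public static final String TOPIC = "test";                 // topic to produce to / consume from
    public static final String BROKER_LIST = "linux01:9092";   // metadata.broker.list (used in Goal 3)
    public static final String GROUP_ID = "test_group";        // group.id (used in Goal 4)
}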


  3. Write a producer class with a loop that keeps sending messages to the specified topic:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * A Kafka producer (old Scala producer API); it needs the topic and the broker-list
 */
public class KafkaProducer extends Thread{

    private String topic;

    private Producer<Integer, String> producer;

    public KafkaProducer(String topic) {
        this.topic = topic;

        Properties properties = new Properties();
        // request.required.acks: how the broker acknowledges the client's messages
        // 0: don't wait for any acknowledgement
        // 1: the leader writes the request to its log and responds immediately; the usual choice
        // -1: the safest option, wait until all replicas are in sync
        properties.put("request.required.acks", "1");
        //Setting serialization
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        //Establishing broker-list
        properties.put("metadata.broker.list", KafKaProperties.BROKER_LIST);


        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }
    //Run as a thread so the producer keeps sending in the background
    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String message = "message_" + messageNo;//Get message content
            producer.send(new KeyedMessage<Integer, String>(topic, message));
            System.out.println("Sent" + message);
            messageNo++;

            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }

}
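
To actually run this producer, a hypothetical entry point (not in the original notes; it assumes the KafKaProperties sketch above) could look like this:

public class ProducerApp {

    public static void main(String[] args) {
        // Start the producer thread; it sends message_1, message_2, ... every two seconds
        new KafkaProducer(KafKaProperties.TOPIC).start();
    }
}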
  4. Start a console consumer in the background to collect the producer's messages:
[centos01@linux01 ~]$ kafka-console-consumer.sh \
--zookeeper linux01:2181 \
--topic test

Goal 4: Kafka API programming - Consumer development

  1. As before, set up the configuration class first, adding a group.id to the earlier one (GROUP_ID in the KafKaProperties sketch above)

  2. Then we write our Consumer class.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Consuming mainly goes through ZooKeeper;
 * producing mainly relies on the broker-list
 */
public class KafkaComsumer extends Thread{

    private String topic;

    public KafkaComsumer(String topic) {
        this.topic = topic;
    }

    private ConsumerConnector createConnector() {

        Properties properties = new Properties();
        properties.put("zookeeper.connect", KafKaProperties.ZK);
        properties.put("group.id", KafKaProperties.GROUP_ID);

        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    @Override
    public void run() {
        ConsumerConnector consumerConnector = createConnector();

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        //  key:   the topic name
        //  value: the list of KafkaStream<byte[], byte[]> data streams for that topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> messageAndMetadata = messageStreams.get(topic).get(0);//Take the single stream we requested for this topic

        ConsumerIterator<byte[], byte[]> iterator = messageAndMetadata.iterator();//Iterate over the incoming messages

        while (iterator.hasNext()) {
            String s = new String(iterator.next().message());
            System.out.println("Consumer"+s);
        }
    }
}
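
A hypothetical driver that runs the producer and the consumer together (again assuming the KafKaProperties sketch from Goal 3):

public class KafkaClientApp {

    public static void main(String[] args) {
        // Start the producer and the consumer as separate threads on the same topic
        new KafkaProducer(KafKaProperties.TOPIC).start();
        new KafkaComsumer(KafKaProperties.TOPIC).start();
    }
}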

  3. Check from the background that the messages are received as they are sent.

Goal 5: Kafka API Programming - Integrating Flume to Achieve Real-time Data Acquisition

  1. Overall architecture

Flume's Kafka sink acts directly as the Kafka producer, and a Kafka console consumer consumes the data on the other end. The chain is exec-memory-avro (exec source -> memory channel -> avro sink) feeding avro-memory-kafka (avro source -> memory channel -> Kafka sink), with the consumer reading from Kafka.
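
The exec-memory-avro.conf agent comes from the earlier Flume notes and isn't reproduced here; a minimal sketch of what it might look like (the monitored file path is an assumption):

# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# exec source: tail a local file (path is an assumption)
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/centos01/data/data.log

# avro sink: forward to the avro source defined in avro-memory-kafka.conf
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = linux01
exec-memory-avro.sinks.avro-sink.port = 44444

# memory channel
exec-memory-avro.channels.memory-channel.type = memory

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel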

  2. Configure a new avro-memory-kafka.conf:
# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = linux01
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.topic = test
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = linux01:9092
avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks=1
avro-memory-kafka.sinks.kafka-sink.flumeBatchSize = 1

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
  3. Start the avro-memory-kafka agent:
[centos01@linux01 conf]$ flume-ng agent \
--name avro-memory-kafka \
--conf ./  \
--conf-file avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
  4. Restart the exec-memory-avro agent:
flume-ng agent \
--name exec-memory-avro \
--conf ./ \
--conf-file exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
  5. Start a console consumer in the background:
[centos01@linux01 ~]$ kafka-console-consumer.sh \
--zookeeper linux01:2181 \
--topic test
  6. Write data to the file monitored by the Flume exec source to push data through the pipeline.
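
For example (the file path is an assumption; use whatever file your exec source tails):

[centos01@linux01 ~]$ echo "hello kafka via flume" >> /home/centos01/data/data.log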

Note: in avro-memory-kafka.conf we set flumeBatchSize, which controls how many events the Kafka sink batches up before sending; until a batch fills, nothing shows up on the console consumer. I set it to 1 here, so messages appear immediately. Lowering this value is recommended when testing.

Then the data is printed out by the consumer.
