Introduction to Kafka Streams, the Simplest Stream Processing Engine

Before version 0.10.0.0, Kafka was positioned as a distributed, partitioned, replicated commit log service; it did not provide any data processing capability. Stream computing mainly relied on stream processing frameworks such as Storm, Spark Streaming and Flink.

Storm, Spark Streaming and Flink have their own advantages.

Storm offers low latency and still holds a place in the market; many companies continue to use it.

Spark Streaming benefits from the broader Spark ecosystem and an active community, and also holds a significant share.

Flink is designed from the ground up for stream processing and offers a convenient API; it is likely to keep growing.

But none of them can do without Kafka for message transport, so Kafka launched its own stream processing framework, Kafka Streams, in version 0.10.0.0. Kafka's official positioning changed accordingly: Apache Kafka *is* a distributed streaming platform.

Real-Time Stream Computing

Real-time stream computing has developed rapidly in recent years, mainly because of the value of real-time data and its impact on data processing architecture. It is characterized by unbounded data, near-real-time results that are consistent and repeatable, and so on; a streaming engine can be defined as a type of data processing engine that is designed with infinite data sets in mind. A few related concepts:

1. Unbounded (infinite) data: a constantly growing, essentially infinite data set, often referred to as "streaming data". Such infinite data sets are called unbounded data, while finite batch data is bounded data.

2. Unbounded data processing: a continuous processing mode applied to the unbounded data described above. Batch processing (offline computing) can also be run repeatedly over such data, but it runs into performance bottlenecks.

3. Low-latency, near-real-time results: unlike offline computing, which does not concern itself with latency, stream processing aims to produce results shortly after the data arrives.

Stream processing can replace batch processing systems once two problems are solved:

1. Correctness: with correctness, streaming becomes equivalent to batch computing.

Streaming must still be able to compute correct results over a given time window as data keeps arriving; Spark Streaming solves this with the idea of micro-batches. Consistent storage between real-time and offline systems is something future real-time computing systems should also satisfy.

2. Tools for reasoning about time: these let streaming go beyond batch computing.

Good tools for reasoning about time are essential when dealing with unbounded, out-of-order data whose events arrive at different times.

Time is divided into event time (when an event actually occurred) and processing time (when the system observes it).
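
To make this distinction concrete, here is a small sketch in Java. It is not part of the original discussion, and the class names are illustrative only: Kafka Streams picks between the two notions of time through its configured TimestampExtractor.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TimestampExtractor;
 
import java.util.Properties;
 
public class TimeSemanticsExample {
 
    // Event time: use the timestamp embedded in each record (set by the producer
    // or the broker); fall back to the previously observed timestamp if the
    // record carries an invalid (negative) value.
    public static class EventTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
            final long ts = record.timestamp();
            return ts >= 0 ? ts : previousTimestamp;
        }
    }
 
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Event-time semantics: timestamps come from the records themselves.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimeExtractor.class);
        // Processing-time semantics instead: use the wall-clock time at which a record
        // happens to be processed, via the built-in
        // org.apache.kafka.streams.processor.WallclockTimestampExtractor.
        // props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        //           WallclockTimestampExtractor.class);
    }
}

The default extractor already reads the timestamp embedded in each record, so the custom class above only illustrates where the choice is made.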

There are many more concepts related to real-time stream computing, which are not discussed here.

Introduction to Kafka Streams

Kafka Streams is considered the simplest way to develop real-time applications. It is Kafka's client library for stream processing: you can process streams by writing simple Java or Scala code.

Advantages:

  • Elastic, highly scalable, fault-tolerant

  • Deploys to containers, VMs, bare metal, cloud

  • Equally viable for small, medium, and large use cases

  • Fully integrated with Kafka security

  • Write standard Java and Scala applications

  • Develop on Mac, Linux, and Windows

  • Exactly-once processing semantics (see the configuration sketch after this list)
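
As an illustration of the last item, here is a minimal configuration sketch (not from the original article; the broker address and application id are placeholders) showing how exactly-once processing is switched on; everything else in an application stays the same.

import org.apache.kafka.streams.StreamsConfig;
 
import java.util.Properties;
 
public class ExactlyOnceConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        // Placeholder broker address; brokers must be version 0.11.0 or newer.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Switch from the default "at_least_once" guarantee to exactly-once processing.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    }
}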

Use cases:

The New York Times uses Apache Kafka and Kafka Streams to store and distribute published content in real time to the various applications and systems that make it available to readers.

Pinterest uses Apache Kafka and Kafka Streams at large scale to power the real-time predictive budgeting system of its advertising infrastructure. With Kafka Streams, spend predictions are more accurate than before.

Zalando, a leading online fashion retailer in Europe, uses Kafka as an ESB (Enterprise Service Bus) to support its transition from a monolithic to a microservices architecture. Processing event streams with Kafka enables its technical teams to achieve near-real-time business intelligence.

Rabobank, a Dutch cooperative bank, is one of the three largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka and is used by a growing number of financial processes and services, one of which is Rabo Alerts. This service alerts customers in real time when financial events occur and is built with Kafka Streams.

LINE uses Apache Kafka as the central data hub through which its services communicate with one another. Hundreds of millions of messages are produced every day and used to execute various business logic, threat detection, search indexing and data analysis. LINE uses Kafka Streams to reliably transform and filter topics, enabling consumers to consume sub-topics efficiently while keeping the code base simple and maintainable.

Topology

Kafka Streams defines its computational logic through one or more topologies, where a topology is a graph composed of streams (edges) and stream processors (nodes).

There are two special processors in a topology:

  • Source Processor: a special type of stream processor that has no upstream processors. It produces an input stream for its topology from one or more Kafka topics by consuming records from those topics and forwarding them to its downstream processors.
  • Sink Processor: a special type of stream processor that has no downstream processors. It sends any records it receives from its upstream processors to a specified Kafka topic.

Note that in ordinary processor nodes data can also be sent to remote systems, so processed results can either be streamed back into Kafka or written to an external system.

Kafka Streams provides the most commonly used data transformation operations out of the box, such as map, filter, join and aggregations, and they are easy to use; a small sketch follows below.
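
As a small illustration of these operations (a hypothetical sketch, not part of the original quick start; the topic names orders and large-orders and the threshold are made up), the following topology chains a source processor, a filter, a mapValues and a sink processor:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
 
import java.util.Properties;
 
public class FilterMapExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-map-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        // Source processor: read the hypothetical "orders" topic (value = amount as text).
        final KStream<String, String> orders = builder.stream("orders");
        orders
            // filter: keep only orders with an amount above 100
            .filter((orderId, amount) -> Double.parseDouble(amount) > 100.0)
            // mapValues: annotate the surviving values
            .mapValues(amount -> "large:" + amount)
            // Sink processor: write the result to the hypothetical "large-orders" topic.
            .to("large-orders", Produced.with(Serdes.String(), Serdes.String()));
 
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}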

Of course, there are also topics such as time, windows, aggregation and out-of-order handling, which will be introduced in detail one by one in later articles. Next, let's work through a simple introductory example.

Quick Start

First, here are the Java and Scala versions of WordCount.

Java 8+:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
 
import java.util.Arrays;
import java.util.Properties;
 
public class WordCountApplication {
 
    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        StreamsBuilder builder = new StreamsBuilder();
        // Source processor: read lines of text from the input topic
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        // Split each line into words, group by word, and count occurrences
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        // Sink processor: write the word counts (as a changelog stream) to the output topic
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
 
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
 
}

Scala:

import java.util.Properties
import java.time.Duration
 
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
object WordCountApplication extends App {
  import Serdes._
 
  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }
 
  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")
 
  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()
 
  sys.ShutdownHookThread {
     streams.close(Duration.ofSeconds(10))
  }
}

If Kafka is already running, you can skip the first two steps.

1. Download

Download version 2.3.0 and extract it. Note that there are builds for several Scala versions; we choose the recommended one (2.12):

> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0

2. Start the servers

Kafka uses ZooKeeper, so if you don't already have a ZooKeeper server, you need to start one first.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

Start the Kafka server:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3. Create the topics

We create an input topic named streams-plaintext-input and an output topic named streams-wordcount-output:

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".


> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

We can describe the topics to verify:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
 
Topic:streams-plaintext-input   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: streams-plaintext-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0

4. Start WordCount

The following command starts the WordCount demo application:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

The demo application reads from the input topic streams-plaintext-input, runs the WordCount algorithm on each message it reads, and continuously writes its current results to the output topic streams-wordcount-output. Therefore, there will be no output on STDOUT other than log entries, because the results are written back to Kafka.

Now we can start the console producer in a separate terminal and write some input data to this topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

The output of the WordCount demo application can be inspected by reading its output topic with the console consumer in a separate terminal:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

5. Processing data

We enter some data on the producer side:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka

Output:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1

Continue entering data:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2

We can see that as data arrives, the word counts are updated and emitted in real time. Because the output is the changelog stream of a KTable, the updated counts for kafka and streams appear as new records rather than overwriting the earlier ones.

6. Shut down

You can now stop the console consumer, the console producer, the WordCount application, the Kafka broker, and the ZooKeeper server, in that order, with Ctrl-C.

For more posts on real-time computing, Flink, Kafka and related technologies, follow the Real-time Streaming Computing blog.
