A Brief Introduction to Alternative Flume-Kafka Connect

We know that in the past, Kafka was defined as a distributed, partitioned, log submission service with a backup mechanism. It's a distributed message queue, which is the most common use of it. But Kafka goes beyond that and opens up the latest official website.

We see that the latest definition of Kafka is Apache Kafka < is a distributed streaming platform.

Distributed stream processing platform.

The characteristics of Kafka are also clearly described here: Kafka is used to build real-time data pipelines and streaming applications. It is scalable, fault-tolerant and fast, and has been put into production in thousands of companies.

So now Kafka is not only a distributed message queue, but also a stream processing platform. This is due to the introduction of two new components, Kafka Connect and Kafka Streaming, in and

Introduction to Kafka Connect

We know that message queues must have upstream and downstream systems to move messages in and out. For example, the classical log analysis system reads and writes the log to kafka through flume, and stores downstream process the real-time data.

The function of Kafka Connect is to replace Flume so that the data transmission can be done by Kafka Connect. Kafka Connect is a tool for reliable and reliable data transmission between Apache Kafka and other systems. It can quickly move large data sets into and out of Kafka.

The import job of Kafka Connect can transfer data from database or application server to Kafka. The export job can transfer data from Kafka to query system or batch processing system for offline analysis.

Kafka Connect features include:

  • A general framework for Kafka connections - Kafka Connect standardizes the integration of other data systems with Kafka, simplifies connector development, deployment and management
  • Distributed and Independent Patterns - Support large distributed management services and deployment of small production environments
  • REST Interface - submitting and managing Kafka Connect through an easy-to-use REST API
  • Automatic offset management - Kafka Connect can automatically manage the offset submission process by simply getting some information from the connector, so connector developers need not worry about offset submission in connector development.
  • By default, distributed and extensible - Kafka Connect is built on existing group management protocols. Extended clusters can be added
  • Streaming Media/Batch Processing Integration - Using Kafka's existing functions, Kafka Connect is an ideal solution for bridging streaming media and batch data systems.

Run Kafka Connect

Kafka Connect currently supports two modes of operation: independent and clustered.

Independent model

In stand-alone mode, there is only one process, which is easier to set up and use. But there is no fault-tolerant function.


> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

Independent mode configuration

The first parameter, config/connect-standalone.properties, is some basic configuration:

These need to be set up in both stand-alone and cluster modes:

#Bootstrap. servers Kafka cluster list
#Serialization converters for key. converter key such as json's key.converter=org.apache.kafka.connect.json.JsonConverter
#Serialization Converter of Value. Converter Value

#Configuration specific to independent mode:
#offset.storage.file.filename for storing offset files
offset.storage.file.filename =/home/kafka/connect.offsets

Independent Mode Connector Configuration (Profile)

The latter parameter connector 1. properties [connector 2. properties...] can be multiple and is the connector configuration content.

Here we configure a configuration that reads data from a file and stores it in kafka:


  • Name - The unique name of the connector. Attempts to register again with the same name will fail.
  • connector.class - The Java class of a connector. The full name or alias of the class of this connector. Here we choose FileStream Sink
  • tasks.max - The maximum number of tasks that should be created for this connector. If the connector fails to achieve this level of parallelism, fewer tasks may be created.
  • key.converter - (optionally) overrides the default key converter set by the worker.
  • value.converter - (optionally) overrides the default value converter set by the worker.

The following two must be set up:

  • Topics - A comma-separated list of topics used as input to this connector
  • topics.regex - Java regular expressions for topics used as input for this connector

Converters can be configured in connectors

The parameters need to be specified:

  • Transfs - A list of aliases for a transformation, specifying the order in which the transformation will be applied.
  • Transfs. $alias. type - Fully qualified class name for the transformation.
  • Configuration properties of transforms.$alias.$transformationSpecificConfig transformation

For example, we add fields to the contents of the file converter we just did.

First, set connect-standalone.properties.

key.converter.schemas.enable = false
value.converter.schemas.enable = false

Set connect-file-source.properties

transforms=MakeMap, InsertSource

There is no result before conversion:

"hello world"

After conversion:

{"line":"hello world","data_source":"test-file-source"}

Common conversion types:

  • InsertField - Adding fields using static data or record metadata
  • ReplaceField - Filter or rename fields
  • MaskField - Replace fields with valid null values of types (0, null strings, etc.)
  • ValueToKey Value to Key Value
  • HoistField - Wrap the entire event as a single field in Struct ures or Map s
  • ExtractField - Extracts a specific field from Struct and Map and contains only that field in the result
  • SetSchemaMetadata - Modify schema name or version
  • TimestampRouter - Modify record topics based on the original theme and timestamp
  • RegexRouter - Modify record topics by replacing strings and regular expressions based on the original theme

Cluster model

In cluster mode, it can be extended and fault-tolerant.


> bin/connect-distributed.sh config/connect-distributed.properties

In cluster mode, Kafka Connect stores offset, configuration, and task status in Kafka topics.

Cluster Mode Configuration


#Basic configuration is also required.

#There are other configurations to note
#group.id (default connect-cluster) - Note that the group id of Connect should not conflict with the user's group id

#Theme for storing offsets; this topic should have many partitions

#Themes for storing connectors and task configurations can only be partitioned in one partition

#Topics for storing state; this topic can have multiple partitions

In cluster mode, configuration does not pass in at the command line, but requires the REST API to create, modify, and destroy connectors.

Cluster Mode Connector Configuration (REST API)

REST API server can be configured to support http and https


By default, if listeners are not specified, the REST server runs on port 8083 using the HTTP protocol.

The following is the REST API currently supported:

  • GET/connectors - Returns the list of active connectors
  • POST/connectors - Create a new connector; The request body should be a JSON object containing the string name field and an object field containing config connector configuration parameters
  • GET/connectors/{name} - Get information about specific connectors
  • GET/connectors/{name}/config - Gets configuration parameters for specific connectors
  • PUT/connectors/{name}/config - Update configuration parameters for specific connectors
  • GET/connectors/{name}/status - Gets the current status of the connector, including whether it is running, failing, pausing, etc., which staff member is assigned, error messages (if failing), and the status of all tasks.
  • GET/connectors/{name}/tasks - Gets the list of tasks currently running for connectors
  • GET/connectors/{name}/tasks/{taskid}/status - Gets the current status of the task, including whether it is running, failing, pausing, etc., which staff member is assigned, and whether the error message fails.
  • PUT/connectors/{name}/pause - Suspend the connector and its tasks, which will stop message processing until the connector is restored
  • PUT/connectors/{name}/resume - Restore the suspended connector (if the connector is not suspended, no action is performed)
  • POST/connectors/{name}/restart - Restart the connector (usually because it has failed)
  • POST/connectors/{name}/tasks/{taskId}/restart - Restart individual tasks (usually due to failure)
  • DELETE/connectors/{name} - Delete connectors, pause all tasks and delete their configuration

Connector Development Guide

kakfa allows developers to develop a connector themselves.

Core concepts

To replicate data between Kafka and other systems, users need to create a Connector

Connector has two forms:

Source Connectors import data from another system, for example, JDBC Source Connector imports relational databases into Kafka

Sink Connectors export data, for example, HDFS Sink Connector exports the contents of Kafka topics to HDFS files

And the corresponding Task:

SourceTask and InkTask

Task forms input and output streams, and attention should be paid to offset when developing Task.

Each stream should be a series of key value records. The offset of processed data also needs to be submitted regularly so that in case of failure, the processing can be recovered from the offset submitted last time. Connector also needs to be dynamic, and the implementation is also responsible for monitoring whether any changes exist in the external system.

Develop a simple connector

Developing connectors requires only two interfaces, Connector and Task.

Here we simply develop a FileStream Connector.

This connector is used in stand-alone mode. SourceConnector / SourceTask reads every line of a file and SinkConnector / SinkTask writes every record to a file.

Connector example:

Inherit SourceConnector, add fields (the name of the file to read and the subject to send data)

public class FileStreamSourceConnector extends SourceConnector {
    private String filename;
    private String topic;

Define the class that actually reads the data

public Class<? extends Task> taskClass() {
    return FileStreamSourceTask.class;

Define this class under FileStream SourceTask. Next, we add some standard lifecycle approaches, start() and stop().

public void start(Map<String, String> props) {
    // The complete version includes error handling as well.
    filename = props.get(FILE_CONFIG);
    topic = props.get(TOPIC_CONFIG);
public void stop() {
    // Nothing to do since no background monitoring is required.

Finally, the real core of implementation is taskConfigs()

public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<>();
    // Only one input stream makes sense.
    Map<String, String> config = new HashMap<>();
    if (filename != null)
        config.put(FILE_CONFIG, filename);
    config.put(TOPIC_CONFIG, topic);
    return configs;

Task examples:

Source Tasks

Implement SourceTask to create FileStream SourceTask to inherit SourceTask

public class FileStreamSourceTask extends SourceTask {
    String filename;
    InputStream stream;
    String topic;
    public void start(Map<String, String> props) {
        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
        stream = openOrThrowError(filename);
        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
    public synchronized void stop() {

Next, we implement the main function of the task, that is, poll() retrieves events from the input system and returns the method List of the following contents:

public List<SourceRecord> poll() throws InterruptedException {
    try {
        ArrayList<SourceRecord> records = new ArrayList<>();
        while (streamValid(stream) && records.isEmpty()) {
            LineAndOffset line = readToNextLine(stream);
            if (line != null) {
                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
            } else {
        return records;
    } catch (IOException e) {
        // Underlying stream was killed, probably as a result of calling stop. Allow to return
        // null, and driving thread will handle any shutdown if necessary.
    return null;

Receiving Tasks

Unlike SourceConnector and InkConnector, SourceTask and SinkTask have very different interfaces because SourceTask uses pull interfaces and SinkTask uses push interfaces. Both share a common life cycle approach, but SinkTask is completely different:

public abstract class SinkTask implements Task {
    public void initialize(SinkTaskContext context) {
        this.context = context;
    public abstract void put(Collection<SinkRecord> records);
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {

This is a simple example, they have simple structured data - each row is just a string. Almost all practical connectors require patterns with more complex data formats. To create more complex data, you need to use the Kafka Connect data API.

Schema schema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75);

More Kafka related articles:

What is Kafka?Summary of Kafka monitoring toolsKafka Quick StartConsumer, the Core of KafkaProducer of Kafka Core

More Real-time Computing, Flink,Kafka and other related technical blogs, welcome to pay attention to real-time streaming computing

Tags: kafka Apache REST JSON

Posted on Thu, 29 Aug 2019 20:35:50 -0700 by koenigsbote