Quick Start for Kafka (8) - Introduction to Confluent Kafka

1. Introduction to Confluent Kafka

In 2014, Kafka's creators Jay Kreps, Neha Narkhede, and Jun Rao left LinkedIn to found Confluent, a company focused on Kafka-based enterprise stream-processing solutions, and released Confluent Kafka. Confluent Kafka is available in an open-source edition and a paid enterprise edition.

2. Confluent Kafka Characteristics

The open-source edition of Confluent Kafka has the following features:
(1) Confluent Kafka Connectors: supports the Kafka Connect JDBC Connector, Kafka Connect HDFS Connector, Kafka Connect Elasticsearch Connector, and Kafka Connect S3 Connector.
(2) Multi-client support: supports C/C++, Python, Go, .NET, and Java clients.
(3) Confluent Schema Registry
(4) Confluent Kafka REST Proxy
The enterprise edition adds the following features:
(1) Automatic Data Balancing
(2) Multi-Datacenter Replication
(3) Confluent Control Center
(4) JMS Client

3. Confluent Kafka Client

Confluent maintains a Java client for Kafka as well as librdkafka-based clients for C/C++, Python, Go, and .NET.

4. Confluent Kafka Client Feature Support

Feature support (for example transactions, the Admin API, and supported SASL mechanisms) differs between clients and versions; the Confluent documentation maintains a feature matrix for each client.

2. Confluent Kafka Components

1. Schema Registry

When data is sent over a network or stored in a file, it has to be serialized. Common cross-language serialization libraries such as Avro, Thrift, and Protocol Buffers have two obvious drawbacks:
(1) Lack of a contract between the data producer and the data consumer. If an upstream producer changes the data format at will, it is hard to guarantee that downstream consumers interpret the data correctly.
(2) Overhead and redundancy. Every message carries the same field names and type information, so a large amount of redundant data is transmitted.
Whether you use the traditional Avro API with hand-written serializer and deserializer classes or Twitter's Bijection library, the schema is embedded in every Kafka record, roughly doubling its size, and the full schema is needed to read a record. Schema Registry instead lets all records share a single registered schema.

The producer registers the schema it writes with in the Schema Registry and references only the schema ID in each message. The consumer uses that ID to fetch the schema from the Schema Registry and deserialize the message.
An example User schema looks like this:
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}

Confluent Schema Registry provides a RESTful interface with which developers can define standard schemas for events, share them across the organization, and evolve them safely in a backward-compatible, future-proof way.
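
For example, against the Docker deployment described below, the User schema can be registered and fetched with curl (the subject name user-value follows the convention for the value schema of a topic named user; the endpoints are part of the Schema Registry REST API):

# Register the schema; the registry responds with the schema ID:
curl -X POST http://localhost:8081/subjects/user-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"}'
# => {"id":1}

# Fetch the schema by ID, as a consumer would before deserializing:
curl http://localhost:8081/schemas/ids/1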

2. Control Center

Confluent Control Center is a web-based system for managing the entire platform. Its primary function is monitoring the performance of individual producers and consumers in a Kafka cluster; it also makes it easy to manage Kafka Connect, that is, to create, edit, and manage connections to other systems.
Confluent Control Center is only available in the Confluent Kafka Enterprise Edition.

3. KSQL

KSQL is a streaming SQL engine for Apache Kafka. It provides a simple, fully interactive SQL interface for processing data in Kafka and supports a range of powerful stream-processing operations, including aggregations, joins, windowing, and sessionization (capturing all click-stream events that fall within a single visitor's session on a site).
KSQL queries differ from one-off queries against a traditional database: KSQL runs continuous queries over the data streams in Kafka, so the transformation keeps running as new data flows in.
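
A sketch of what this looks like from the ksqldb-cli container of the Docker deployments below (the pageviews topic and its columns are hypothetical and must already exist):

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

ksql> -- Declare a stream over an existing Kafka topic:
ksql> CREATE STREAM pageviews (userid VARCHAR, pageid VARCHAR)
        WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='JSON');
ksql> -- A continuous aggregation, updated as new events arrive:
ksql> CREATE TABLE views_per_user AS
        SELECT userid, COUNT(*) AS views FROM pageviews GROUP BY userid;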

4. Confluent Replicator

Confluent Platform makes it easy to maintain multiple Kafka clusters across multiple data centers. Confluent Replicator manages data replication and topic configuration between data centers and supports the following scenarios:
(1) Active-active geo-distributed deployments: users access the nearest data center, optimizing the architecture for low latency and high performance.
(2) Centralized analytics: data from multiple Kafka clusters is aggregated in one place for organization-wide analysis.
(3) Cloud migration: Kafka carries data migration between on-premises applications and the cloud.
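
Replicator itself runs as a Kafka Connect source connector. A minimal sketch of creating it through the Connect REST API (cluster addresses and the topic whitelist are placeholders; the ByteArray converters copy records unchanged):

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  --data '{
    "name": "replicator",
    "config": {
      "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
      "src.kafka.bootstrap.servers": "src-broker:9092",
      "dest.kafka.bootstrap.servers": "dest-broker:9092",
      "topic.whitelist": "orders,payments",
      "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
      "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter"
    }
  }'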

5. Confluent Auto Data Balancer

As a cluster grows, topics and partitions are added at different rates, and over time adding and removing brokers leads to uneven workloads (data skew) across data-center resources. Confluent Auto Data Balancer monitors the cluster's broker count, partition count, and partition sizes, and moves data to even out the workload across the cluster, while throttling rebalance traffic to minimize the impact on production workloads.
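
Rebalancing is driven by the confluent-rebalancer CLI that ships with the enterprise edition; a sketch, with placeholder addresses and a 10 MB/s throttle:

# Compute and execute a rebalance plan, throttled to limit production impact:
confluent-rebalancer execute --zookeeper zookeeper:2181 \
  --metrics-bootstrap-server broker:9092 --throttle 10000000 --verbose
# Check progress, then remove the throttle once the rebalance is done:
confluent-rebalancer status --zookeeper zookeeper:2181
confluent-rebalancer finish --zookeeper zookeeper:2181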

6. Kafka Connect

Kafka Connect is an open-source component of Kafka and the basic framework for connecting Kafka to external systems such as databases, key-value stores, search indexes, and file systems.
Using the Kafka Connect framework together with existing connectors, messages can be read from a source system into Kafka and written from Kafka out to a destination system, as in the curl sketch after the list below.
On top of Kafka Connect, Confluent provides supported connectors for a variety of common systems:
(1) Kafka Connect ActiveMQ Connector
(2) Kafka FileStream Connectors
(3) Kafka Connect HDFS
(4) Kafka Connect JDBC Connector
(5) Confluent Kafka Replicator
(6) Kafka Connect S3
(7) Kafka Connect Elasticsearch Connector
(8) Kafka Connect IBM MQ Connector
(9) Kafka Connect JMS Connector
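
As an illustration, the FileStream source connector that ships with Kafka can be created through the Connect worker's REST API (the file path and topic name are examples):

# Create a connector that tails a file into a Kafka topic:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  --data '{
    "name": "file-source",
    "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
      "tasks.max": "1",
      "file": "/tmp/input.txt",
      "topic": "file-topic"
    }
  }'

# List connectors and inspect the new connector's status:
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/file-source/status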

7. MQTT Proxy

MQTT Proxy provides a scalable, lightweight interface that allows MQTT clients to send messages directly to Kafka in a native way.
MQTT Proxy is used to integrate Internet of Things (IoT)-based applications with Kafka and can be used in networked automobiles, manufacturing lines, and predictive maintenance applications.
MQTT Proxy uses Transport Layer Security (TLS) encryption and basic authentication, supports the MQTT 3.1.1 protocol, and can publish messages at all three MQTT quality-of-service levels: 1) at most once (fire and forget); 2) at least once (acknowledged delivery); 3) exactly once.

3. Confluent Kafka Docker Deployment

1. Confluent Kafka One-stop Deployment

Confluent provides cp-all-in-one, a one-stop Docker Compose deployment of all Confluent Kafka components. It comes in an enterprise version and a community version; the enterprise version additionally includes the Control Center service.
GitHub project address:
https://github.com/confluentinc/cp-all-in-one

2. Confluent Kafka Enterprise Edition

The docker-compose.yml file:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
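      # broker:29092 serves clients inside the compose network;
      # localhost:9092 is advertised to clients on the host machine.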
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.3.2-5.5.0
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:5.5.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:5.5.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:5.5.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:5.5.0
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.5.0
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

Start the services:
docker-compose -f docker-compose.yml up -d
Stop the services:
docker-compose -f docker-compose.yml down
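
Once the stack is up, Control Center is available at http://localhost:9021. As a quick smoke test you can produce a record through the REST Proxy with curl (the topic name test is just an example; the v2 produce endpoint shown is part of the REST Proxy API):

# Produce one JSON record to topic "test" via the REST Proxy:
curl -X POST http://localhost:8082/topics/test \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records":[{"value":{"id":1,"name":"alice","age":30}}]}'
# The response reports the partition and offset of each record written.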

3. Confluent Kafka Community Edition

The docker-compose.yml file:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: cnfldemos/kafka-connect-datagen:0.3.2-5.5.0
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:5.5.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:5.5.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:5.5.0
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.5.0
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

Start the services:
docker-compose -f docker-compose.yml up -d
Stop the services:
docker-compose -f docker-compose.yml down
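
The community stack has no Control Center, so the REST Proxy is a convenient way to verify that the cluster is healthy (these v2 metadata endpoints are part of the REST Proxy API):

# List broker IDs and topics through the REST Proxy:
curl http://localhost:8082/brokers
curl http://localhost:8082/topics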

4. Confluent Kafka Cloud Service Edition

The docker-compose.yml file (the $-prefixed variables such as $BOOTSTRAP_SERVERS and $SASL_JAAS_CONFIG must be exported in the shell before starting; see the sketch after the file):

version: '2'
services:

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    ports:
      - '8085:8085'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
      SCHEMA_REGISTRY_KAFKASTORE_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: "SASL_SSL"
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
      SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: "PLAIN"

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:5.5.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8089:8089"
    environment:
      KSQL_HOST_NAME: ksqldb-server
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LISTENERS: "http://0.0.0.0:8089"
      KSQL_AUTO_OFFSET_RESET: "earliest"
      KSQL_COMMIT_INTERVAL_MS: 0
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
      KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
      KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
      KSQL_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
      KSQL_SECURITY_PROTOCOL: "SASL_SSL"
      KSQL_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      KSQL_SASL_MECHANISM: "PLAIN"
      KSQL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
      KSQL_KSQL_STREAMS_PRODUCER_RETRIES: 2147483647
      KSQL_KSQL_STREAMS_PRODUCER_CONFLUENT_BATCH_EXPIRE_MS: 9223372036854775807
      KSQL_KSQL_STREAMS_PRODUCER_REQUEST_TIMEOUT_MS: 300000
      KSQL_KSQL_STREAMS_PRODUCER_MAX_BLOCK_MS: 9223372036854775807
      KSQL_KSQL_STREAMS_PRODUCER_DELIVERY_TIMEOUT_MS: 2147483647
      KSQL_KSQL_STREAMS_REPLICATION_FACTOR: 3
      KSQL_KSQL_INTERNAL_TOPIC_REPLICAS: 3
      KSQL_KSQL_SINK_REPLICAS: 3
      # Producer Confluent Monitoring Interceptors for Control Center streams monitoring
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
      KSQL_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: "SASL_SSL"
      KSQL_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      # Consumer Confluent Monitoring Interceptors for Control Center streams monitoring
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
      KSQL_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: "SASL_SSL"
      KSQL_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:5.5.0
    container_name: ksqldb-cli
    entrypoint: /bin/sh
    tty: true

  control-center:
    image: confluentinc/cp-enterprise-control-center:5.5.0
    hostname: control-center
    container_name: control-center
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8089"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8089"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
      CONTROL_CENTER_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
      CONTROL_CENTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
      CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"
      CONTROL_CENTER_STREAMS_SECURITY_PROTOCOL: SASL_SSL
      CONTROL_CENTER_STREAMS_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONTROL_CENTER_STREAMS_SASL_MECHANISM: PLAIN
      CONTROL_CENTER_REPLICATION_FACTOR: 3
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 3
      CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 3
      CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 3
      CONTROL_CENTER_METRICS_TOPIC_REPLICATION: 3
      CONFLUENT_METRICS_TOPIC_REPLICATION: 3
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 3
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      # Workaround for MMA-3564
      CONTROL_CENTER_METRICS_TOPIC_MAX_MESSAGE_BYTES: 8388608
      PORT: 9021

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.3.2-5.5.0
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    volumes:
      - mi2:/usr/share/java/monitoring-interceptors/
    environment:
      CONNECT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect"
      CONNECT_CONFIG_STORAGE_TOPIC: demo-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: demo-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: demo-connect-status
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.0.jar
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.5.0
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
      KAFKA_REST_CLIENT_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
      KAFKA_REST_CLIENT_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
      KAFKA_REST_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
      KAFKA_REST_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      KAFKA_REST_SECURITY_PROTOCOL: "SASL_SSL"
      KAFKA_REST_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      KAFKA_REST_SASL_MECHANISM: "PLAIN"
      KAFKA_REST_CLIENT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
      KAFKA_REST_CLIENT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      KAFKA_REST_CLIENT_SECURITY_PROTOCOL: "SASL_SSL"
      KAFKA_REST_CLIENT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      KAFKA_REST_CLIENT_SASL_MECHANISM: "PLAIN"

volumes:
    mi2: {}
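
Unlike the two stacks above, this file attaches to an external Confluent Cloud cluster, so the $-prefixed variables must be exported before starting the services. A minimal sketch, with placeholder values:

export BOOTSTRAP_SERVERS="pkc-xxxxx.us-west-2.aws.confluent.cloud:9092"
export SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";'
export SCHEMA_REGISTRY_URL="https://psrc-xxxxx.us-west-2.aws.confluent.cloud"
export BASIC_AUTH_CREDENTIALS_SOURCE="USER_INFO"
export SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="<sr-api-key>:<sr-api-secret>"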

Start the services:
docker-compose -f docker-compose.yml up -d
Stop the services:
docker-compose -f docker-compose.yml down
