Maxwell collects binlogs and sends data to a Kafka cluster via nginx across different network environments

Maxwell collects binlog changes from MySQL in real time, so it can be used as a data synchronization tool.

However, when an application is deployed in a different network environment, changes in its MySQL database cannot be sent directly to the data center via Maxwell for parsing and synchronization. In that case, nginx can act as a proxy server: it receives the JSON data sent by Maxwell and forwards it to the Kafka cluster on the back end.

The architecture is as follows:

1. Multiple application platforms are distributed in different regions; the remote MySQL databases can access the Internet.
2. In the local data center, an nginx service proxies the Kafka cluster.
3. The nginx server is mapped to a public IP and port, and is accessed through that public IP.

However, Maxwell does not support sending to an HTTP service; it only supports Kafka, Redis, and a few other producers.

After consulting the Maxwell documentation, we found that it supports a custom producer, so a custom producer is used here to make Maxwell send its JSON to nginx via HTTP POST.


1. Code Development

1. Use IDEA to create a Maven project and add the POM dependencies, mainly the HTTP-related ones:

<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
</dependency>

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpasyncclient</artifactId>
    <version>4.1.2</version>
</dependency>

2. Manually add the maxwell-1.22.3.jar file to the project as a dependency.

3. Create the HttpUtil class that sends the POST request:

package com.test.utils;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

public class HttpUtil {

    public void doPost(String url, String json) {

        HttpPost post = new HttpPost(url);
        // Sending JSON data requires the application/json content type (UTF-8 encoded).
        post.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));

        // try-with-resources closes the client and the response so connections are not leaked.
        try (CloseableHttpClient httpclient = HttpClientBuilder.create().build();
             CloseableHttpResponse res = httpclient.execute(post)) {
            // Consume the response body so the underlying connection is released.
            EntityUtils.consume(res.getEntity());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
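To sanity-check this class outside of Maxwell, a minimal throwaway main can POST a hand-written JSON document to the nginx endpoint. This is only an illustration: the URL and sample payload below are placeholders, not values from this setup.

package com.test.utils;

// Simple manual test for HttpUtil: posts one fake row event to an nginx endpoint.
public class HttpUtilDemo {
    public static void main(String[] args) {
        HttpUtil httpUtil = new HttpUtil();
        // Placeholder URL; in this architecture it would be the public IP+port mapped to nginx.
        String url = "http://127.0.0.1:19090/";
        // Hand-written JSON standing in for a Maxwell row event.
        String json = "{\"database\":\"test\",\"table\":\"t1\",\"type\":\"insert\",\"data\":{\"id\":1}}";
        httpUtil.doPost(url, json);
    }
}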

4. Create the CustomProducer class, which extends AbstractProducer:

package com.test.producerfactory;

import com.test.utils.HttpUtil;
import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.producer.EncryptionMode;
import com.zendesk.maxwell.producer.MaxwellOutputConfig;
import com.zendesk.maxwell.row.RowMap;

import java.util.ArrayList;
import java.util.Collection;

public class CustomProducer extends AbstractProducer {
    private final String headerFormat;
    private final Collection<RowMap> txRows = new ArrayList<>();
    private final HttpUtil httpUtil = new HttpUtil();
    private static MaxwellOutputConfig config = new MaxwellOutputConfig();
    private String url = "";
    private String server_id = "0";
    private String encrypt = null;
    private String secretKey = null;

    public CustomProducer(MaxwellContext context) {
        super(context);
        // this property would be 'custom_producer.header_format' in config.properties
        headerFormat = context.getConfig().customProducerProperties.getProperty("header_format", "Transaction: %xid% >>>\n");

        // Get configuration information from maxwell's configuration file
        server_id = context.getConfig().customProducerProperties.getProperty("server_id", "0");
        url = context.getConfig().customProducerProperties.getProperty("url");
        encrypt = context.getConfig().customProducerProperties.getProperty("encrypt");
        secretKey = context.getConfig().customProducerProperties.getProperty("secretKey");

        // Include the server_id field in the output JSON
        config.includesServerId = true;

        // Configure whether to encrypt data ("data", "all", or anything else for no encryption)
        if ("data".equals(encrypt)) {
            config.encryptionMode = EncryptionMode.ENCRYPT_DATA;
            config.secretKey = secretKey;
        } else if ("all".equals(encrypt)) {
            config.encryptionMode = EncryptionMode.ENCRYPT_ALL;
            config.secretKey = secretKey;
        }
    }

    @Override
    public void push(RowMap r) throws Exception
    {
        // filter out DDL and heartbeat rows
        if(!r.shouldOutput(outputConfig)) {
            // though not strictly necessary (as skipping has no side effects), we store our position,
            // so maxwell won't have to "re-skip" this position if crashing and restarting.
            context.setPosition(r.getPosition());
            return;
        }

        // Tag the row with this platform's server_id
        r.setServerId(Long.parseLong(server_id));

        // store uncommitted row in buffer
        txRows.add(r);

        if(r.isTXCommit()) {
            // This row is the final and closing row of a transaction. POST all rows of the
            // buffered transaction to nginx.
            txRows.stream().map(CustomProducer::toJSON).forEach(string -> httpUtil.doPost(url, string));
            txRows.clear();

            // Only now, after finally having "persisted" all buffered rows, is it safe to
            // store the producer's position.
            context.setPosition(r.getPosition());
        }
    }

    private static String toJSON(RowMap row) {
        try {
            return row.toJSON(config);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

5. Create the CustomProducerFactory class:

package com.test.producerfactory;

import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.producer.ProducerFactory;

public class CustomProducerFactory implements ProducerFactory{

    @Override
    public AbstractProducer createProducer(MaxwellContext context) {
        return new CustomProducer(context);
    }
}

6. Package the project as data_sync.jar with IDEA and transfer it to the lib directory of the remote Maxwell installation.

2. Configuration Work

Configuration work mainly consists of nginx and maxwell configurations, which are described below.

1. nginx configuration

After downloading the nginx source code, compile it with the Kafka module and the TCP proxy module added:
[root@host1 nginx]#
./configure --add-module=/usr/local/src/ngx_kafka_module --add-module=/usr/local/nginx_tcp_proxy_module

The installation of nginx itself is not described here. After installing nginx, edit the nginx.conf file in the /usr/local/nginx/conf directory:

#user  nobody;
worker_processes  1;
error_log  logs/error.log;
error_log  logs/error.log  notice;
error_log  logs/error.log  info;
pid        logs/nginx.pid;
events {
    worker_connections  1024;
}
http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;    
    keepalive_timeout  65;
    kafka;
    kafka_broker_list host2:9092 host3:9092 host4:9092;
    server {
        listen       19090;
        server_name  localhost;      

        location / {
            root   html;
            kafka_topic test1;
            index  index.html index.htm;
        }       
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }        
    }       
}

kafka_topic: the Kafka topic to which data received by nginx is sent.
kafka_broker_list: the Kafka broker nodes and ports; host names are used here because host resolution is configured.

After the nginx configuration is complete, reload it. Then, from a server on a different network segment from Kafka and nginx, test whether nginx accepts data with the following command:
[root@master ~]# curl http://58.30.1.xxx:19007/ -d "aaaaaa"

On the intranet Kafka cluster, use the following command to check whether Kafka receives the data:
[root@host3 ~]#kafka-console-consumer --bootstrap-server kafkahost:9092 --topic test1

When the data arrives in the Kafka cluster, it means the data sent over HTTP is being forwarded to Kafka by nginx.

2. Maxwell configuration. Download Maxwell from the official website and unzip it under /opt/maxwell.
(How Maxwell is installed and started was described in detail in the previous article.)

To use the custom producer, upload the data_sync.jar built above to the /opt/maxwell/lib directory of the unzipped Maxwell.

Create a config.properties file in the /opt/maxwell directory and add the following configuration:
vim config.properties

#[mysql]
user=maxwell   
password=123456  
host=hadoop1  
port=3306   
#[producer]
output_server_id=true   
custom_producer.factory=com.test.producerfactory.CustomProducerFactory  
custom_producer.server_id=23  
custom_producer.url=http://58.30.1.XX:19007/  
custom_producer.encrypt=data   
custom_producer.secretKey=0f1b122303xx44123  

Configuration item description:
user: # username for connecting to MySQL
password: # password for connecting to MySQL
host: # MySQL host name (IP address)
port: # MySQL port

output_server_id: # output the server_id field, which identifies which regional platform the data comes from
custom_producer.factory: # the custom producer factory class
custom_producer.server_id: # the server_id to tag rows with, consistent with server_id in my.cnf
custom_producer.url: # URL exposed by the data center (the nginx endpoint)

custom_producer.encrypt: # encryption mode: data, all, or none
custom_producer.secretKey: # key assigned by the data center, corresponding one-to-one with server_id

If data encryption is configured, the receiving side must decrypt the data before the binlog content can be used; the decryption method will be covered later.
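As a rough sketch of that decryption step, the following assumes Maxwell's documented encrypted envelope: a JSON payload carrying base64-encoded "iv" and "bytes" fields, encrypted with AES/CBC/PKCS5Padding, and the UTF-8 bytes of the configured secretKey used directly as the AES key. The class and method names here are illustrative, not part of Maxwell.

package com.test.utils;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hedged sketch: decrypts one encrypted Maxwell payload back into its JSON string.
public class DecryptUtil {

    public static String decrypt(String base64Iv, String base64Bytes, String secretKey) throws Exception {
        byte[] iv = Base64.getDecoder().decode(base64Iv);
        byte[] cipherText = Base64.getDecoder().decode(base64Bytes);

        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.DECRYPT_MODE,
                new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), "AES"),
                new IvParameterSpec(iv));

        // The decrypted bytes are the original Maxwell row JSON.
        return new String(cipher.doFinal(cipherText), StandardCharsets.UTF_8);
    }
}

Extracting the iv and bytes fields from the received JSON is left to whatever JSON library the data-center side already uses.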

Once the above configuration is complete, you can start Maxwell and begin synchronizing data to the local data center. When the data reaches the local Kafka cluster, it can be picked up by Flink or Spark Streaming for further processing.
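Before wiring up Flink or Spark Streaming, a plain Kafka consumer is a quick way to confirm that the row events are arriving. A minimal sketch, assuming the kafka-clients 2.x library is on the classpath and reusing the broker list and topic from the nginx configuration above (the group id is arbitrary):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Reads the synchronized binlog JSON from the local Kafka cluster and prints it.
public class BinlogConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list and topic follow the nginx configuration above; adjust as needed.
        props.put("bootstrap.servers", "host2:9092,host3:9092,host4:9092");
        props.put("group.id", "binlog-sync-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each value is one Maxwell row event in JSON (possibly encrypted).
                    System.out.println(record.value());
                }
            }
        }
    }
}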
