Flume usage tips

1. Flume overview

Flume is a distributed system for collecting, aggregating and transporting large volumes of log data. Its main use is to read data from a server's local disk in real time and write it to HDFS.

Agent: sends data from the Source to the destination in the form of events. An Agent consists of a Source, a Channel and a Sink.
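
To make these roles concrete, a minimal agent wiring one Source, one Channel and one Sink could look like the sketch below (the agent name a1, the component names r1/c1/k1 and the netcat/logger types are only illustrative choices, not part of the examples that follow):

# Minimal sketch of an agent: one source, one channel, one sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# A netcat source listening on a local port
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# A logger sink that prints events to the agent log
a1.sinks.k1.type = logger

# An in-memory channel buffering events between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1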

Source is the component responsible for receiving data into the Flume Agent. It can handle log data of various types and formats, including avro, thrift, exec, jms, spooling directory, netcat, taildir, sequence generator, syslog, http and legacy.

Sink continuously polls the Channel for events, removes them in batches, and writes them in batches to a storage or indexing system, or forwards them to another Flume Agent.

Sink destinations include hdfs, logger, avro, thrift, ipc, file_roll (local directory), HBase, solr and custom sinks.

Flume's unit of data transmission is the Event, which consists of a Header and a Body. The Header is a set of key-value pairs, and the Body stores the data as a byte array.

2. Flume simple case

Monitor multiple appended files in a directory in real time and upload them to HDFS:

Exec Source is suitable for monitoring a single file that is appended to in real time, but it cannot resume from a breakpoint; Spooldir Source is suitable for synchronizing new files, but not for monitoring files whose logs are appended in real time; Taildir Source is suitable for monitoring multiple files that are appended to in real time and supports resuming from a breakpoint.

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
# Location of the position file that records collection progress
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
# File groups to collect
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files/.*log.*

# Describe the sink upload to HDFS
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload/%Y%m%d/%H
# Prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Round the timestamp in the HDFS path down by time
a3.sinks.k3.hdfs.round = true
# How many time units per new folder
a3.sinks.k3.hdfs.roundValue = 1
# Time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# Set the file type (DataStream is uncompressed; CompressedStream supports compression)
a3.sinks.k3.hdfs.fileType = DataStream
# How often (in seconds) to roll to a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll the file when it reaches about 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling does not depend on the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

Taildir Source maintains a position file in JSON format. It periodically writes the latest position read from each file to this position file, so it can resume from where it left off after a restart.
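
For reference, each entry in this position file is typically a small JSON record like the one below (the inode, offset and path here are made-up values):

[{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}]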

3. Use of Flume

1. Understanding transactions

2. Internal principle of Agent in Flume

DefaultSinkProcessor corresponds to a single Sink, while LoadBalancingSinkProcessor and FailoverSinkProcessor correspond to a Sink Group: LoadBalancingSinkProcessor provides load balancing, and FailoverSinkProcessor provides failure recovery.
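
Only the failover processor appears in the examples below, so here is a rough sketch of a load-balancing sink group for comparison (the agent, group and sink names are illustrative):

# Sketch of a load-balancing sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# Temporarily back off from sinks that fail
a1.sinkgroups.g1.processor.backoff = true
# Distribute events across the sinks round-robin (random is also supported)
a1.sinkgroups.g1.processor.selector = round_robin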

3. Common topology of Flume

① Simple concatenation

② Replication and multiplexing

③ Load balancing and failover

④ Aggregation

4. Example

① Replication and multiplexing

The key is to configure the type of the Channel Selector.

Monitor a log file and send the data to different destinations.

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels (this is the key setting)
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# An avro sink acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

② Load balancing and failover

The key is to configure the SinkProcessor as failover.

If one Sink fails, the other Sink takes over:

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Configure failover
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

③ Aggregation

Data collected by multiple monitoring agents can be aggregated into a single Flume agent.

Core configuration:

# The sink on hadoop102
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# The sink on hadoop103
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# The source on hadoop104
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

When executing, start the downstream Flume agent (on hadoop104) first, then the upstream agents.

5. Custom Interceptor

The Multiplexing selector chooses which Channel an event enters according to the Header of the event, so you can write a custom Interceptor that attaches different values to the Header.

For example: depending on whether the first character of the event body is a letter or a digit, route the event to a different Channel (and thus a different Sink).

package com.SQLBody.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // Tag the event by the first byte of its body: lowercase letter vs. digit
        byte[] body = event.getBody();
        if (body[0] >= 'a' && body[0] <= 'z') {
            event.getHeaders().put("type", "letter");
        } else if (body[0] >= '0' && body[0] <= '9') {
            event.getHeaders().put("type", "number");
        }
        return event;

    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

Then add the above Interceptor in the configuration file:

a1.channels = c1 c2
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.SQLBody.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2

6. Will Flume lose collected data?

Judging from Flume's architecture, Flume itself does not drop data: it has a complete transaction mechanism, the Source-to-Channel step is transactional and the Channel-to-Sink step is transactional, so no data is lost in these two links. The only cases where data can be lost are when a memory Channel is used and the agent goes down, or when the Channel is full, so the Source can no longer write and the unwritten data is lost.
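
If this risk matters, a common mitigation (not shown in the examples above; the directories below are placeholder paths) is to use a file channel instead of a memory channel, roughly as follows:

# Sketch: a file channel persists events to disk so they survive an agent crash
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
a1.channels.c1.dataDirs = /opt/module/flume/data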

Flume does not lose data, but it may produce duplicates. For example, if data has been successfully written out by the Sink but no acknowledgement is received, the Sink will resend the data, which can result in duplicates.

