Using Flume's KafkaChannel

1. Install flume

2. Install kafka

3. Test that the Kafka topic can deliver messages normally
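For example, using the console tools that ship with Kafka (the broker and ZooKeeper addresses are the ones used later in this post; the pre-0.10-style flags match the --zookeeper usage shown below):

```
#Create a test topic
./bin/kafka-topics.sh --create --zookeeper 172.16.37.107:2181 --replication-factor 1 --partitions 1 --topic FLUME_TEST_TOPIC

#In one terminal, produce a few messages
./bin/kafka-console-producer.sh --broker-list 172.16.37.223:9092 --topic FLUME_TEST_TOPIC

#In another terminal, confirm they arrive
./bin/kafka-console-consumer.sh --zookeeper 172.16.37.107:2181 --from-beginning --topic FLUME_TEST_TOPIC
```

If messages typed into the producer appear in the consumer, the topic is healthy.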

4. Once both are ready, wire up the KafkaChannel. One note first: I originally tried using Kafka as a pure channel, with both a source and a sink attached, but that experiment failed. It seems the KafkaChannel cannot simply sit in the middle like that; it works either as source plus channel (no sink) or as channel plus sink (no source).

1) Kafka channel with a source, no sink

#Define the agent's source and channel names
a0.sources = r1
a0.channels = c1

#Define the source
a0.sources.r1.type = exec
a0.sources.r1.command = tail -F /data/logs.txt


#Define the Kafka channel
a0.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a0.channels.c1.brokerList = 172.16.37.223:9092
a0.channels.c1.zookeeperConnect=172.16.37.107:2181,172.16.37.108:2181,172.16.37.223:2181
a0.channels.c1.topic = FLUME_TEST_TOPIC
#false writes events to the topic as plain text; true (the default) writes them as serialized Flume events, which look garbled to a plain console consumer
a0.channels.c1.parseAsFlumeEvent = false
a0.sources.r1.channels = c1
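With the configuration above saved to a file (the filename kafka-channel-source.conf is an assumption), the agent can be launched roughly like this:

```
./bin/flume-ng agent --conf conf --conf-file conf/kafka-channel-source.conf --name a0 -Dflume.root.logger=INFO,console
```

Note that --name must match the agent name used in the config (a0 here).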

After starting Flume, open a Kafka consumer:

./bin/kafka-console-consumer.sh --zookeeper 172.16.37.107:2181 --from-beginning --topic FLUME_TEST_TOPIC

Append lines to the file Flume is tailing (/data/logs.txt); the consumer will print each line as it is delivered through the Kafka channel.
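Concretely, feeding the exec source just means appending to the tailed file. A minimal local sketch (a temporary file stands in for /data/logs.txt):

```shell
# A temp file stands in for /data/logs.txt
LOG=$(mktemp)

# Append the kind of lines the exec source picks up via tail -F
echo "test message 1" >> "$LOG"
echo "test message 2" >> "$LOG"

# Show what tail would have handed to the Kafka channel
tail -n 2 "$LOG"

rm -f "$LOG"
```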

2) Kafka channel with a sink, no source

agent.channels = kafka-channel
#No real source; data enters the channel directly from the Kafka topic
agent.sources = no-source
agent.sinks = k1

agent.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel

agent.channels.kafka-channel.brokerList = 172.16.37.223:9092
agent.channels.kafka-channel.zookeeperConnect = 172.16.37.107:2181,172.16.37.108:2181,172.16.37.223:2181
agent.channels.kafka-channel.topic = FLUME_TEST_TOPIC
#agent.channels.kafka-channel.consumer.group.id = groupM
agent.channels.kafka-channel.kafka.consumer.timeout.ms = 100
agent.channels.kafka-channel.parseAsFlumeEvent = false

#file_roll sink: writes events to local files, convenient for simple tests
#agent.sinks.k1.type = file_roll
#agent.sinks.k1.sink.directory = /data/kafkachannel


#AsyncHBase sink for asynchronous processing in the real pipeline (use the file_roll sink above for plain tests)
agent.sinks.k1.type = asynchbase
agent.sinks.k1.table = monstor_mm7mt
agent.sinks.k1.columnFamily = cf1
agent.sinks.k1.batchSize = 5
agent.sinks.k1.serializer = com.caissa.chador_flume.AsyncHbaseAllLogEventSerializer
agent.sinks.k1.serializer.columns = xunqi_number,protocol_type,message_type,submit_number,smsreq_rid,message_number,company_code,user_name,channel_value,billingusers_number,billing_type,aimphone_number,phone_number,aim_phone,appcode,is_status,messagevalid_time,message_sendtime,mobilevalide_number,valid_type,expenses,link_id,tp_pid,tp_udhi,message_format,message_code,mobiledeal_number,moblie_result,titile_length,mmcresouce_id,mmc_titile


agent.sinks.k1.channel = kafka-channel
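As before, this agent can be started roughly like this (the filename kafka-channel-sink.conf is an assumption; --name must match the agent name, agent here):

```
./bin/flume-ng agent --conf conf --conf-file conf/kafka-channel-sink.conf --name agent -Dflume.root.logger=INFO,console
```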

After starting Flume, start a Kafka producer:

./bin/kafka-console-producer.sh --broker-list 172.16.37.223:9092 --topic FLUME_TEST_TOPIC

Type messages into the producer; Flume consumes them from the channel and the sink writes them out.

5. Problems encountered.

1) An error is reported after starting Flume with this configuration:

flume java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher

The fix is to copy the ZooKeeper jar from Kafka's lib directory into Flume's lib directory.
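Assuming default install layouts (KAFKA_HOME and FLUME_HOME are placeholders for your actual paths), the copy looks like:

```
cp $KAFKA_HOME/libs/zookeeper-*.jar $FLUME_HOME/lib/
```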

2) Another error message at startup:

 org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException

This exception was never resolved, but it does not affect normal data transmission.


Posted on Sun, 10 Nov 2019 12:06:19 -0800 by smook