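I've been busy with design work and documentation lately, and finally have time to write a bit more about Kafka. An earlier post covered the Kafka producer, the side that generates logs. That part is fairly easy to understand: business systems send their runtime or business logs to a broker, and the broker stores them on their behalf. That post was about collecting logs; this one is about reading them back so they can be processed.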
```properties
# The directory under which to store log files
log.dir=/tmp/kafka-logs
```
```java
package com.a2.kafka.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class CommonConsumer {
    public static void main(String[] args) {
        // specify some consumer properties
        Properties props = new Properties();
        props.put("zk.connect", ""); // ZooKeeper connection string (left empty in the original)
        props.put("zk.connectiontimeout.ms", "1000000");
        props.put("groupid", "test_group");

        // create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        // request 2 streams for topic "test", so that 2 threads can consume in parallel
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test", 2);
        Map<String, List<KafkaStream<Message>>> topicMessageStreams =
                consumerConnector.createMessageStreams(map);
        List<KafkaStream<Message>> streams = topicMessageStreams.get("test");

        // one thread per stream
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());

        // consume the messages in the threads
        for (final KafkaStream<Message> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    // iterating a stream blocks until the next message arrives
                    for (MessageAndMetadata<Message> msgAndMetadata : stream) {
                        // process message (msgAndMetadata.message())
                        System.out.println(msgAndMetadata.message());
                    }
                }
            });
        }
    }
}
```
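This is the high-level (user-level) API. There is also a low-level API, which you can find on the official site, so I won't paste it here. Under the hood this consumer uses a blocking queue: as soon as a producer publishes data, the consumer prints it, which fits the real-time requirement nicely.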
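To make the blocking-queue behavior concrete, here is a minimal sketch that uses only `java.util.concurrent`. It is not Kafka's internal code; the class `BlockingQueueSketch`, the method `produceAndConsume`, and the `POISON` stop marker are all names made up for this illustration. The consumer thread sleeps inside `take()` and wakes up the moment something is put on the queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: this is not Kafka's code, and every name here is hypothetical.
class BlockingQueueSketch {
    // Sentinel value that tells the consumer thread to stop.
    static final String POISON = "__STOP__";

    // Starts a consumer thread that blocks on take(), feeds it the given messages
    // from the calling thread, and returns what the consumer received, in order.
    static List<String> produceAndConsume(List<String> messages) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        final List<String> received = new ArrayList<String>();

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        String msg = queue.take(); // blocks until a message is available
                        if (POISON.equals(msg)) {
                            return;
                        }
                        received.add(msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        try {
            for (String m : messages) {
                queue.put(m); // wakes the waiting consumer
            }
            queue.put(POISON);
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(Arrays.asList("log-1", "log-2", "log-3")));
    }
}
```

Running `main` prints the three messages in the order they were produced. No polling loop is needed, and that is the property that makes the consumer feel real-time.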
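A quick word on where this problem comes from. It is similar to a transaction commit, and every messaging system runs into it: where should the record of what has been consumed live, on the consumer or on the broker? The discussion usually comes down to three cases: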
- At most once—this handles the first case described. Messages are immediately marked as consumed, so they can't be given out twice, but many failure scenarios may lead to losing messages.
- At least once—this is the second case where we guarantee each message will be delivered at least once, but in failure cases may be delivered twice.
- Exactly once—this is what people actually want, each message is delivered once and only once.
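The difference between the first two cases comes down to whether the consumed position is recorded before or after the message is processed. The sketch below is a toy simulation of that ordering, not Kafka code; `DeliverySemanticsSketch`, `run`, and the single simulated crash are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy simulation, not Kafka code. All names are hypothetical.
class DeliverySemanticsSketch {
    // Replays `log` through a consumer that crashes once, at index crashAt,
    // in between its two steps (commit and process). After the crash it
    // restarts from the last committed position.
    // commitBeforeProcess = true  -> at most once
    // commitBeforeProcess = false -> at least once
    static List<String> run(List<String> log, int crashAt, boolean commitBeforeProcess) {
        List<String> processed = new ArrayList<String>();
        int committed = 0;
        boolean crashed = false;
        int pos = committed;
        while (pos < log.size()) {
            boolean crashNow = !crashed && pos == crashAt;
            if (commitBeforeProcess) {
                committed = pos + 1;              // mark as consumed first
                if (crashNow) {                   // crash before processing: message is lost
                    crashed = true;
                    pos = committed;
                    continue;
                }
                processed.add(log.get(pos));
            } else {
                processed.add(log.get(pos));      // process first
                if (crashNow) {                   // crash before commit: message is redelivered
                    crashed = true;
                    pos = committed;
                    continue;
                }
                committed = pos + 1;
            }
            pos++;
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2");
        System.out.println("at most once:  " + run(log, 1, true));
        System.out.println("at least once: " + run(log, 1, false));
    }
}
```

With a crash at `m1`, the commit-first run skips `m1` entirely, while the process-first run handles `m1` twice. Exactly-once needs more than reordering these two steps, which is why it is the hard case.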
Kafka handles this with a marker called the high water mark, which is the offset:
> Kafka does two unusual things with respect to metadata. First the stream is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order. This means that rather than store metadata for each message (marking it as consumed, say), we just need to store the "high water mark" for each combination of consumer, topic, and partition. Hence the total metadata required to summarize the state of the consumer is actually quite small. In Kafka we refer to this high-water mark as "the offset" for reasons that will become clear in the implementation section.
```
[FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0 start fetching topic: test part: 0 offset: 0 from 192.168.181.128:9092
[FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0 start fetching topic: test part: 0 offset: 15 from 192.168.181.128:9092
```
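The point of the quoted passage is that consumer state shrinks to one number per (consumer, topic, partition) combination. A minimal sketch of such a table follows. It is an in-memory illustration, not how Kafka stores offsets; `OffsetTableSketch` and its methods are hypothetical names. The values 0 and 15 are taken from the log above.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory illustration only; Kafka does not store offsets this way.
// All names are hypothetical.
class OffsetTableSketch {
    // One entry per (consumer group, topic, partition) combination.
    private final Map<String, Long> offsets = new HashMap<String, Long>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // Returns the stored offset, or 0 if this combination has never been seen.
    long get(String group, String topic, int partition) {
        Long v = offsets.get(key(group, topic, partition));
        return v == null ? 0L : v.longValue();
    }

    // Moves the mark forward; a smaller or equal value is ignored,
    // so the mark never moves backwards.
    void advance(String group, String topic, int partition, long newOffset) {
        if (newOffset > get(group, topic, partition)) {
            offsets.put(key(group, topic, partition), Long.valueOf(newOffset));
        }
    }

    // Number of tracked combinations: this is all the consumer metadata there is.
    int size() {
        return offsets.size();
    }

    public static void main(String[] args) {
        OffsetTableSketch table = new OffsetTableSketch();
        System.out.println(table.get("test_group", "test", 0)); // first fetch starts at 0
        table.advance("test_group", "test", 0, 15L);
        System.out.println(table.get("test_group", "test", 0)); // next fetch starts at 15
        System.out.println(table.size());                       // one entry in total
    }
}
```

However many messages flow through partition 0 of `test`, the table still holds a single entry for `test_group`, which is why the metadata stays small.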
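With some configuration, data produced in production can be synchronized to a mirror and then handed to a dedicated cluster for bulk processing. In that setup you can use the low-level API to fetch data by partition and offset, which gives better parallelism.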
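The idea of fetching by partition and offset in parallel can be sketched without any Kafka classes. In the toy example below each partition is just an in-memory list and an offset is an index into it; this stands in for the real low-level API and does not reproduce it. `PartitionFetchSketch` and `fetchAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy stand-in for partitioned fetching; no Kafka API is used. Names are hypothetical.
class PartitionFetchSketch {
    // Reads every partition in parallel, each from its own start offset.
    // partitions.get(i) plays the role of partition i; startOffsets[i] is an index into it.
    static List<List<String>> fetchAll(final List<List<String>> partitions, final int[] startOffsets) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            List<Future<List<String>>> futures = new ArrayList<Future<List<String>>>();
            for (int i = 0; i < partitions.size(); i++) {
                final int p = i;
                futures.add(pool.submit(new Callable<List<String>>() {
                    public List<String> call() {
                        List<String> part = partitions.get(p);
                        // everything from the start offset to the end of the partition
                        return new ArrayList<String>(part.subList(startOffsets[p], part.size()));
                    }
                }));
            }
            // Collect results in partition order, waiting for each task to finish.
            List<List<String>> result = new ArrayList<List<String>>();
            for (Future<List<String>> f : futures) {
                result.add(f.get());
            }
            return result;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a0", "a1", "a2"),
                Arrays.asList("b0", "b1"));
        System.out.println(fetchAll(partitions, new int[] {1, 0}));
    }
}
```

Because each task owns one partition and one offset, the tasks share no state and can run on as many threads, or machines, as there are partitions.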