Kafka 视频教程同步首发,欢迎观看!
Kafka Producer APIs
新版的 Producer API 提供了以下功能:
可以将多个消息缓存到本地队列里,然后异步的批量发送到 broker,可以通过参数 producer.type=async 做到。缓存的大小可以通过一些参数指定:queue.time 和 batch.size。一个后台线程((kafka.producer.async.ProducerSendThread)从队列中取出数据并让kafka.producer.EventHandler 将消息发送到 broker,也可以通过参数 event.handler 定制 handler,在 producer 端处理数据的不同的阶段注册处理器,比如可以对这一过程进行日志追踪,或进行一些监控。只需实现kafka.producer.async.CallbackHandler 接口,并在 callback.handler 中配置。
自己编写 Encoder 来序列化消息,只需实现下面这个接口。默认的 Encoder 是kafka.serializer.DefaultEncoder。
interface Encoder {
public Message toMessage(T data);
}
提供了基于 Zookeeper 的 broker 自动感知能力,可以通过参数 zk.connect 实现。如果不使用 Zookeeper,也可以使用 broker.list 参数指定一个静态的 brokers 列表,这样消息将被随机的发送到一个 broker 上,一旦选中的 broker 失败了,消息发送也就失败了。
通过分区函数kafka.producer.Partitioner 类对消息分区。
interface Partitioner {
int partition(T key, int numPartitions);
}
分区函数有两个参数:key 和可用的分区数量,从分区列表中选择一个分区并返回 id。默认的分区策略是 hash(key)%numPartitions.如果 key 是 null,就随机的选择一个。可以通过参数 partitioner.class 定制分区函数。
新的 api 完整实例如下:
package com.cuicui.kafkademon;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* @author 崔磊
* @date 2015 年 11 月 4 日 上午 11:44:15
*/
public class MyProducer {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put(“serializer.class”, “kafka.serializer.StringEncoder”);
props.put(“metadata.broker.list”, KafkaProperties.BROKER_CONNECT);
props.put(“partitioner.class”, “com.cuicui.kafkademon.MyPartitioner”);
props.put(“request.required.acks”, “1”);
ProducerConfig config = new ProducerConfig(props);
Producer producer = new Producer(config);
// 单个发送
for (int i = 0; i <= 1000000; i++) { KeyedMessage message =
new KeyedMessage(KafkaProperties.TOPIC, i + “”, “Message” + i);
producer.send(message);
Thread.sleep(5000);
}
// 批量发送
List> messages = new ArrayList>(100);
for (int i = 0; i <= 10000; i++) { KeyedMessage message =
new KeyedMessage(KafkaProperties.TOPIC, i + “”, “Message” + i);
messages.add(message);
if (i % 100 == 0) {
producer.send(messages);
messages.clear();
}
}
producer.send(messages);
}
}
下面这个是用到的分区函数:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class MyPartitioner implements Partitioner {
public MyPartitioner(VerifiableProperties props) {
}
/*
* @see kafka.producer.Partitioner#partition(java.lang.Object, int)
*/
@Override
public int partition(Object key, int partitionCount) {
return Integer.valueOf((String) key) % partitionCount;
}
}
KafKa Consumer APIs
Consumer API 有两个级别。低级别的和一个指定的 broker 保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着 offset。
高级别的 API 隐藏了和 brokers 连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的 topic,比如白名单黑名单或者正则表达式。
低级别的 API
package com.cuicui.kafkademon;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
/**
* offset 自己维护 目标 topic、partition 均由自己分配
@author 崔磊
* @date 2015 年 11 月 4 日 上午 11:44:15
*
*/
public class MySimpleConsumer {
public static void main(String[] args) {
new MySimpleConsumer().consume();
}
/**
* 消费消息
*/
public void consume() {
int partition = 0;
// 找到 leader
Broker leaderBroker = findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);
// 从 leader 消费
SimpleConsumer simpleConsumer =
new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, “mySimpleConsumer”);
long startOffet = 1;
int fetchSize = 1000;
while (true) {
long offset = startOffet;
// 添加 fetch 指定目标 tipic,分区,起始 offset 及 fetchSize(字节),可以添加多个 fetch
FetchRequest req =
new FetchRequestBuilder().addFetch(KafkaProperties.TOPIC, 0, startOffet, fetchSize).build();
// 拉取消息
FetchResponse fetchResponse = simpleConsumer.fetch(req);
ByteBufferMessageSet messageSet = fetchResponse.messageSet(KafkaProperties.TOPIC, partition);
for (MessageAndOffset messageAndOffset : messageSet) {
Message mess = messageAndOffset.message();
ByteBuffer payload = mess.payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
String msg = new String(bytes);
offset = messageAndOffset.offset();
System.out.println(“partition : ” + 3 + “, offset : ” + offset + ” mess : ” + msg);
}
// 继续消费下一批
startOffet = offset + 1;
}
}
/**
* 找到制定分区的 leader broker
@param brokerHosts broker 地址,格式为:“host1:port1,host2:port2,host3:port3”
* @param topic topic
* @param partition 分区
* @return
*/
public Broker findLeader(String brokerHosts, String topic, int partition) {
Broker leader = findPartitionMetadata(brokerHosts, topic, partition).leader();
System.out.println(String.format(“Leader tor topic %s, partition %d is %s:%d”, topic, partition, leader.host(),
leader.port()));
return leader;
}
/**
* 找到指定分区的元数据
@param brokerHosts broker 地址,格式为:“host1:port1,host2:port2,host3:port3”
* @param topic topic
* @param partition 分区
* @return 元数据
*/
private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {
PartitionMetadata returnMetaData = null;
for (String brokerHost : brokerHosts.split(“,”)) {
SimpleConsumer consumer = null;
String[] splits = brokerHost.split(“:”);
consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, “leaderLookup”);
List topics = Collections.singletonList(topic);
TopicMetadataRequest request = new TopicMetadataRequest(topics);
TopicMetadataResponse response = consumer.send(request);
List topicMetadatas = response.topicsMetadata();
for (TopicMetadata topicMetadata : topicMetadatas) {
for (PartitionMetadata PartitionMetadata : topicMetadata.partitionsMetadata()) {
if (PartitionMetadata.partitionId() == partition) {
returnMetaData = PartitionMetadata;
}
}
}
if (consumer != null)
consumer.close();
}
return returnMetaData;
}
/**
* 根据时间戳找到某个客户端消费的 offset
@param consumer SimpleConsumer
* @param topic topic
* @param partition 分区
* @param clientID 客户端的 ID
* @param whichTime 时间戳
* @return offset
*/
public long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientID, long whichTime) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map requestInfo =
new HashMap();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientID);
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
}
低级别的 API 是高级别 API 实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如 Hadoop consumer 这样的离线 consumer。
高级别的 API
package com.cuicui.kafkademon;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
- offset 在 zookeeper 中记录,以 group.id 为 key 分区和 customer 的对应关系由 Kafka 维护
- @author 崔磊
- @date 2015 年 11 月 4 日 上午 11:44:15
*/
public class MyHighLevelConsumer {/**- 该 consumer 所属的组 ID
*/
private String groupid;
- 该 consumer 的 ID
*/
private String consumerid;
- 每个 topic 开几个线程?
*/
private int threadPerTopic;
super();
this.groupid = groupid;
this.consumerid = consumerid;
this.threadPerTopic = threadPerTopic;
}public void consume() {
Properties props = new Properties();
props.put(“group.id”, groupid);
props.put(“consumer.id”, consumerid);
props.put(“zookeeper.connect”, KafkaProperties.ZK_CONNECT);
props.put(“zookeeper.session.timeout.ms”, “60000”);
props.put(“zookeeper.sync.time.ms”, “2000”);
// props.put(“auto.commit.interval.ms”, “1000”);ConsumerConfig config = new ConsumerConfig(props);ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);Map<String, Integer> topicCountMap = new HashMap<String, Integer>();// 设置每个 topic 开几个线程topicCountMap.put(KafkaProperties.TOPIC, threadPerTopic);// 获取 streamMap<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);// 为每个 stream 启动一个线程消费消息for (KafkaStream<byte[], byte[]> stream : streams.get(KafkaProperties.TOPIC)) { new MyStreamThread(stream).start();}
}/**- 每个 consumer 的内部线程
- @author cuilei05
*
*/
private class MyStreamThread extends Thread {
private KafkaStream stream;
super();
this.stream = stream;
}@Override
public void run() {
ConsumerIterator streamIterator = stream.iterator();// 逐条处理消息while (streamIterator.hasNext()) { MessageAndMetadata<byte[], byte[]> message = streamIterator.next(); String topic = message.topic(); int partition = message.partition(); long offset = message.offset(); String key = new String(message.key()); String msg = new String(message.message()); // 在这里处理消息,这里仅简单的输出 // 如果消息消费失败,可以将已上信息打印到日志中,活着发送到报警短信和邮件中,以便后续处理 System.out.println("consumerid:" + consumerid + ", thread : " + Thread.currentThread().getName() + ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : " + key + " , mess : " + msg);}
}
}- @author cuilei05
String groupid = “myconsumergroup”;
MyHighLevelConsumer consumer1 = new MyHighLevelConsumer(groupid, “myconsumer1”, 3);
MyHighLevelConsumer consumer2 = new MyHighLevelConsumer(groupid, “myconsumer2”, 3);consumer1.consume();consumer2.consume();
}
}
这个 API 围绕着由 KafkaStream 实现的迭代器展开,每个流代表一系列从一个或多个分区多和 broker 上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个 broker 的合并,但是每个分区的消息只会流向一个流。 - 该 consumer 所属的组 ID
每调用一次 createMessageStreams 都会将 consumer 注册到 topic 上,这样 consumer 和 brokers 之间的负载均衡就会进行调整。API 鼓励每次调用创建更多的 topic 流以减少这种调整。createMessageStreamsByFilter 方法注册监听可以感知新的符合 filter 的 tipic。
作者:磊磊崔
来源:CSDN
原文:https://blog.csdn.net/honglei915/article/details/37697655
版权声明:本文为博主原创文章,转载请附上博文链接!