数据的每一个字节都有故事,他们预示着未来。
在数据驱动的企业中,数据迁移几乎变得和数据一样重要。
数据的速度和敏捷性决定了数据的价值。
Kafka 的前世今生
众所周知,Kafka 是一种高吞吐量的分布式发布订阅消息系统,是不同系统间消息传输的纽带,是一著名的消息处理中间件。
Kafka 最初产生于 LinkedIn,用于支撑 LinkedIn 的 activity stream data 和 operational metrics 分析,被誉为 LinkedIn 的“中枢神经系统”。2011 年完成 Apache 开源 ,2012 年 10 月完成孵化,2014 年 ApacheKafka 中三位核心人员 Jay Kreps,NehaNarkhede 和 Jun Rao 联合成立 Confluent 公司,致力于为企业提供实时数处理服务解决方案。
(图片来自 Kafka 官网)
Kafka Connect 的应运而生
许多公司都采用了 Apache Kafka 为他们的数据管道,包括 LinkedIn、Netflix 和 Airbnb。Kafka 处理高吞吐量实时数据的能力使得它非常适合于解决数据集成问题,成为所有数据的公共缓冲区,并弥补 streaming 和批处理系统之间的差距。
然而,基于 Kafka 构建数据管道是具有挑战性的,因为它结合各种各样的工具才能从不同的数据系统中收集数据。一个工具从数据库中导入更新,另一个导入日志,再有一个导出到 HDFS 等等。这就导致了构建一个数据管道需要大量的工程设计并带来了高昂的运维开销。另外,其中的一些工具并不胜任这项工作,数据集成工具生态系统的零碎性导致了创造性但误导性的解决方案,比如滥用流处理框架以实现数据集成的目的。
Kafka Connect 提供了一种整合 Kafka 和其他系统数据的框架,以 Kafka 为支撑,为 Kafka 和其它系统创建并管理可扩展的、可信赖的流数据提供了模型。它可以通过 Connect 简单快捷的将大数据从 Kafka 中导入导出,数据范围涵盖关系型数据库,日志和度量数据,Hadoop 和数据仓库,NoSQL 数据存储,搜索索引等等。还有一点也很重要,相对于 Consumer 和 Producer,KafkaConnect 省掉了更多的开发工作,尤其是编码部分,这使得应用开发人员更容易上手使用。
Kafka Connect 的看家本领
Kafka Connect 与用户交互的核心概念是 Connector。Connector 是一个逻辑作业,负责管理 Kafka 和另一个系统之间的数据复制,每个 Connector 可以实例化一组实际复制数据的 task。 通过简单配置将单个 job 分解为多个 task,Kafka Connect 提供了内置的并行支持和可扩展的数据复制。Connector 和 tasks 是工作的逻辑单元,必须在进程中执行。Kafka Connect 将这些进程称为 workers。
Kafka Connect 有两个核心概念:Source 和 Sink。 Source 负责导入数据到 Kafka,Sink 负责从 Kafka 导出数据,它们都被称为 Connector。
(图片来自 Confluent 官网)
Kafka Connect 提供了以下特性:
- Kafka 连接器通用框架:规范化其他数据系统与 Kafka 的集成,简化了连接器开发,部署和管理
- 分布式和单机模式:上至大型的 持整个 organization 的集中管理服务,下至开发、测试和小规模的生产部署
- REST 接口:使用 REST API 提交和管理 Connector
- 自动 offset 管理:只需从 Connector 获取一些信息,Kafka Connect 就可以自动管理 offset 提交,因此连接器开发人员不需要担心这个容易出错的部分
- 分布式和可扩展性:Kafka Connect 基于现有的组管理协议。可以添加更多的 worker 来扩展 Kafka Connect 群集。
- 流/批量集成:利用 Kafka 的现有功能,Kafka Connect 是桥接流媒体和批数据系统的理想解决方案
Kafka Connect 的实战导航
workers 有两种部署方式:Standalone 和 Distributed。
Standalone 模式
单机模式下,所有的工作运行在同一进程中完成。
启动方式:
第一个参数是 worker 的配置,序列化格式以及提交 offset 的频率等设置,紧接着的参数是 Connector 配置文件。可以配置多个 Connector,多个 Connector 运行在在同一进程的不同线程中。
worker 必须配置:
- bootstrap.servers:Kafka 服务器列表
- key.converter:Key 转换器类。常见格式的示例包括 JSON 和 Avro。
- value.converter:Value 的转换器类。常见格式的示例包括 JSON 和 Avro。
另外,Standalone 模式还有一项重要配置:
- offset.storage.file.filename – 存储 offset 的文件
以自带的 file 的 sink 和 source 为例:
test.txt 文件内容:
启动 source:
启动 sink:
sink 控制台会输出文件内容:
继续往文件中追加内容:
Distributed 模式
分布式模式能够自动平衡 worker,允许动态扩展/收缩,并在活动 task 以及配置和 offset 数据提交中提供容错机制。分布式模式下 worker 运行的一个基本场景:
(图片来自 Confluent 官网)
启动方式:
和单机模式的区别在于启动的类和配置参数,这些参数改变了 KafkaConnect 进程如何决定存储配置,如何分配工作,以及在哪里存储偏 offset 和 task 状态。 在分布式模式下,Kafka Connect 在 Kafka topic 存储 offset,配置和 task 状态。 建议手动创建 offset,配置和状态的 topic,这样方便控制分区数和复制因子。 如果在启动 Kafka Connect 时尚未创建主题,那么将使用默认分区数和复制因子自动创建主题,如果使用自动创建功能,task 启动时会无法获取到 topic 信息,task 会无法正常工作。
另外的一些重要配置:
- group.id(默认 topic:connect-cluster):Connect 集群组的唯一名称;请注意,不能与消费者组 ID 冲突
- config.storage.topic(默认 topic:connect-configs):用于存储 Connector 和 task 配置的 topic;请注意,这应该是单分区,多副本,压缩的主题。 最好手工创建
- offset.storage.topic(默认 topic:connect-offsets):用于存储 offset 的 topic;该 topic 应该有很多分区,多副本,使用压缩
- status.storage.topic(默认 connect-status):用于存储状态的 topic;此 topic 可以有多个分区,应该使用多副本,压缩机制
分布式的 Connector 需要通过 rest API 来维护。本文通过 firefox 的 rest 插件搞定:
以自带的 file 的 sink 和 source 为例:
启动:
添加消息头:
增加 sink 和 sourceConnector:
Sink:
Source:
REST API 还另外提供了查询/修改/删除等维护接口,可参照官网说明,在此不再赘述。
此时消费 test topic:
通过配置禁用 schema,可以清掉 topic 中的 schema 信息:
近期,Kafka 还提供了一项很实用的功能,可以通过配置 transformations 进行轻量级的消息即时修改, 它们可以方便数据传送和事件路由,Kafka Connect 的功能越来越强大。
Kafka Connect 的前景展望
目前 Kafka Connect 已经支持包括 HDFS、JDBC、Elasticsearch、HBsase 等 五六十种 Connector,并仍然在发展壮大中。Kafka 以稳定、健壮、高效的姿态在众多大数据组件中以时间和效率拔得头筹,Kafka Connect 功能也日益引人注目。
正如本文开始所描述,数据的速度和敏捷性决定了数据的价值,价值就是市场导向。