根据官方介绍,Kafka Connect 是一种用于在 Kafka 和其他系统之间可扩展的、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出 Kafka 的连接器变得简单。 Kafka Connect 可以获取整个数据库或从所有应用程序服务器收集指标到 Kafka 主题,使数据可用于低延迟的流处理。导出作业可以将数据从 Kafka topic 传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。 Kafka Connect 功能包括:
Kafka connector 通用框架,提供统一的集成 API
同时支持分布式模式和单机模式
REST 接口,用来查看和管理 Kafka connectors
自动化的 offset 管理,开发人员不必担心错误处理的影响
分布式、可扩展
流/批处理集成
KafkaCnnect 有两个核心概念:Source 和 Sink。 Source 负责导入数据到 Kafka,Sink 负责从 Kafka 导出数据,它们都被称为 Connector。
Kafka connect的几个重要的概念包括:connectors、tasks、workers 和 converters。
Connectors-通过管理任务来细条数据流的高级抽象
Tasks- 数据写入kafka和数据从kafka读出的实现
Workers-运行 connectors 和 tasks 的进程
Converters- kafka connect 和其他存储系统直接发送或者接受数据之间转换数据
1) Connectors:在kafka connect 中,connector 决定了数据应该从哪里复制过来以及数据应该写入到哪里去,一个 connector 实例是一个需要负责在 kafka 和其他系统之间复制数据的逻辑作业,connector plugin 是 jar 文件,实现了 kafka 定义的一些接口来完成特定的任务。
2) Tasks:task 是 kafka connect 数据模型的主角,每一个 connector 都会协调一系列的 task 去执行任务,connector 可以把一项工作分割成许多的 task,然后再把 task 分发到各个 worker 中去执行(分布式模式下),task 不自己保存自己的状态信息,而是交给特定的 kafka 主题去保存(config.storage.topic 和 status.storage.topic)。在分布式模式下有一个概念叫做任务再平衡(Task Rebalancing),当一个 connector 第一次提交到集群时,所有的 worker 都会做一个 task rebalancing 从而保证每一个 worker 都运行了差不多数量的工作,而不是所有的工作压力都集中在某个 worker 进程中,而当某个进程挂了之后也会执行 task rebalance。
3) Workers:connectors 和 tasks 都是逻辑工作单位,必须安排在进程中执行,而在 kafka connect 中,这些进程就是 workers,分别有两种 worker:standalone 和 distributed。这里不对 standalone 进行介绍,具体的可以查看官方文档。我个人觉得 distributed worker 很棒,因为它提供了可扩展性以及自动容错的功能,你可以使用一个 group.ip 来启动很多 worker 进程,在有效的 worker 进程中它们会自动的去协调执行 connector 和 task,如果你新加了一个 worker 或者挂了一个 worker,其他的 worker 会检测到然后在重新分配 connector 和 task。
4) Converters: converter 会把 bytes 数据转换成 kafka connect 内部的格式,也可以把 kafka connect 内部存储格式的数据转变成 bytes,converter 对 connector 来说是解耦的,所以其他的 connector 都可以重用,例如,使用了 avro converter,那么 jdbc connector 可以写 avro 格式的数据到 kafka,当然,hdfs connector 也可以从 kafka 中读出 avro 格式的数据。
3、kafka connect 的启动。
Kafka connect 的工作模式分为两种,分别是 standalone 模式和 distributed 模式。
在独立模式种,所有的 work 都在一个独立的进程种完成,如果用于生产环境,建议使用分布式模式,都在真的就有点浪费 kafka connect 提供的容错功能了。
standalone 启动的命令很简单,如下:
bin/connect-standalone.shconfig/connect-standalone.properties connector1.properties[connector2.properties …]
一次可以启动多个 connector,只需要在参数中加上 connector 的配置文件路径即可。
启动 distributed 模式命令如下:
bin/connect-distributed.shconfig/connect-distributed.properties
在 connect-distributed.properties 的配置文件中,其实并没有配置了你的 connector 的信息,因为在 distributed 模式下,启动不需要传递 connector 的参数,而是通过 REST API 来对 kafka connect 进行管理,包括启动、暂停、重启、恢复和查看状态的操作,具体介绍详见下文。
在启动 kafkaconnect 的 distributed 模式之前,首先需要创建三个主题,这三个主题的配置分别对应 connect-distributed.properties 文件中 config.storage.topic(default connect-configs)、offset.storage.topic (default connect-offsets) 、status.storage.topic (default connect-status)的配置,那么它们分别有啥用处呢?
config.storage.topic:用以保存 connector 和 task 的配置信息,需要注意的是这个主题的分区数只能是 1,而且是有多副本的。(推荐 partition 1,replica 3)
offset.storage.topic:用以保存 offset 信息。(推荐 partition50,replica 3)
status.storage.topic:用以保存 connetor 的状态信息。(推荐 partition10,replica 3)
以下是创建主题命令:
config.storage.topic=connect-configs
$ bin/kafka-topics –create –zookeeper localhost:2181 –topicconnect-configs –replication-factor 3 –partitions 1
offset.storage.topic=connect-offsets
$ bin/kafka-topics –create –zookeeper localhost:2181 –topicconnect-offsets –replication-factor 3 –partitions 50
status.storage.topic=connect-status
$ bin/kafka-topics –create –zookeeper localhost:2181 –topicconnect-status –replication-factor 3 –partitions 10
具体配置信息再次不在赘述,详见 kafka 官方文档:http://kafka.apache.org/documentation/#connect
4、通过 rest api 管理 connector
因为 kafka connect 的意图是以服务的方式去运行,所以它提供了 REST API 去管理 connectors,默认的端口是 8083,你也可以在启动 kafka connect 之前在配置文件中添加 rest.port 配置。
GET /connectors – 返回所有正在运行的 connector 名
POST /connectors – 新建一个 connector; 请求体必须是 json 格式并且需要包含 name 字段和 config 字段,name 是 connector 的名字,config 是 json 格式,必须包含你的 connector 的配置信息。
GET /connectors/{name} – 获取指定 connetor 的信息
GET /connectors/{name}/config – 获取指定 connector 的配置信息
PUT /connectors/{name}/config – 更新指定 connector 的配置信息
GET /connectors/{name}/status – 获取指定 connector 的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定 connector 正在运行的 task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定 connector 的 task 的状态信息
PUT /connectors/{name}/pause – 暂停 connector 和它的 task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的 connector
POST /connectors/{name}/restart – 重启一个 connector,尤其是在一个 connector 运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个 task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个 connector,停止它的所有 task 并删除配置。
作者:该取个啥名字呢
来源:CSDN
原文:https://blog.csdn.net/u011687037/article/details/57411790
版权声明:本文为博主原创文章,转载请附上博文链接!