1. Apache Kafka介绍
Kafka 是最初由Linkedin公司开发,是一个分布式、支持分区(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。它的最大特性就是可以实时的处理大量数据,以满足各种需求场景:比如基于Hadoop的批处理系统、低延迟的实时系统、Storm/Spark Streaming流式引擎、web/Nginx日志、访问日志、消息服务等等,Kafka使用 Scala 语言编写,Linkedin于2010年将Kafka贡献给 Apache 基金会并称为顶级开源项目。
2.消息系统分类
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:
- 点对点传递模式
- 发布-订阅模式
大部分的消息系统选用发布-订阅模式。
2.1 点对点消息介绍
2.1.1 介绍
在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:
2.1.2 特点
一般基于Pull(拉取) 或者Polling(轮询) 接收消息。
发送到队列的消息被一个且仅一个接收者接收,即使多个接收者在同一个队列中监听同一消息。
既支持异步 “即发即弃” 的消息传送方式,也支持同步请求/应答传送方式
- 即发即弃:发送消息之后就返回不用关心数据是否被处理。
- 同步请求/应答:在发送的消息被处理之后才会返回
2.2 发布-订阅消息系统
2.2.1 介绍
在发布-订阅消息系统中,消息被持久化到一个主题(topic)中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
2.2.2 特点
- 发布到一个主题的消息可以被多个订阅者所接收。
- 发布/订阅既可基于push消费数据,也可基于Pull或者Polling消费数据
- 解耦能力比点对点消息系统更强
- 支持多播
2.3 消息系统适用场景
场景 | 说明 |
---|---|
系统解耦 | 各个系统之间通过消息系统这个统一的接口交换数据,无需了解彼此的存在 |
数据冗余 | 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险 |
系统扩展 | 消息系统是统一的数据接口,各系统可独立扩展 |
峰值处理 | 消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求 |
可恢复性 | 系统中部分组件失效并不影响整个系统,它回复后扔可从消息系统中获取并处理数据 |
异步通信 | 在不需要立即处理请求的场景下,可以将请求放入消息系统,适合的时候再处理 |
2.4 常见消息系统对比
消息系统 |
RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|---|---|---|---|
社区/公司 | Mozilla Publish License | Apache | Ali | Apache |
授权方式 | 开源 | 开源 | 开源 | 开源 |
开发语言 | Erlang | Java | Java | Scala&Java |
批量操作 | 不支持 | 支持 | 支持 | 支持 |
部署方式 | 单机/集群 | 单机/集群 | 单机/集群 | 单机/集群 |
HA | Master/slave模式,master提供服务,slave仅备份 | 基于Zookeeper + LevelDB的Master Slave实现方式 | 支持多Master模式、多Master多Slave模式、异步复制模式、多Master多Slave同步双写模式 | 支持Relica机制,leader宕机后备份自动顶替,并重新选举leader(基于zk) |
数据可靠性 | 仅保证数据不丢失,有slave用作备份 | master/slave | 支持异步实时刷盘,同步刷盘,同步复制,异步复制 | 数据可靠,并且有replica机制,有容错容灾能力 |
有序性 | 只有使用一个client才能保证有序 | 有序 | 有序 | 多Client保证有序 |
管理界面 | 较好 | 一般 | 命令行 | 官方只提供命令行,Yahoo开源了自己的Kafka管理界面,Kafka Manager |
负载均衡 | 支持 | 支持 | 支持 | 支持 |
3. kafka 部署
3.1 单机部署,使用kafka自带zookeeper
- 启动zookeeper
[root@node1 kafka_2.12-1.0.2]# cd /opt/kafka_2.12-1.0.2
# 启动zookeeper
[root@node1 kafka_2.12-1.0.2]# bin/zookeeper-server-start.sh config/zookeeper.properties # 前台启动
[2020-02-09 00:24:25,127] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
[2020-02-09 00:24:25,178] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2020-02-09 00:24:25,187] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
- 检测zookeeper是否启动成功
# 在新的命令行窗口执行
[root@node1 ~]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 5108 root 90u IPv6 74947 0t0 TCP *:eforward (LISTEN)
- 启动kafka broker
# 在新的命令行窗口执行
# bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]* 设置-daemon 则会后台运行 否则前台运行
[root@node1 kafka_2.12-1.0.2]# bin/kafka-server-start.sh config/server.properties
[2020-02-09 00:30:39,912] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
......
[2020-02-09 00:30:42,259] INFO Kafka version : 1.0.2 (org.apache.kafka.common.utils.AppInfoParser)
[2020-02-09 00:30:42,259] INFO Kafka commitId : 2a121f7b1d402825 (org.apache.kafka.common.utils.AppInfoParser)
[2020-02-09 00:30:42,262] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
- 检查kafka broker 是否启动成功
# 在新的命令行窗口执行
[root@node1 ~]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 5108 root 90u IPv6 74947 0t0 TCP *:eforward (LISTEN)
java 5108 root 91u IPv6 75799 0t0 TCP localhost:eforward->localhost:58961 (ESTABLISHED)
java 5392 root 89u IPv6 75798 0t0 TCP localhost:58961->localhost:eforward (ESTABLISHED)
[root@node1 ~]#
[root@node1 ~]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 5392 root 104u IPv6 75813 0t0 TCP *:XmlIpcRegSvc (LISTEN)
java 5392 root 111u IPv6 75818 0t0 TCP node1:40656->node1:XmlIpcRegSvc (ESTABLISHED)
java 5392 root 112u IPv6 75819 0t0 TCP node1:XmlIpcRegSvc->node1:40656 (ESTABLISHED)
这时kafka已经部署完成,接下来检测kafka是否能够正常使用
- 创建topic
# 在新的命令行窗口执行
# 测试创建topic
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 3 --replication-factor 1
Created topic "test1".
# 查看topic 列表
# 检查topic 状态
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1
Topic:test1 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
# 这里的leader是指 broker的id
- 启动producer 并发送数据
# 在新的命令行窗口执行
# 启动producer发送消息
[root@node1 kafka_2.12-1.0.2]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
>1
>2
>3
>4
>5
- 启动consumer 并消费数据
# 在新的命令行窗口执行
# 启动consumer消费消息
[root@node1 kafka_2.12-1.0.2]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1
1
2
3
4
5
3.2 部署kafka集群,使用外部zookeeper
- 修改配置文件
# 需要修改的配置项有四个 broker.id、listeners、log.dirs、zookeeper.connect
# 每个borker的id是唯一的,多个broker要设置不同的id
vim config/server.properties
broker.id=0
# listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,
# 在使用java发送数据时就会抛出异 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。
# 因为在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,
# 而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的
listeners=PLAINTEXT://172.16.72.150:9092
# 存储数据路径,默认是在/tmp目录下,需要修改 首先需要创建对应文件夹
log.dirs=/opt/kafka_2.12-1.0.2/kafka-logs
# zookeeper地址
zookeeper.connect=node1:2181,node2:2181,node3:2181
- 将kafka 安装文件拷贝到其他节点并修改broker.id 与listeners 中的IP地址
scp -r kafka_2.12-1.0.2/ root@node2:/opt
scp -r kafka_2.12-1.0.2/ root@node3:/opt
#在node2中修改 server.properties 配置文件
broker.id=1
listeners=PLAINTEXT://172.16.72.151:9092
#在node3中修改
broker.id=2
listeners=PLAINTEXT://172.16.72.152:9092
- 启动kafka 服务
#分别在每台服务器上执行启动命令
#前台运行
bin/kafka-server-start.sh config/server.properties
#后台运行
bin/kafka-server-start.sh -daemon config/server.properties
#日志运行 KAFKA_HOME/logs
-rw-r--r--. 1 root root 172 2月 10 15:47 log-cleaner.log #Kafka日志清理操作相关统计信息
-rw-r--r--. 1 root root 17K 2月 10 15:47 kafkaServer.out #KafkaServer运行日志
-rw-r--r--. 1 root root 17K 2月 10 15:47 server.log #KafkaServer运行日志
-rw-r--r--. 1 root root 4.5K 2月 10 15:47 kafkaServer-gc.log.0.current # Kafka运行过程,进行GC操作时的日志
-rw-r--r--. 1 root root 180 2月 10 15:47 state-change.log #Kafka分区角色切换等状态转换日志
-rw-r--r--. 1 root root 4.3K 2月 10 15:47 controller.log #KafkaController运行时日志
-rw-r--r--. 1 root root 0 2月 9 21:27 kafka-authorizer.log #Kafka权限认证相应操作日志
-rw-r--r--. 1 root root 0 2月 9 21:27 kafka-request.log #Kafka相应网络请求日志
-rw-r--r--. 1 root root 1976 2月 9 21:27 zookeeper-gc.log.0.current # kafka 自带的Zookeeper GC 日志
#然后查看端口使用情况
# node1
[root@node1 ~]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 7728 root 46u IPv6 80681 0t0 TCP *:eforward (LISTEN)
java 7728 root 59u IPv6 84114 0t0 TCP node1:eforward->node1:43554 (ESTABLISHED)
java 8177 root 89u IPv6 84113 0t0 TCP node1:43554->node1:eforward (ESTABLISHED)
[root@node1 ~]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 8177 root 104u IPv6 84127 0t0 TCP node1:XmlIpcRegSvc (LISTEN)
java 8177 root 108u IPv6 84133 0t0 TCP node1:40682->node1:XmlIpcRegSvc (ESTABLISHED)
java 8177 root 112u IPv6 84134 0t0 TCP node1:XmlIpcRegSvc->node1:40682 (ESTABLISHED)
java 8177 root 116u IPv6 84169 0t0 TCP node1:52665->node2:XmlIpcRegSvc (ESTABLISHED)
java 8177 root 120u IPv6 84171 0t0 TCP node1:46187->node3:XmlIpcRegSvc (ESTABLISHED)
#node2
[root@node2 ~]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 25279 root 46u IPv6 50238 0t0 TCP *:eforward (LISTEN)
java 48812 root 89u IPv6 164762 0t0 TCP node2:34171->node3:eforward (ESTABLISHED)
[root@node2 ~]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 48812 root 104u IPv6 164773 0t0 TCP node2:XmlIpcRegSvc (LISTEN)
java 48812 root 108u IPv6 164777 0t0 TCP node2:XmlIpcRegSvc->node1:52665 (ESTABLISHED)
#node3
[root@node3 ~]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 24879 root 46u IPv6 49041 0t0 TCP *:eforward (LISTEN)
java 24879 root 60u IPv6 160074 0t0 TCP node3:eforward->node3:56261 (ESTABLISHED)
java 24879 root 61u IPv6 159881 0t0 TCP node3:eforward->node2:34171 (ESTABLISHED)
java 47833 root 89u IPv6 160073 0t0 TCP node3:56261->node3:eforward (ESTABLISHED)
[root@node3 ~]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 47833 root 104u IPv6 160116 0t0 TCP node3:XmlIpcRegSvc (LISTEN)
java 47833 root 109u IPv6 160120 0t0 TCP node3:XmlIpcRegSvc->node1:46187 (ESTABLISHED)
- 创建topic
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test1 --partitions 3 --replication-factor 1
Created topic "test1".
[root@node2 kafka_2.12-1.0.2]#
#查看所有topic
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --list
test1
# 查看topic 状态
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test1
Topic:test1 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
- 启动消费者
#待生产者发送数据之后收到以下消息
[root@node2 kafka_2.12-1.0.2]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test1
hello kafka
1
4
2
3
5
- 启动生产者并发送数据
[root@node1 kafka_2.12-1.0.2]# bin/kafka-console-producer.sh --broker-list node1:9092 --topic test1
>hello kafka
>1
>2
>3
>4
>5
4.kafka 架构介绍
Kafka发布订阅的对象是Topic。我们可以为每一类数据创建一个Topic,把向Topic发送消息的客户端称作为producer,把从Topic订阅消息的客户端称作是consumer。Producer和Consumer可以同时从多个Topic读写数据。一个Kafka集群由一个或多个broker 服务器组成,它负责持久化和备份具体的Kafka消息。Kafka架构如下图
- Zookeeper:可以是Zookeeper 单节点或者集群,也可以是Kafka自带的Zookeeper,做集群主要是为了Zookeeper的HA。
- Kafka Server: 也叫Broker,需要先启动Zookeeper,然后在启动Kafka Server,这两个都启动之后,kafka服务就可用了。
- Producer :也称为生产者或发布者,从外部拿数据,然后将数据发送到kafka topic当中。
- Consumer:也称为消费者或订阅者,从kafka topic 中消费数据。
消息系统通常都会由生产者、消费者和broker三大部分组成,生产者将消息写入broker,消费者从broker中读取数消息数据,具体步骤如下:
- 生产者客户端应用程序产生消息:
1.客户端连接对象将消息包装到请求发送到服务端
2.服务端的入口也有一个连接对象负责接收请求,并将消息以文件的形式存储起来。
3.服务端返回相应结果给生产者客户端。
- 消费者客户端应用消费数据:
1.客户端连接对象将消费信息包装到请求发送给服务端。
2.服务端从文件存储系统中取出消息数据。
3.服务端返回相应结果给消费者客户端。
4.客户端将响应结果还原成消息并开始处理消息数据。
Kafka中的Producer和Consumer采用的是push-and-pull模式,即Producer只管向Broker push消息,Consumer只管从Broker pull消息,两者对消息的生产和消费是异步的。
- kafka设计目标
- 高吞吐率:在廉价的商用机器上单机可支持每秒100w条消息的读写
- 消息持久化:所有消息均被持久化到磁盘(不会造成性能低下),无消息丢失(单节点磁盘问题支持多节点间的消息复制),支持消息重放
- 完全分布式:Producer,broker,consumer均支持水平扩展
- 同时满足适应在线流处理和离线批处理