一、Kafka介绍
Kafka创建背景
Kafka 是一个消息系统,原本开发自 LinkedIn,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司 作为多种类型的数据管道和消息系统使用。
活动流数据是几乎所有站点在对其网站使用情况做报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO 使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。
近年来,活动和运营数据处理已经成为了网站软件产品特性中一个至关重要的组成部分,这就需要一套稍微更加复杂的基础设施对其提供支持。
Kafka简介
Kafka是一种分布式的,基于发布/订阅的消息系统。
主要设计目标如下:
- 以时间复杂度为 O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
- 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展。
Kafka基础概念
专业名称解释
- Broker
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。 - Producer
消息生产者,就是向kafka broker发消息的客户端。 - Consumer
消息消费者,向kafka broker取消息的客户端。 - Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处) - Consumer Group
每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定group name,若不指定group name则属于默认的分组。 - Partition
一个庞大大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。Partition是物理上的概念,方便在集群中扩展,提高并发。
消息功能
- 点对点模式
点对点模型通常是一个基于拉取或者轮询的消息传递模型,消费者主动拉取数据,消息收到后从队列移除消息,这种模型不是将消息推送到客户端,而是从队列中请求消息。特点是发送到队列的消息被一个且只有一个消费者接收处理,即使有多个消费者监听队列也是如此。
- 发布订阅模式
发布订阅模型则是一个基于推送的消息传送模型,消息产生后,推送给所有订阅者。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。
Kafka如何保证可靠性
当我们讨论可靠性的时候,我们总会提到保证*这个词语。可靠性保证是基础,我们基于这些基础之上构建我们的应用。比如关系型数据库的可靠性保证是ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。
Kafka 中的可靠性保证有如下四点:
- 对于一个分区来说,它的消息是有序的。如果一个生产者向一个分区先写入消息A,然后写入消息B,那么消费者会先读取消息A再读取消息B。
- 当消息写入所有in-sync状态的副本后,消息才会认为已提交(committed)。这里的写入有可能只是写入到文件系统的缓存,不一定刷新到磁盘。生产者可以等待不同时机的确认,比如等待分区主副本写入即返回,后者等待所有in-sync状态副本写入才返回。
- 一旦消息已提交,那么只要有一个副本存活,数据不会丢失。
- 消费者只能读取到已提交的消息。
使用这些基础保证,我们构建一个可靠的系统,这时候需要考虑一个问题:究竟我们的应用需要多大程度的可靠性?可靠性不是无偿的,它与系统可用性、吞吐量、延迟和硬件价格息息相关,得此失彼。因此,我们往往需要做权衡,一味的追求可靠性并不实际。
Quick Start
下载安装Kafka,解压安装包
- 从官网下载Kafka的安装包。
#1.1 浏览器打开链接 https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz
#1.2 命令下载
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
tar -xzf kafka_2.13-2.6.0.tgz
cd kafka_2.13-2.6.0 - 修改配置,启动服务
2.1 修改配置
vim config/server.properties
把参数修改成如下图:
2.2 启动服务
Start the ZooKeeper service
Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Start the Kafka broker service
bin/kafka-server-start.sh -daemon config/server.properties
生产消息
- Maven依赖配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--spring-kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency> - 生产消息application.properties配置
spring.application.name=springboot-kafka-app-provider
server.port=8081
# ============== kafka ==================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092
# =============== provider =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer - 具体生产消息核心代码KafkaProducer
/**
* kafka消息生产
*/
@Component
@Slf4j
public class KafkaProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
AtomicLong atomicLong = new AtomicLong(0L);
public void send() {
String msg = "中国崛起:"+atomicLong.incrementAndGet();
send(msg);
}
public void send(final String msg) {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(msg);
message.setSendTime(new Date());
log.info("+++++++++++++++++++++send message = {}", gson.toJson(message));
kafkaTemplate.send(KafkaTopicConstant.testTopicName, message.getId().toString() , gson.toJson(message));
} - 具体生产消息核心代码MessageController
/**
* 消息发送controller
* @author vincent.li
*/
public class MessageController {
KafkaProducer kafkaProducer;
public String send(String msg){
try {
kafkaProducer.send(msg);
} catch (Exception e) {
log.error(e.getMessage(), e);
return "failure";
}
return "success";
}
} - 生产消息服务启动类SpringBootKafkaProducerApplication
/**
* kafka消息生产服务SpringBoot 启动类
* @author vincent.li
*/
@SpringBootApplication(scanBasePackages = {"com.vincent.app.kafka.producer"})
public class SpringBootKafkaProducerApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(SpringBootKafkaProducerApplication.class, args);
System.out.println("O(∩_∩)O哈哈~ 【TEST-KAFKA-APP-PRODUCER】启动成功 O(∩_∩)O哈哈~");
KafkaProducer sender = context.getBean(KafkaProducer.class);
for (int i = 0; i < 1000; i++) {
sender.send();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}消费消息
- Maven配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--spring-kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency> - 消费消息application.properties配置
spring.application.name=springboot-kafka-app-consumer
server.port=8082
# ============== kafka ==================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092
#=============== consumer =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-demo-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=2000
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer - 消费消息核心代码
/**
* kafka消息消费
* @author vincent.li
*/
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = {KafkaConsumerTopicConstant.testTopicName})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info(KafkaConsumerTopicConstant.testTopicName+"----------------- record==>{}", record);
log.info(KafkaConsumerTopicConstant.testTopicName+"------------------ message==>{}" , message);
}
}
} - 消费消息启动服务类SpringBootKafkaConsumerApplication
/**
* kafka消息消费服务SpringBoot 启动类
* @author vincent.li
*/
@SpringBootApplication(scanBasePackages = {"com.vincent.app.kafka.consumer"})
public class SpringBootKafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaConsumerApplication.class, args);
System.out.println("O(∩_∩)O哈哈~ 【TEST-KAFKA-APP-CONSUMER】启动成功 O(∩_∩)O哈哈~");
}
}运行结果
- 通过controller方法生产消息
- 生产消息服务控制台打印日志
- 消费消息服务控制台打印日志
完整demo代码地址
Github地址: https://github.com/leechaoqiang/springcloud-app-parent
Gitee地址:https://gitee.com/chaoqianglee/springcloud-app-parent