1. Download and Install
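Official download page: Apache Kafka
Download the release archive, upload it to the server, and extract it:
tar -xzf kafka_2.13-3.7.0.tgz
The resulting directory structure is as follows: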
├── bin
│ └── windows
├── config
│ └── kraft
├── libs
├── licenses
└── site-docs
Official documentation: Apache Kafka
Kafka can be started in two modes, ZooKeeper and KRaft. This guide uses KRaft, which reads the configuration files in the kraft directory:
config
├── connect-console-sink.properties
├── connect-console-source.properties
├── connect-distributed.properties
├── connect-file-sink.properties
├── connect-file-source.properties
├── connect-log4j.properties
├── connect-mirror-maker.properties
├── connect-standalone.properties
├── consumer.properties
├── kraft
│ ├── broker.properties
│ ├── controller.properties
│ └── server.properties
├── log4j.properties
├── producer.properties
├── server.properties
├── tools-log4j.properties
├── trogdor.conf
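└── zookeeper.properties
Edit the server.properties file under config/kraft. The settings to review are:
  process.roles: the role of the node in KRaft mode, one of broker, controller, or broker,controller
  node.id: the node ID; every node must be assigned a unique ID
  controller.quorum.voters: the voters of the controller quorum
  log.dirs: the log directory
To initialize the cluster, first generate a UUID: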
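./bin/kafka-storage.sh random-uuid
Then run the format command with the generated UUID to initialize the cluster (replace the example ID below with your own):
./bin/kafka-storage.sh format -t thCDFveGRleJro7zTaOOGA -c ./config/kraft/server.properties
Then start Kafka:
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
2. Spring Boot Integration
2.1 Add dependencies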
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>3.1.4</version>
        <!-- Exclude the transitive client so the version can be pinned below -->
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.7.0</version>
    </dependency>
</dependencies>
2.2 Configuration file
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: fable-group
2.3 Producer
Use KafkaTemplate to produce messages:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public ProducerController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Sends a fixed message to the "test" topic each time the endpoint is called.
    @GetMapping("/send/message")
    public String sendMessage() {
        kafkaTemplate.send("test", "Hello, Kafka!");
        return "Message sent successfully";
    }
}
2.4 Consumer
Add the @KafkaListener annotation to listen for messages:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    // Called for every record that arrives on the "test" topic.
    @KafkaListener(topics = "test", groupId = "fable-group")
    public void listen(ConsumerRecord<String, String> consumerRecord) {
        System.out.println("Received message: " + consumerRecord.value());
    }
}
2.5 Application class
Add the @EnableKafka annotation:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class FableApplication {

    public static void main(String[] args) {
        SpringApplication.run(FableApplication.class, args);
    }
}
2.6 Testing
Call the /send/message endpoint. The application console prints the received message:
Received message: Hello, Kafka!
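
The endpoint can be called from a browser or any HTTP client. As one option, the following is a minimal sketch that uses the JDK's built-in java.net.http client. The class name SendMessageSmokeTest and the helper buildRequest are hypothetical names introduced here for illustration, and the base URL assumes the application runs locally on Spring Boot's default port 8080, which the original configuration does not state.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper class for illustration; it is not part of the project above.
class SendMessageSmokeTest {

    // Builds a GET request for the /send/message endpoint defined in ProducerController.
    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/send/message"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the application listens on localhost:8080 unless a base URL is passed in.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8080";
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(buildRequest(baseUrl), HttpResponse.BodyHandlers.ofString());
        // Prints the HTTP status code and the body returned by the controller.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

With both Kafka and the Spring Boot application running, executing this class triggers one send, and the consumer's log line should then appear in the application console rather than in the client's output.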