网站开发 占位符,设计素材网站飘,网建贷款,注册平台下载zookeeper
Kafka 依赖 Zookeeper 进行分布式协调#xff0c;所以需要下载Zookeeper #xff0c;当然你也可以使用kafka包里自带的一个默认配置的 Zookeeper。这里我们单独下载一个
访问Zookeeper官方下载页面在页面中找到最新的稳定版本#xff0c;点击相应的下载链接…下载zookeeper
Kafka 依赖 Zookeeper 进行分布式协调所以需要下载Zookeeper 当然你也可以使用kafka包里自带的一个默认配置的 Zookeeper。这里我们单独下载一个
访问Zookeeper官方下载页面在页面中找到最新的稳定版本点击相应的下载链接下载 Zookeeper 的压缩包文件 解压 Zookeeper将下载的压缩包文件解压到一个目录例如 D:\zookeeper。解压后目录结构类似于
D:\zookeeper
├── bin
├── conf
├── lib
├── logs
└── ...配置 Zookeeper,打开D:\zookeeper\conf 目录将 zoo_sample.cfg 文件复制并重命名为 zoo.cfg。使用文本编辑器如 Notepad打开 zoo.cfg 文件并检查以下配置
tickTime2000
dataDirD:/zookeeper/data
clientPort2181
tickTime 是 Zookeeper 服务器和客户端之间的心跳时间以毫秒为单位。dataDir 是 Zookeeper 存储数据的目录确保路径是有效的并且存在。clientPort 是 Zookeeper 服务的端口默认是 2181。 确保 D:\zookeeper\data 目录存在如果不存在请手动创建。
启动 Zookeeper,打开命令提示符winR输入cmd,切换到D盘进入zookeeper\bin,执行zkServer.cmd在bin目录下执行zkServer.cmd status可以看到 Zookeeper 的运行状态 执行zkCli.cmd -server localhost:2181可以连接到 Zookeeper 客户端
下载kafka
访问Kafka 官方下载页面下载最新稳定版本的 Kafka 二进制文件解压 Kafka 将下载的 Kafka 压缩包解压到一个目录例如 D:\kafka。启动 Kafka 服务器,打开一个新的命令提示符窗口进入 Kafka 的 bin\windows 目录
cd D:\kafka\bin\windows
使用以下命令启动 Kafka 服务器
kafka-server-start.bat ..\..\config\server.properties
kafka的启动需要加载config\server.properties的配置。
窗口不要关闭至此你已启动了zookeeper和kafka。当然你也可以用kafka自带的zookeeper。自带的启动方式为 打开命令提示符进入 Kafka 的 bin\windows 目录
cd D:\kafka\bin\windows
zookeeper-server-start.bat ..\..\config\zookeeper.properties
注意如果你已经启动了单独下载安装的zookeeper就不要再启动kafka自带的zookeeper了否则可能出现端口被占用的情况如果出现端口被占用请杀死对应的进程。如果失败请检查2181端口是否被占用netstat -ano | findstr :2181如果有被占用结束进程taskkill /F /PID 1234。。这里的1234pid要写实际的pid
安装PHP 的 Kafka 扩展
为了让 PHP 能够与 Kafka 交互你需要安装 rdkafka 扩展并且可以选择合适的 Kafka PHP 客户端库如 longlang/phpkafka。
安装 rdkafka 扩展 访问 PECL: rdkafka 并下载适用于你 PHP 版本的 php_rdkafka.dll 文件。 点击查看自己对应的版本下载对应的包。确定php的版本号操作系统位数,nts或者ts。如果不清楚可以使用phpinfo来查看自己的信息。或者直接使用php -v 解压文件主要用到2个dll文件将librdkafka.dll放在php的安装目录下比如D:\phpstudy_pro\Extensions\php\php7.4.3nts将php_rdkafka.dll放在ext中比如 D:\phpstudy_pro\Extensions\php\php7.4.3nts\ext编辑 php.ini 文件开启rdkafka扩展
extensionphp_rdkafka.dll或者
extensionrdkafka
保存文件重启php
安装 Kafka PHP 客户端库
进入你的php项目使用composer安装longlang/phpkafka,或者nmred/kafka-php longlang
composer require longlang/phpkafka
nmred
composer require nmred/kafka-php
如果你的composer报错比如laravel框架中laravel/horizon的错误是缺少PCNTL扩展这个扩展windows下不支持可以忽略平台使用
composer require longlang/phpkafka --ignore-platform-reqsnmred
composer require nmred/kafka-php --ignore-platform-reqs
以下使用nmred 举例
配置和使用 Kafka
创建kafka的配置文件
laravel 中在config新增一个kafka.php
?phpreturn [brokers env(KAFKA_BROKERS, localhost:9092),consumer_group_id env(KAFKA_CONSUMER_GROUP_ID, laravel-consumer-group),topics [test-topic [partition 0,replica 1,],],
];创建 Kafka 生产者
创建一个生成类
?phpnamespace App\Http\Controllers;use RdKafka\Producer;class KafkaProducerController extends Controller
{public function produce(){$conf new \RdKafka\Conf();$producer new Producer($conf);$producer-addBrokers(config(kafka.brokers));$topic $producer-newTopic(test-topic);$topic-produce(RD_KAFKA_PARTITION_UA, 0, Hello Kafka);$producer-flush(10000);return Message produced successfully;}
}
配置好对应的api路由后访问一次就是生产一条消息
创建kafka消费者
这里使用的是laravel框架需要在app/console/commands下创建一个定时任务
?phpnamespace App\Console\Commands;use Illuminate\Console\Command;
use RdKafka\Consumer;
use RdKafka\TopicConf;class KafkaConsumer extends Command
{protected $signature kafka:consume;protected $description Consume messages from Kafka;public function handle(){$conf new \RdKafka\Conf();$conf-set(group.id, config(kafka.consumer_group_id));// 开启自动提交偏移量$conf-set(enable.auto.commit, true);$consumer new Consumer($conf);$consumer-addBrokers(config(kafka.brokers));$topicConf new TopicConf();$topicConf-set(auto.offset.reset, latest);$topic $consumer-newTopic(test-topic, $topicConf);// 开始从指定分区消费消息注意不再从头开始。$topic-consumeStart(0, RD_KAFKA_OFFSET_STORED);while (true) {// 消费消息等待时间为1000ms。$message $topic-consume(0, 1000);// 检查是否成功消费到消息并且没有错误。if ($message $message-err RD_KAFKA_RESP_ERR_NO_ERROR) {// 输出消息的内容。$this-info($message-payload);}}}
}已经拿到消息了具体的kafka的操作这里不细讲只注重windows下本地开发环境kafka的安装。记得在消费时开启你的zookeeper和kafka。