微信小程序 购物网站开发,企业手机网站cms系统,wordpress图片视频主题,网络组建与维护试题一、kafka.zookeeper介绍
Kafka 简介#xff1a; Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发#xff0c;并于2011年成为Apache软件基金会的顶级项目。
特点#xff1a;
高吞吐量#xff1a; Kafka 能够处理大规模的消息流#xf…一、kafka.zookeeper介绍
Kafka 简介 Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发并于2011年成为Apache软件基金会的顶级项目。
特点
高吞吐量 Kafka 能够处理大规模的消息流并具有很高的吞吐量。 持久性 它将消息持久化到磁盘上因此即使消费者不在线也能保证消息不会丢失。 可伸缩性 Kafka 可以很容易地水平扩展以处理大量数据。 实时性 Kafka 可以提供几乎实时的消息传递适用于大多数实时数据处理需求。 用途
日志收集 Kafka 可以用作集中式的日志收集系统收集来自不同源头的日志数据。 消息队列 Kafka 可以用作分布式应用程序之间的消息队列用于解耦和异步通信。 流处理 Kafka 可以与流处理框架如Apache Spark、Apache Flink等结合使用用于实时数据处理和分析。 ZooKeeper 简介 Apache ZooKeeper 是一个开源的分布式协调服务最初也是由Yahoo开发的并于2010年成为Apache软件基金会的顶级项目。
特点
分布式协调 ZooKeeper 提供了分布式应用程序的协调服务包括配置管理、命名服务、分布式锁等。 高可用性 ZooKeeper 通过在集群中保持多个节点的复制来实现高可用性和容错性。 一致性 ZooKeeper 提供了严格的一致性确保所有的客户端在同一时间看到相同的数据视图。 用途
配置管理 ZooKeeper 可以用于分布式系统的配置管理例如动态配置更新。 命名服务 ZooKeeper 可以提供命名服务帮助分布式系统中的节点发现和通信。 分布式锁 ZooKeeper 可以用于实现分布式锁确保在分布式系统中对共享资源的互斥访问。 Kafka 和 ZooKeeper 的关系
在 Kafka 中ZooKeeper 主要用于管理集群的元数据如主题、分区、副本分配等、领导者选举以及生产者和消费者的协调。Kafka 依赖于 ZooKeeper 来确保分布式系统的稳定运行。通常情况下Kafka 和 ZooKeeper 会一起部署但它们是两个独立的项目各自提供不同的功能。
二、创建存储卷
nfs动态供给直通车
三、搭建Kafka集群
# 操作系统
# CentOS Linux release 7.9.2009 (Core)
lsb_release -a# 内核版本
# 3.10.0-1160.90.1.el7.x86_64
uname -a
# k8s 版本 1.21
# zookeeper 版本 3.4.10 kafka镜像版本0.11嫌低可以自己换kafka需要依赖zookeeper kafka的生产者与消费者需要在zookeeper中注册不然消费者怎么知道生产者是否存活之类的哈哈。废话不多说直接上干货
本文用的是statefulset和动态存储部署zookeeper和kafka集群。
zookeeper.yaml
apiVersion: v1
kind: Service
metadata:name: zk-hslabels:app: zk
spec:ports:- port: 2888name: server- port: 3888name: leader-electionclusterIP: Noneselector:app: zk
---
apiVersion: v1
kind: Service
metadata:name: zk-cslabels:app: zk
spec:ports:- port: 2181targetPort: 2181name: clientselector:app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: zk-pdb
spec:selector:matchLabels:app: zkmaxUnavailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: zk
spec:selector:matchLabels:app: zkserviceName: zk-hsreplicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: OrderedReadytemplate:metadata:labels:app: zkspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: appoperator: Invalues:- zktopologyKey: kubernetes.io/hostnamecontainers:- name: kubernetes-zookeeperimagePullPolicy: IfNotPresentimage: zhaoguanghui6/kubernetes-zookeeper:1.0-3.4.10resources:requests:memory: 0.5Gicpu: 0.5ports:- containerPort: 2181name: client- containerPort: 2888name: server- containerPort: 3888name: leader-electioncommand:- sh- -c- start-zookeeper \--servers3 \--data_dir/var/lib/zookeeper/data \--data_log_dir/var/lib/zookeeper/data/log \--conf_dir/opt/zookeeper/conf \--client_port2181 \--election_port3888 \--server_port2888 \--tick_time2000 \--init_limit10 \--sync_limit5 \--heap512M \--max_client_cnxns60 \--snap_retain_count3 \--purge_interval12 \--max_session_timeout40000 \--min_session_timeout4000 \--log_levelINFOreadinessProbe:exec:command:- sh- -c- zookeeper-ready 2181initialDelaySeconds: 10timeoutSeconds: 5livenessProbe:exec:command:- sh- -c- zookeeper-ready 2181initialDelaySeconds: 10timeoutSeconds: 5volumeMounts:- name: datadirmountPath: /var/lib/zookeepersecurityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirspec:storageClassName: nfs-clientaccessModes: [ ReadWriteOnce ]resources:requests:storage: 1Gifor i in 0 1 2; do kubectl exec zk-$i – hostname -f; done zk-0.zk-headless.default.svc.cluster.local
zk-1.zk-headless.default.svc.cluster.local
zk-2.zk-headless.default.svc.cluster.local kafka.yaml
---
apiVersion: v1
kind: Service
metadata:name: kafka-hslabels:app: kafka
spec:ports:- port: 9092name: serverclusterIP: Noneselector:app: kafka
---
apiVersion: v1
kind: Service
metadata:name: kafka-cslabels:app: kafka
spec:selector:app: kafkatype: NodePortports:- name: clientport: 9092nodePort: 30092
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: kafka-pdb
spec:selector:matchLabels:app: kafkaminAvailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka
spec:serviceName: kafka-hsreplicas: 3selector:matchLabels:app: kafkatemplate:metadata:labels:app: kafkaspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: appoperator: Invalues:- kafkatopologyKey: kubernetes.io/hostnamepodAffinity:preferredDuringSchedulingIgnoredDuringExecution:- weight: 1podAffinityTerm:labelSelector:matchExpressions:- key: appoperator: Invalues:- zktopologyKey: kubernetes.io/hostnameterminationGracePeriodSeconds: 300containers:- name: kafkaimagePullPolicy: IfNotPresentimage: registry.cn-hangzhou.aliyuncs.com/jaxzhai/k8skafka:v1resources:requests:memory: 1Gicpu: 500mports:- containerPort: 9092name: servercommand:- sh- -c- exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id${HOSTNAME##*-} --override listenersPLAINTEXT://:9092 --override zookeeper.connectzk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181 --override log.dir/var/lib/kafka --override auto.create.topics.enabletrue --override auto.leader.rebalance.enabletrue --override background.threads10 --override compression.typeproducer --override delete.topic.enabletrue --override leader.imbalance.check.interval.seconds300 --override leader.imbalance.per.broker.percentage10 --override log.flush.interval.messages9223372036854775807 --override log.flush.offset.checkpoint.interval.ms60000 --override log.flush.scheduler.interval.ms9223372036854775807 --override log.retention.bytes-1 --override log.retention.hours168 --override log.roll.hours168 --override log.roll.jitter.hours0 --override log.segment.bytes1073741824 --override log.segment.delete.delay.ms60000 --override message.max.bytes1000012 --override min.insync.replicas1 --override num.io.threads8 --override num.network.threads3 --override num.recovery.threads.per.data.dir1 --override num.replica.fetchers1 --override offset.metadata.max.bytes4096 --override offsets.commit.required.acks-1 --override offsets.commit.timeout.ms5000 --override offsets.load.buffer.size5242880 --override offsets.retention.check.interval.ms600000 --override offsets.retention.minutes1440 --override offsets.topic.compression.codec0 --override offsets.topic.num.partitions50 --override offsets.topic.replication.factor3 --override offsets.topic.segment.bytes104857600 --override queued.max.requests500 --override quota.consumer.default9223372036854775807 --override quota.producer.default9223372036854775807 --override replica.fetch.min.bytes1 --override replica.fetch.wait.max.ms500 --override replica.high.watermark.checkpoint.interval.ms5000 --override replica.lag.time.max.ms10000 --override replica.socket.receive.buffer.bytes65536 --override replica.socket.timeout.ms30000 --override request.timeout.ms30000 --override socket.receive.buffer.bytes102400 --override socket.request.max.bytes104857600 --override socket.send.buffer.bytes102400 --override unclean.leader.election.enabletrue --override zookeeper.session.timeout.ms6000 --override zookeeper.set.aclfalse --override broker.id.generation.enabletrue --override connections.max.idle.ms600000 --override controlled.shutdown.enabletrue --override controlled.shutdown.max.retries3 --override controlled.shutdown.retry.backoff.ms5000 --override controller.socket.timeout.ms30000 --override default.replication.factor1 --override fetch.purgatory.purge.interval.requests1000 --override group.max.session.timeout.ms300000 --override group.min.session.timeout.ms6000 --override inter.broker.protocol.version0.10.2-IV0 --override log.cleaner.backoff.ms15000 --override log.cleaner.dedupe.buffer.size134217728 --override log.cleaner.delete.retention.ms86400000 --override log.cleaner.enabletrue --override log.cleaner.io.buffer.load.factor0.9 --override log.cleaner.io.buffer.size524288 --override log.cleaner.io.max.bytes.per.second1.7976931348623157E308 --override log.cleaner.min.cleanable.ratio0.5 --override log.cleaner.min.compaction.lag.ms0 --override log.cleaner.threads1 --override log.cleanup.policydelete --override log.index.interval.bytes4096 --override log.index.size.max.bytes10485760 --override log.message.timestamp.difference.max.ms9223372036854775807 --override log.message.timestamp.typeCreateTime --override log.preallocatefalse --override log.retention.check.interval.ms300000 --override max.connections.per.ip2147483647 --override num.partitions1 --override producer.purgatory.purge.interval.requests1000 --override replica.fetch.backoff.ms1000 --override replica.fetch.max.bytes1048576 --override replica.fetch.response.max.bytes10485760 --override reserved.broker.max.id1000 env:- name: KAFKA_HEAP_OPTSvalue : -Xmx1G -Xms1G- name: KAFKA_OPTSvalue: -Dlogging.levelINFOvolumeMounts:- name: datadirmountPath: /var/lib/kafkareadinessProbe:exec:command:- sh- -c- /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-serverlocalhost:9092timeoutSeconds: 5periodSeconds: 5initialDelaySeconds: 70securityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirannotations:volume.beta.kubernetes.io/storage-class: nfs-clientspec:accessModes: [ ReadWriteMany ]resources:requests:storage: 5Gi四、验证集群 验证kafka是否可用
1、进入kafka-0命令: kubectl exec -it kafka-0 bash 进入容器目录cd /opt/kafka/config
2、创建一个名为aaa的topc命令kafka-topics.sh --create --topic aaa --zookeeper zk-0.zk-headless.default.svc.cluster.local:2181,zk-1.zk-headless.default.svc.cluster.local:2181,zk-2.zk-headless.default.svc.cluster.local:2181 --partitions 3 --replication-factor 2 结果为 Created topic “aaa”.
3、进入topic为aaa的生产者消息中心kafka-console-consumer.sh --topic aaa --bootstrap-server localhost:9092
4、复制新的会话进入另一个容器kafka-1kubectl exec -it kafka-1 bash
进入消费者输入命令kafka-console-producer.sh --topic aaa --broker-list localhost:9092
输入:
hello
i lovle you
回车后可在生产者消息中心看到消息
最新文章链接含镜像制作v3.5.2