当前位置: 首页 > news >正文

郯城地建设局网站搜索引擎优化要考虑哪些方面

郯城地建设局网站,搜索引擎优化要考虑哪些方面,长沙房地产交易中心,织梦做的网站怎么样文章目录 概念部署方案磁盘网络CPUpartition的数量 命令查看版本找kafka和zookeeper的ip/porttopic创建 topic查看get topic 列表get topic 详情 修改topic修改分区级别参数(如增加partition) 删除topic设置消息大小上限 生产查看生产生产消息 查看消费server 查看 offset查看积… 文章目录 概念部署方案磁盘网络CPUpartition的数量 命令查看版本找kafka和zookeeper的ip/porttopic创建 topic查看get topic 列表get topic 详情 修改topic修改分区级别参数(如增加partition) 删除topic设置消息大小上限 生产查看生产生产消息 查看消费server 查看 offset查看积压server client has run out of available brokers to talk to (Is your cluster reachable?)报错的调试 原理UI 工具go sarama库使用consumer 概念 Kafka 是消息引擎Messaging System其是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息实现松耦合的异步式数据传递。 实现的目标就是 系统A 发消息给 消息引擎系统B 从消息引擎读取 A发送的消息。 Kafka 用二进制存储数据。其同时支持点对点模型、发布/订阅模型两种。 点对点模型也叫消息队列模型。如果拿上面那个“民间版”的定义来说那么系统 A 发送的消息只能被系统 B 接收其他任何系统都不能读取 A 发送的消息。日常生活的例子比如电话客服就属于这种模型同一个客户呼入电话只能被一位客服人员处理第二个客服人员不能为该客户服务。发布 / 订阅模型与上面不同的是它有一个主题Topic的概念你可以理解成逻辑语义相近的消息容器。该模型也有发送方和接收方只不过提法不同。发送方也称为发布者Publisher接收方称为订阅者Subscriber。和点对点模型不同的是这个模型可能存在多个发布者向相同的主题发送消息而订阅者也可能存在多个它们都能接收到相同主题的消息。生活中的报纸订阅就是一种典型的发布 / 订阅模型。 消息引擎的作用 削峰填谷缓冲上下游瞬时突发流量使其更平滑。特别是对于那种发送能力很强的上游系统如果没有消息引擎的保护“脆弱”的下游系统可能会直接被压垮导致全链路服务“雪崩”。但是一旦有了消息引擎它能够有效地对抗上游的流量冲击真正做到将上游的“峰”填满到“谷”中避免了流量的震荡。发送方和接收方的松耦合这也在一定程度上简化了应用的开发减少了系统间不必要的交互。 名词术语如下 消息Record。Kafka 是消息引擎嘛这里的消息就是指 Kafka 处理的主要对象。主题Topic。主题是承载消息的逻辑容器在实际使用中多用来区分具体的业务。分区Partition。一个有序不变的消息序列。每个主题下可以有多个分区。消息位移Offset。表示分区中每条消息的位置信息是一个单调递增且不变的值。副本Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余这些地方就是所谓的副本。副本还分为领导者副本和追随者副本各自有不同的角色划分。副本是在分区层级下的即每个分区可配置多个副本实现高可用。生产者Producer。向主题发布新消息的应用程序。消费者Consumer。从主题订阅新消息的应用程序。消费者位移Consumer Offset。表征消费者消费进度每个消费者都有自己的消费者位移。消费者组Consumer Group。多个消费者实例共同组成的一个组同时消费多个分区以实现高吞吐。重平衡Rebalance。消费者组内某个消费者实例挂掉后其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。 Kafka的副本为何不允许对外提供服务 如果允许follower副本对外提供读服务主写从读首先会存在数据一致性的问题消息从主节点同步到从节点需要时间可能造成主从节点的数据不一致。主写从读无非就是为了减轻leader节点的压力将读请求的负载均衡到follower节点如果Kafka的分区相对均匀地分散到各个broker上同样可以达到负载均衡的效果没必要刻意实现主写从读增加代码实现的复杂程度 Consumer Group 在一个消费者组下一个分区只能被一个消费者消费但一个消费者可能被分配多个分区因而在提交位移时也就能提交多个分区的位移。如果Consumer Group内 consumer的数量 partition 的数量则有一个消费者将无法分配到任何分区处于idle状态。 Producer 如果producer指定了要发送的目标分区消息自然是去到那个分区否则就按照producer端参数partitioner.class指定的分区策略来定如果你没有指定过partitioner.class那么默认的规则是看消息是否有key如果有则计算key的murmur2哈希值%topic分区数如果没有key按照轮询的方式确定分区。 监控 JMXTrans InfluxDB Grafana推荐Kafka managerkafka eagle 部署方案 磁盘 根据保留的消息数量预估磁盘占用 新增消息数消息留存时间平均消息大小备份数是否启用压缩 假设你所在公司有个业务每天需要向 Kafka 集群发送 1 亿条消息每条消息保存两份以防止数据丢失另外消息默认保存两周时间。现在假设消息的平均大小是 1KB那么你能说出你的 Kafka 集群需要为这个业务预留多少磁盘空间吗我们来计算一下每天 1 亿条 1KB 大小的消息保存两份且留存两周的时间那么总的空间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 200GB。一般情况下 Kafka 集群除了消息数据还有其他类型的数据比如索引数据等故我们再为这些数据预留出 10% 的磁盘空间因此总的存储容量就是 220GB。既然要保存两周那么整体容量即为 220GB * 14大约 3TB 左右。Kafka 支持数据的压缩假设压缩比是 0.75那么最后你需要规划的存储空间就是 0.75 * 3 2.25TB。网络 根据QPS和带宽预估服务器数量注意业界带宽资源一般用Mbps而不是MBps衡量 假设你公司的机房环境是千兆网络即 1Gbps现在你有个业务其业务目标或 SLA 是在 1 小时内处理 1TB 的业务数据。那么问题来了你到底需要多少台 Kafka 服务器来完成这个业务呢让我们来计算一下由于带宽是 1Gbps即每秒处理 1Gb 的数据假设每台 Kafka 服务器都是安装在专属的机器上也就是说每台 Kafka 机器上没有混布其他服务毕竟真实环境中不建议这么做。通常情况下你只能假设 Kafka 会用到 70% 的带宽资源因为总要为其他应用或进程留一些资源。根据实际使用经验超过 70% 的阈值就有网络丢包的可能性了故 70% 的设定是一个比较合理的值也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。稍等这只是它能使用的最大带宽资源你不能让 Kafka 服务器常规性使用这么多资源故通常要再额外预留出 2/3 的资源即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需要提示的是这里的 2/3 其实是相当保守的你可以结合你自己机器的使用情况酌情减少此值。好了有了 240Mbps我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根据这个目标我们每秒需要处理 2336Mb 的数据除以 240约等于 10 台服务器。如果消息还需要额外复制两份那么总的服务器台数还要乘以 3即 30 台。CPU 通常情况下Kafka不太占用CPU因此没有这方面的最佳实践出来。但有些情况下Kafka broker是很耗CPU的 server和client使用了不同的压缩算法server和client版本不一致造成消息格式转换3broker端解压缩校验 不过相比带宽资源CPU通常都不是瓶颈 partition的数量 网上有一些分区制定的建议我觉得这个粗粒度的方法就很好值得一试 首先你要确定你业务的SLA比如说你希望你的producer TPS是10万条消息/秒假设是T1在你的真实环境中创建一个单分区的topic测试一下TPS假设是T2你需要的分区数大致可以等于T1 / T2 命令 2.2以上用–bootstrap-server, 2.2以下用–zookeeper 查看版本 cd kafka/libs 其中有kafka_2.12-2.8.0.jar则版本为2.8.0找kafka和zookeeper的ip/port rootmaster~# kubectl get svc -n kafka -o wide NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR bootstrap ClusterIP 10.109.83.55 none 9092/TCP 15d appkafka broker ClusterIP None none 9092/TCP 15d appkafka outside-0 NodePort 10.108.34.0 none 32400:32400/TCP 15d appkafka,kafka-broker-id0 outside-1 NodePort 10.104.65.215 none 32401:32401/TCP 15d appkafka,kafka-broker-id1 outside-2 NodePort 10.99.118.241 none 32402:32402/TCP 15d appkafka,kafka-broker-id2 pzoo ClusterIP None none 2888/TCP,3888/TCP 15d appzookeeper,storagepersistent zoo ClusterIP None none 2888/TCP,3888/TCP 15d appzookeeper,storagepersistent-regional zookeeper ClusterIP 10.103.22.130 none 2181/TCP 15d appzookeepertopic 如果遇到client向某些topic建立producer时报错kafka: client has run out of available brokers to talk to (Is your cluster reachable?), 可以手动删掉topic再手动重建topic 创建 topic ./kafka-topics.sh --zookeeper 10.103.22.130:2181 --create --topic ttt1 --partitions 1 --replication-factor 1查看 get topic 列表 ./kafka-topics.sh --list --bootstrap-server 192.168.2.165:9092./kafka-topics.sh --zookeeper 10.103.22.130:2181/kafka --list# out __consumer_offsets topic1 kafka-image-topic2get topic 详情 ./kafka-topics.sh --zookeeper 10.103.22.130:2181 --describe --topic topic-a修改topic 修改分区级别参数(如增加partition) ./kafka-topics.sh --zookeeper 10.103.22.130:2181 --alter --topic ttt1 --partitions 2## out WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded删除topic 前提是把kafka配置文件server.properties中的delete.topic.enable设置为true ./kafka-topics.sh --zookeeper 192.168.2.165:2181 --delete --topic topic_name设置消息大小上限 topic级别静态参数用–zookeeper, 动态参数才用–bootstrap-server ./kafka-configs.sh --zookeeper 10.103.22.130:2181 --entity-type topics --entity-name ttt1 --alter --add-config max.message.bytes10485760## out Completed Updating config for entity: topic ttt1.生产 查看生产 ./kafka-console-producer.sh --broker-list 192.168.2.158:9092 --topic topic-a生产消息 ./kafka-console-producer.sh --topic topic-a --bootstrap-server broker:9092进站去重车数据如下{Ts:1677507305663,Data:99990000,Info:1080}} 参考 查看消费 server # 输入./kafka-console-consumer.sh --bootstrap-server 192.168.2.111:9092 --topic topic-a 或 ./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181/kafka --topic topic-a./kafka-console-consumer.sh --bootstrap-server 192.168.2.142:32400 --topic tttbash zk启动 brew services start zookeeper或zkServer start /usr/local/Cellar/kafka/2.4.0/libexec/bin查看 offset 如果希望根据时间找offset可以有如下方法 找到指定时间的 *.index 和 *.log 文件文件名即为 offset。 -rw-r--r-- 1 root root 43 Nov 28 2021 partition.metadata -rw-r--r-- 1 root root 10 Nov 8 12:30 00000000000000380329.snapshot -rw-r--r-- 1 root root 10485756 Nov 8 12:31 00000000000000380329.timeindex -rw-r--r-- 1 root root 10485760 Nov 8 12:31 00000000000000380329.index drwxr-xr-x 2 root root 4096 Nov 8 12:31 ./ -rw-r--r-- 1 root root 3622 Nov 8 12:33 00000000000000380329.log drwxr-xr-x 77 root root 4096 Nov 8 12:34 ../查看积压 server # 输入 cd ~/deep/kafka/kafka/bin watch -n 1 ./kafka-consumer-groups.sh --bootstrap-server 192.168.2.111:9092 --describe --group topic-a# 输出其中lag有值表示积压。 Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers). TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic-a 0 159955 159955 0 UBUNTU.local-61e8b3d0-1456-49ce-8656-fe18cab4026a client has run out of available brokers to talk to (Is your cluster reachable?)报错的调试 kafka基于zookeeper做一致性校验一个topic必须对应一个broker才行如果importer出现报错 [ERROR] kafka %v create producer failedclient has run out of available brokers to talk to (Is your cluster reachable?) 的话可能是kafka有问题可能是因为offset不一致导致紊乱比较暴力的方式是删掉zookeeper和kafka的日志。 原理 producer》分为多个partition存例如分为p1、p2、p3…p10)》每个consumer分别从一个不同的partition读consumer1读p1的话consumer2就不能读p1了。 UI 工具 Kafka Offset Explorer 支持Mac、Win、Linux go sarama库使用 Shopify/sarama库 consumer 可以用 NewConsumerGroup() 创建也可以先 NewClient() 再 NewConsumerGroupFromClient() 创建。 我们需要实现 Setup()、ConsumeClaim()、CleanUp() 三个回调函数sarama 库会调度上述函数。 如果需要重置 Offset可以在 Setup() 内通过 ResetOffset() 实现。 完整代码如下 package kafkaimport (contextgithub.com/Shopify/saramalog github.com/siruspen/logruslonk/configsstrconvtime )func StartConsumerGroup(ctx context.Context, conf *configs.KafkaInputConfig, msgHandler KafkaMsgHandler) {cli, err : newConsumerGroup(conf) // 新建一个 client 实例if err ! nil {log.Fatalf([newConsumerGroup] conf: %v, err: %v, conf, err)}k : kafkaConsumerGroup{msgHandler: msgHandler,ready: make(chan bool, 0), // 标识 consumer 是否 readypartitionInitialOffsetMap: conf.PartitionInitialOffsetMap,}go func() {defer cli.Close()for {// Consume().newSession().newConsumerGroupSession() 先调用 Setup(); 再开多个协程(每个协程都用for循环持续调用consume().ConsumeClaim()来处理消息); Consume() 内部的 -sess.ctx.Done() 会阻塞if err : cli.Consume(ctx, []string{conf.Topic}, k); err ! nil {log.Errorln(Error from consumer, err)}if ctx.Err() ! nil { // 若 ctx.cancel() 而会引发 cli.Consume() 内部对 ctx.Done() 的监听从而结束 cli.Consume() 的阻塞, 并return}k.ready make(chan bool, 0) // 当 rebalanced 时 cli.Consume() 会退出且 ctx.Err() nil, 则重置此 channel, 继续在下一轮 for 循环调用 Consume()}}()-k.ready // 直到 close(consumer.ready) 解除阻塞log.Infoln(Sarama consumer up and running!...) }func newConsumerGroup(conf *configs.KafkaInputConfig) (sarama.ConsumerGroup, error) {sConf : sarama.NewConfig()sConf.Version sarama.V2_8_0_0sConf.Consumer.Offsets.Initial sarama.OffsetOldestsConf.Consumer.Offsets.Retention 7 * 24 * time.Hourcli, err : sarama.NewClient(conf.Brokers, sConf)if err ! nil {log.Fatalf([NewClient] conf: %v, err: %v, sConf, err)}consumerGroup, err : sarama.NewConsumerGroupFromClient(conf.Group, cli)if err ! nil {log.Fatalf([NewConsumerGroupFromClient] conf: %v, err: %v, sConf, err)}return consumerGroup, nil }// Consumer represents a Sarama consumer group consumer type kafkaConsumerGroup struct {msgHandler func(message *sarama.ConsumerMessage)ready chan boolpartitionInitialOffsetMap map[string]int64 }// Setup 回调函数 is run at the beginning of a new session, before ConsumeClaim func (k *kafkaConsumerGroup) Setup(session sarama.ConsumerGroupSession) error {for topic, partitions : range session.Claims() {for _, partition : range partitions {initialOffset, ok : k.partitionInitialOffsetMap[strconv.Itoa(int(partition))]if !ok {log.Fatalf(invalid topic:%v, partition: %v, topic, partition)}log.Infof(Sarama Consumer is resetting offset to %v:%v:%v, topic, partition, initialOffset)session.ResetOffset(topic, partition, initialOffset, )}}close(k.ready) // 启动后此处会标记 ready 使 StartKafkaConsumer() 不再阻塞return nil }// Cleanup 回调函数 is run at the end of a session, once all ConsumeClaim goroutines have exited func (k *kafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {log.Infoln(Sarama Consumer is cleaning up!...)return nil }// ConsumeClaim 回调函数 must start a consumer loop of ConsumerGroupClaims Messages(). func (k *kafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {// NOTE: Do not move the code below to a goroutine. The ConsumeClaim itself is called within a goroutine, see: https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29for {select {case message : -claim.Messages():k.msgHandler(message)session.MarkMessage(message, )// Should return when session.Context() is done. If not, will raise ErrRebalanceInProgress or read tcp ip:port: i/o timeout when kafka rebalance. see: https://github.com/Shopify/sarama/issues/1192case -session.Context().Done():return nil}} }参考官方 consumerGroup 的 example 参考sarama consumer group 的使用 参考sarama partition consumer 根据 time 指定 offset
http://www.hkea.cn/news/14303832/

相关文章:

  • 江苏盐城有做淘宝网站的吗开发公司五一节前安全生产工作部署会
  • 厦门网站制作推广网站设计方案书
  • 厦门建设工程招标中心的网站有域名有服务器如何做网站
  • 商业网站如何备案公司网页设计
  • 专业的金融行业网站开发天猫商城支付方式
  • adsense用什么网站做wordpress 多站点 固定链接
  • 电子商务网站建设及维护管理配音阁在线制作网站
  • 青岛企业网站制作怎么做监测网站的浏览量
  • 宁波做网站软件wordpress 搜索出图片
  • 淄博网站建设邹城网站定制
  • 农庄网站wordpress park主题
  • 一家专门做原型的网站龙岗建设企业网站
  • 锦州网站seo门户网站规划方案
  • 河南网站营销seo电话必应搜索引擎入口官网
  • 如何制作自己的网站上海网站建设沪icp备
  • 知名高端网站建设服务商江门网站优化方案
  • 湖北网站推广方案photoshop网站设计
  • 长安英文网站建设网站空间租用哪家好
  • 支付宝手机网站支付自贡建设网站
  • 网站策划书案例wordpress添加小工具
  • 网站建设 添加本地视频教程百度收录网站但首页快照不更新
  • 如何建立公司网站网页厦门 网站建设 网站开发
  • html5网站有哪些软件定制一般多少钱
  • 广州各类外贸网站网络设计涉及到的核心标准是
  • 专业建站网网站运营推广网站建设asp文件怎么展现
  • 淘客网站如何建设自己数据库黄村做网站的公司
  • 长春南京小学网站建设江苏金地建设工程有限公司网站
  • 广东网站优化抖音代运营成功案例
  • 河南建设网站信息查询中心深圳宝安天气
  • 滨海新区建设和交通局网站做中医诊所网站