赌求网站开发,wordpress修改主题教程,wordpress淘宝推广,百度竞价登录入口背景#xff1a;我之前安装的kafka没有开启安全鉴权#xff0c;在没有任何凭证的情况下都可以访问kafka。搜了一圈资料#xff0c;发现有关于sasl、acl相关的#xff0c;准备试试。
简介
Kafka是一个高吞吐量、分布式的发布-订阅消息系统。Kafka核心模块使用Scala语言开发…背景我之前安装的kafka没有开启安全鉴权在没有任何凭证的情况下都可以访问kafka。搜了一圈资料发现有关于sasl、acl相关的准备试试。
简介
Kafka是一个高吞吐量、分布式的发布-订阅消息系统。Kafka核心模块使用Scala语言开发支持多语言如Java、Python、Go等客户端它可以水平扩展和具有高吞吐量特性而被广泛使用并与多类开源分布式处理系统进行集成使用。 Kafka作为一款开源的、轻量级的、分布式、可分区和具备复制备份的、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。与传统消息系统相比Kafka能够更好的处理活跃的流数据让数据在各个子系统中高性能、低延迟地不停流转。
自0.9.0.0版本开始Kafka社区添加了许多功能用于提高Kafka集群的安全性Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现此方案主要介绍SASL方式。
SASL验证分类 验证方式 kafka版本 特点 SASL/PLAIN 0.10.0.0 不能动态添加用户 SASL/SCRAM 0.10.2.0 可以动态添加用户 SASL/Kerberos 0.9.0.0 需要独立部署验证服务 SASL/oauthbearer 2.0.0 需要自己实现接口实现token的创建和验证需要额外的oauth服务
使用SSL加密在代理和客户端之间代理之间或代理和工具之间传输的数据
SCRAM认证配置的优点: 如果使用PLAIN认证有个问题就是不能动态新增用户每次添加用户后需要重启正在运行的Kafka集群才能生效。 因此在生产环境中这种认证方式不符合实际业务场景不利于后期扩展。然而使用SCRAM认证可以动态新增用户添加用户后可以不用重启正在运行的Kafka集群即可进行鉴权。所以生产环境推荐使用SCRAMPLAIN搭配的认证方案。
配置zookeeper集群启用SASL
1. 配置zookeeper启用sasl认证cat zoo.cfg查看到如下内容
tickTime2000
initLimit1
syncLimit5
dataDir/tmp/zookeeper/data
dataLogDir/tmp/zookeeper/log
clientPort2181
admin.serverPort8888
maxClientCnxns3000
autopurge.snapRetainCount3
autopurge.purgeInterval24
server.1localhost:2888:3888
4lw.commands.whitelistconf,stat,srvr,mntr.envi
#zk SASL
authProvider.1org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew3600000
requireClientAuthSchemesasl
zookeeper.sasl.clienttrue
2. 配置zookeeper JAAS
cat zk_jaas.conf文件内容如下如果没有改文件则使用vi命令编辑 Server {org.apache.zookeeper.server.auth.DigestLoginModule requiredusernameadminpasswordadmin123user_kafkakafka123;
};注意admin用户 是zk 集群之间使用的。kafka用户 是 broker 与 zk 之间使用的。
3. 修改zkEnv.sh
将上一步添加的 jaas 配置文件添加到zookeeper的环境变量中zkEnv.sh文件最后添加一行vim zkEnv.shZOOBINDIR${ZOOBINDIR:-/usr/bin}
ZOOKEEPER_PREFIX${ZOOBINDIR}/..# 添加如下 新增变量SERVER_JVMFLAGSexport SERVER_JVMFLAGS-Djava.security.auth.login.config../conf/zk_jaas.conf
配置kafka sasl动态认证
SASL/SCRAM认证是把凭证(credential)存储在Zookeeper使用kafka-configs.sh在Zookeeper中创建凭据。对于每个SCRAM机制必须添加具有机制名称的配置来创建凭证所以在启动Kafka broker之前需要创建代理间通信的凭据。这里配置的 Kafka和生产者/消费者之间 采用SASL/PLAIN和SASL/SCRAM两种方式共同完成认证授权使用ACL方式。PLAIN方式的用户是在jaas文件中写死的不能动态的添加SCRAM支持动态的添加用户。1. 创建用户
配置SASL/SCRAM认证的第一步是配置可以连接到kafka集群的用户。本案例创建了3个用户adminproducerconsumer。kafka_server_admin用户用于broker之间的认证通信producer用户用于生产者连接kafkaconsumer用户用于消费者连接kafka 。
./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config SCRAM-SHA-256[iterations8192,passwordadmin123],SCRAM-SHA-512[passwordadmin123] --entity-type users --entity-name admin./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config SCRAM-SHA-256[iterations8192,passwordadmin123],SCRAM-SHA-512[passwordadmin123] --entity-type users --entity-name producer./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config SCRAM-SHA-256[iterations8192,passwordadmin123],SCRAM-SHA-512[passwordadmin123] --entity-type users --entity-name consumer 2. 查看创建的用户信息
kafka-configs 脚本是用来设置主题级别参数的。其实它的功能还有很多。比如在这个例子中我们使用它来创建 SASL/SCRAM 认证中的用户信息。可以使用下列命令来查看刚才创建的用户数据。
./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users #可以单独指定某个用户 --entity-name producer如下
./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer ZK客户端命令行查看./zkCli.sh -server localhost:2181ls /config/users 3. 配置kafka jaas文件
配置了用户之后我们需要为 Broker 创建一个对应的 JAAS 文件。在实际场景中需要为每台单独的物理 Broker 机器都创建一份 JAAS 文件。
Kafka 的 jaas认证配置文件配置的是登录类超管密码和管理的帐号密码列表
vim kafka_server_jaas.conf
KafkaServer {org.apache.kafka.common.security.scram.ScramLoginModule requiredusername adminpasswordadmin123user_adminadmin123user_producerproducer123user_consumerconsumer123;
};
KafkaClient {org.apache.kafka.common.security.scram.ScramLoginModule requiredusernameadminpasswordadmin123user_producerproducer123user_consumerconsumer123;
};
Client {org.apache.kafka.common.security.scram.ScramLoginModule requiredusernamekafkapasswordkafka123;
};KafkaServer中usename配置的是kafka服务端使用的账号和密码后面的user_xxx事预设的普通帐号认证信息。 中间部分配置的是PLAIN认证方式的账户和密码其中producer1是账户名producer123是密码。 Client配置了broker到Zookeeper的连接用户名密码这里要和前面zookeeper配置中的zk_jaas.conf.conf 中 user_kafka 的账号和密码相同。 关于这个文件内容需要注意以下两点 1不要忘记最后一行和倒数第二行结尾处的分号 2JAAS 文件中不需要任何空格键。
4. kafka 配置文件启用SASL认证
Kafka 服务配置文件 server.propertis配置认证协议及认证实现类 cat server.properties其它内容都注释掉然后追加如下内容
broker.id0
listenersSASL_PLAINTEXT://:9092
advertised.listenersSASL_PLAINTEXT://localhost:9092
sasl.enabled.mechanismsSCRAM-SHA-256,PLAIN
sasl.mechanism.inter.broker.protocolSCRAM-SHA-256
security.inter.broker.protocolSASL_PLAINTEXT
allow.everyone.if.no.acl.foundfalse
authorizer.class.namekafka.security.auth.SimpleAclAuthorizer
super.usersUser:admin
num.network.threads3
num.io.threads8
socket.send.buffer.bytes102400
socket.receive.buffer.bytes102400
socket.request.max.bytes104857600
log.dirs/tmp/kafka/logs
num.partitions3
num.recovery.threads.per.data.dir1
offsets.topic.replication.factor2
transaction.state.log.replication.factor1
transaction.state.log.min.isr1
log.flush.interval.messages10000
log.flush.interval.ms1000
log.retention.hours168
log.retention.bytes1073741824
log.segment.bytes1073741824
log.retention.check.interval.ms300000
delete.topic.enabletrue
auto.create.topics.enablefalse
zookeeper.connectlocalhost:2181
zookeeper.connection.timeout.ms60000
group.initial.rebalance.delay.ms0Host.name43.138.0.199 5. kafka 启动脚本添加认证文件路径的环境变量
Kafka 安全认证可以直接通过环境变量 -Djava.security.auth.login.config 设置修改 Kafka 启动脚本 kafka-start-server.sh 文件最后一行增加一个参数指向 jaas 配置文件的绝对路径
vi kafka-server-start.sh
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config/home/lighthouse/kafka_2.12-2.2.1/config/kafka_server_jaas.conf kafka.Kafka $ 6. kafka客户端配置
1) 配置consumer.properties和producer.properties,都要加入以下配置
security.protocolSASL_PLAINTEXT
sasl.mechanismSCRAM-SHA-512
2) 生产者配置
使用kafka-console-producer.sh脚本测试生产者由于开启安全认证和授权此时使用console-producer脚本来尝试发送消息那么消息会发送失败原因是没有指定合法的认证用户因此客户端需要做相应的配置需要创建一个名为producer.conf的配置文件给producer程序使用。
config目录下创建一个producer.conf的文件cat producer.conf文件内容如下
security.protocolSASL_PLAINTEXT
sasl.mechanismSCRAM-SHA-256
sasl.jaas.configorg.apache.kafka.common.security.scram.Scra
mLoginModule required usernameproducer passwordproducer123;
注意Topic设置写权限
3) 消费者配置
使用kafka-console-consumer.sh脚本测试生产者由于开启安全认证和授权因此客户端需要做相应的配置。需要为 consumer 用户创建consumer.conf给消费者程序同时设置对topic的读权限。
config目录下创建一个consumer.conf的文件cat consumer.conf文件内容如下
security.protocolSASL_PLAINTEXT
sasl.mechanismSCRAM-SHA-256
sasl.jaas.configorg.apache.kafka.common.security.scram.Scra
mLoginModule required usernameconsumer passwordconsumer123;
注意Topic设置读权限。
4) 在生产者和消费者启动脚本中引入JAAS文件
vim bin/kafka-console-producer.shif [ x$KAFKA_HEAP_OPTS x ]; thenexport KAFKA_HEAP_OPTS-Xmx512M
fi# 添加这行
export KAFKA_OPTS-Djava.security.auth.login.config../config/kafka_server_jaas.confexec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer $# vim bin/kafka-console-consumer.shif [ x$KAFKA_HEAP_OPTS x ]; thenexport KAFKA_HEAP_OPTS-Xmx512M
fi# 添加这行
export KAFKA_OPTS-Djava.security.auth.login.config../config/kafka_server_jaas.confexec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer $
启动kafka
nohup kafka-server-start.sh /path-to-kafka/config/server.properties
我自己写了一个脚本同时启动zookeeper和kafka
文件名称叫startup.sh内容如下
cd /home/lighthouse/zk-3.4.14/zookeeper-3.4.14/bin
./zkServer.sh start
cd /home/lighthouse/kafka_2.12-2.2.1/
nohup bin/kafka-server-start.sh config/server.properties output.txt
zookeeper没有启动成功我找下原因。把整个过程重新整理了一遍发现zookeeper、kafka启动成功了。 检查验证
./zkCli.sh -server localhost:2181ls /brokers/ids 1. 生产者测试
1 创建一个测试主题test_topic
./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test_topic 2查看创建的Topic
./kafka-topics.sh --list --zookeeper localhost:2181 test_topic ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic 注kafka开启认证后当生产者往Topic写数据需要为主题配置权限write即对生产者赋予写的权限。
这里使用producer用户认证授权通过ACL为producer 用户分配操作test_topic权限
./kafka-acls.sh --authorizer-properties zookeeper.connectlocalhost:2181 --add --allow-principal User:producer --operation Write --topic test_topic
注意allow-后面不要换行(会报错)注意整段命令要放在一行上。 启动生产者发送消息
特别注意在生产者生产消息之前如果不设置生产者用户的ACL权限会报错:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --producer.config ../config/producer.conf./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --producer.config /home/lighthouse/kafka_2.12-2.2.1/config/producer.conf
使用相对路径和绝对路径都报错了错误相同 org.apache.kafka.common.KafkaException: Failed to construct kafka producerat org.apache.kafka.clients.producer.KafkaProducer.init(KafkaProducer.java:431)at org.apache.kafka.clients.producer.KafkaProducer.init(KafkaProducer.java:299)at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:44)at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: java.lang.IllegalArgumentException: Login module control flag not specified in JAAS configat org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:110)at org.apache.kafka.common.security.JaasConfig.init(JaasConfig.java:63)at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:90)at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124)at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:439)at org.apache.kafka.clients.producer.KafkaProducer.init(KafkaProducer.java:420)... 3 more