为了保证正常业务不受影响,集群在启用安全功能的过程中还可正常提供服务,可以使用多阶段,多端口,多协议的方案。
本方案以confluentinc/cp-kafka:5.2.1
版本进行试验,SASL 机制选择:SASL/SCRAM
特此说明。SCRAM 全称为 Salted Challenge Response Authentication Mechanism。
Kafka 支持如下 SASL 机制:
SASL 机制 | Kafka 版本 | 特点 |
---|---|---|
SASL/OAUTHBEARER | 2.0.0 | 需自己实现接口实现 token 的创建和验证,需要额外 Oauth 服务 |
SASL/Kerberos | 0.9.0.0 | 需要独立部署验证服务 |
SASL/PLAIN | 0.10.0.0 | 不能动态增加用户 |
SASL/SCRAM | 0.10.2.0 | 可以动态增加用户 |
kafka-configs --zookeeper xx.xx.xx.xx:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin# 查看admin的SCRAM证书kafka-configs --zookeeper xx.xx.xx.xx:2181 --describe --entity-type users --entity-name admin
当然也可以只用--bootstrap-server
来创建证书。
kafka_server_jaas.conf
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule requiredusername="admin"password="admin";};
在 docker volumes 中添加该文件
/opt/app/cp-kafka-5.2.1/secrets:/etc/kafka/secrets
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.confKAFKA_LISTENERS=PLAINTEXT://{ip}:9092,SASL_PLAINTEXT://{ip}:9093KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{ip}:9092,SASL_PLAINTEXT://{ip}:9093KAFKA_SASL_ENABLED_MECHANISMS=SCRAM-SHA-256KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-256KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.auth.SimpleAclAuthorizerKAFKA_SUPER_USERS=User:admin
在LISTENERS
和ADVERTISED_LISTENERS
我们保持 PLAINTEXT 和 SASL_PLAINTEXT 都存在。这样就可以保持在过度期间,不会影响 client 未启用安全的服务。但最后我们是需要删掉 PLAINTEXT。
其他参数是因为要使用 SASL/SCRAM 和 ACLs 增加的配置。
除了以上参数,在 Confluent Kafka 版本还需要添加如下参数:
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=falseKAFKA_ALLOW.EVERYONE.IF.NO.ACL.FOUND=true
本次方案没有开启 zookeeper 安全验证,因此也需要增加禁用 zookeeper SASL 功能,官方中推荐使用网略隔离策略来保证 zookeeper 安全性,当然也可以采用mutual TLS
,本次方案未考虑。
ZOOKEEPER_SASL_ENABLED=false
然后依次重启 Kafka broker,直至集群整体都启用安全验证。
此步为客户端修改配置为安全验证,在实践过程中,也可以延期做。
一般是通过 Properties 增加来启用安全配置。
Producer
public void producer() throws ExecutionException, InterruptedException, IOException {Properties props = new Properties();props.setProperty("bootstrap.servers", "xxx.xxx.xxx.xxx:9093");props.setProperty("key.serializer", StringSerializer.class.getName());props.setProperty("value.serializer", StringSerializer.class.getName());props.setProperty("security.protocol", "SASL_PLAINTEXT");props.setProperty("sasl.mechanism", "SCRAM-SHA-256");props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"user\";");KafkaProducerTemplate<String, String> producer = new KafkaProducerTemplate<>(props);while (true){producer.send("truman_test",System.currentTimeMillis()+"",System.currentTimeMillis()+"");Thread.sleep(1000);}}
Consumer:
public void consumer() {Properties props = new Properties();props.setProperty("bootstrap.servers", "xxx.xxx.xxx.xxx:9093");props.setProperty("group.id", "test");props.setProperty("enable.auto.commit", "true");props.setProperty("auto.commit.interval.ms", "1000");props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("security.protocol", "SASL_PLAINTEXT");props.setProperty("sasl.mechanism", "SCRAM-SHA-256");props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"user\";");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("truman_test"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
这步在实践中也是可以省略的,不过为了安全,推荐启用该配置
在 Env 中增加KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SASL_PLAINTEXT
,然后再次挨个重启 broker.
修改 Env 中LISTENERS
和ADVERTISED_LISTENERS
,移除其中的 PLAINTEXT,挨个重启 Broker,即可完成在集群中启用安全配置。
KAFKA_LISTENERS=SASL_PLAINTEXT://{ip}:9093KAFKA_ADVERTISED_LISTENERS=SASL_PLAINTEXT://{ip}:9093
创建
kafka-configs --zookeeper x.x.x.x:2181 --alter --add-config 'SCRAM-SHA-256=[password=user]' --entity-type users --entity-name user
Kafka 授权原语主要有以下几种:
限制资源:
概括来说,通过 Acls 可以给指定用户,指定机器 host 赋予限制资源的操作(授权原语)。资源的匹配模式支持三种:1.literal(默认)2.match 3.prefixed
host 可以是白名单,也可以是黑名单。
#查询所有的topickafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --list --topic *# 查询指定topickafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --list --topic truman_test
kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:user --producer --topic truman_testkafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:user --consumer --topic truman_test --group test
如果需要删除的话,只用将以上命令的 add 修改为 remove。
kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:xaecbd --producer --resource-pattern-type prefixed --topic trumankafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:xaecbd --consumer --resource-pattern-type prefixed --topic truman --group test
这里要主要的是按前缀方式设置的话,会导致该前缀生效配置 Acls,其他未安全验证的 client 都会受此影响,因此在迁移的过程中注意。资源的匹配模式支持三种:1.literal(默认,这种模式支持*,指匹配所有)2.match 3.prefixed
kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:xaecbd --allow-host x.x.x.x --producer --topic truman_testkafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:xaecbd --allow-host x.x.x.x --consumer --topic truman_test --group test
除了白名单,还可以设置黑名单(deny-host
),该参数支持设置*
,不填写该参数的话,默认为*
代表所有,不能设置为x.x.x.*
除此以外还支持通过--operation
设置原语级。例如:--operation Read --operation Write
配额不属于安全范畴,启用安全策略以后,就可以通过设置 user 维度配额(不开启身份认证,只能通过 clientid 来进行限流),配额能限制生产者和消费者的流量。
#1. 配置user+clientid。例如,user为”user1”,clientid为”clientA”。kafka-configs --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' \--entity-type users --entity-name user1 --entity-type clients --entity-name clientA#2. 配置user。例如,user为”user1”kafka-configs --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' \--entity-type users --entity-name user1#3. 配置client-id。例如,client-id为”clientA”kafka-configs --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' \--entity-type clients --entity-name clientA