产线启用安全功能最佳实践方案

为了保证正常业务不受影响,集群在启用安全功能的过程中还可正常提供服务,可以使用多阶段,多端口,多协议的方案。

  • 以增量替换的方式添加额外的安全端口(s)。
  • 客户端使用安全的端口来连接,而不是 PLAINTEXT 端口的(假设你是客户端需要安全连接 broker)。
  • 再次增量的方式依次启用 broker 与 broker 之间的安全端口(如果需要)
  • 最后依次关闭 PLAINTEXT 端口。

本方案以confluentinc/cp-kafka:5.2.1 版本进行试验,SASL 机制选择:SASL/SCRAM 特此说明。SCRAM 全称为 Salted Challenge Response Authentication Mechanism。

Kafka 支持如下 SASL 机制:

SASL 机制Kafka 版本特点
SASL/OAUTHBEARER2.0.0需自己实现接口实现 token 的创建和验证,需要额外 Oauth 服务
SASL/Kerberos0.9.0.0需要独立部署验证服务
SASL/PLAIN0.10.0.0不能动态增加用户
SASL/SCRAM0.10.2.0可以动态增加用户

方案

增量方式添加额外安全端口

1.创建 admin SCRAM 证书

kafka-configs --zookeeper xx.xx.xx.xx:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin
# 查看admin的SCRAM证书
kafka-configs --zookeeper xx.xx.xx.xx:2181 --describe --entity-type users --entity-name admin

当然也可以只用--bootstrap-server来创建证书。

2.新建kafka_server_jaas.conf

KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};

在 docker volumes 中添加该文件

/opt/app/cp-kafka-5.2.1/secrets:/etc/kafka/secrets

3.在 Env 修改 Kafka 配置参数

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
KAFKA_LISTENERS=PLAINTEXT://{ip}:9092,SASL_PLAINTEXT://{ip}:9093
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{ip}:9092,SASL_PLAINTEXT://{ip}:9093
KAFKA_SASL_ENABLED_MECHANISMS=SCRAM-SHA-256
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-256
KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.auth.SimpleAclAuthorizer
KAFKA_SUPER_USERS=User:admin

LISTENERSADVERTISED_LISTENERS 我们保持 PLAINTEXT 和 SASL_PLAINTEXT 都存在。这样就可以保持在过度期间,不会影响 client 未启用安全的服务。但最后我们是需要删掉 PLAINTEXT。

其他参数是因为要使用 SASL/SCRAM 和 ACLs 增加的配置。

除了以上参数,在 Confluent Kafka 版本还需要添加如下参数:

KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
KAFKA_ALLOW.EVERYONE.IF.NO.ACL.FOUND=true

本次方案没有开启 zookeeper 安全验证,因此也需要增加禁用 zookeeper SASL 功能,官方中推荐使用网略隔离策略来保证 zookeeper 安全性,当然也可以采用mutual TLS,本次方案未考虑。

ZOOKEEPER_SASL_ENABLED=false

然后依次重启 Kafka broker,直至集群整体都启用安全验证。

客户端使用安全的端口来连接

此步为客户端修改配置为安全验证,在实践过程中,也可以延期做。

一般是通过 Properties 增加来启用安全配置。

Producer

public void producer() throws ExecutionException, InterruptedException, IOException {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "xxx.xxx.xxx.xxx:9093");
props.setProperty("key.serializer", StringSerializer.class.getName());
props.setProperty("value.serializer", StringSerializer.class.getName());
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "SCRAM-SHA-256");
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"user\";");
KafkaProducerTemplate<String, String> producer = new KafkaProducerTemplate<>(props);
while (true){
producer.send("truman_test",System.currentTimeMillis()+"",System.currentTimeMillis()+"");
Thread.sleep(1000);
}
}

Consumer:

public void consumer() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "xxx.xxx.xxx.xxx:9093");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "SCRAM-SHA-256");
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"user\";");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("truman_test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}

再次增量的方式依次启用 broker 与 broker 之间的安全端口

这步在实践中也是可以省略的,不过为了安全,推荐启用该配置

在 Env 中增加KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SASL_PLAINTEXT,然后再次挨个重启 broker.

最后依次关闭 PLAINTEXT 端口

修改 Env 中LISTENERSADVERTISED_LISTENERS,移除其中的 PLAINTEXT,挨个重启 Broker,即可完成在集群中启用安全配置。

KAFKA_LISTENERS=SASL_PLAINTEXT://{ip}:9093
KAFKA_ADVERTISED_LISTENERS=SASL_PLAINTEXT://{ip}:9093

SCRAM 证书管理

创建

kafka-configs --zookeeper x.x.x.x:2181 --alter --add-config 'SCRAM-SHA-256=[password=user]' --entity-type users --entity-name user

Acls

Kafka 授权原语主要有以下几种:

  • Read
  • Write
  • Create
  • Delete
  • Alter
  • Describe
  • ClusterAction
  • DescribeConfigs
  • AlterConfigs
  • IdempotentWrite
  • All

限制资源:

  • Topic
  • Group
  • Cluster
  • TransactionalId
  • DelegationToken

概括来说,通过 Acls 可以给指定用户,指定机器 host 赋予限制资源的操作(授权原语)。资源的匹配模式支持三种:1.literal(默认)2.match 3.prefixed

host 可以是白名单,也可以是黑名单。

查看 Acls 信息

#查询所有的topic
kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --list --topic *
# 查询指定topic
kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --list --topic truman_test

增加 Producer 和 Consumer 权限

kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:user --producer --topic truman_test
kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:user --consumer --topic truman_test --group test

如果需要删除的话,只用将以上命令的 add 修改为 remove。

按指定前缀分配资源

kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:xaecbd --producer --resource-pattern-type prefixed --topic truman
kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:xaecbd --consumer --resource-pattern-type prefixed --topic truman --group test

这里要主要的是按前缀方式设置的话,会导致该前缀生效配置 Acls,其他未安全验证的 client 都会受此影响,因此在迁移的过程中注意。资源的匹配模式支持三种:1.literal(默认,这种模式支持*,指匹配所有)2.match 3.prefixed

限制 Host

kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:xaecbd --allow-host x.x.x.x --producer --topic truman_test
kafka-acls --authorizer-properties zookeeper.connect=x.x.x.x:2181 --add --allow-principal User:xaecbd --allow-host x.x.x.x --consumer --topic truman_test --group test

除了白名单,还可以设置黑名单(deny-host),该参数支持设置*,不填写该参数的话,默认为*代表所有,不能设置为x.x.x.*

除此以外还支持通过--operation 设置原语级。例如:--operation Read --operation Write

配额

配额不属于安全范畴,启用安全策略以后,就可以通过设置 user 维度配额(不开启身份认证,只能通过 clientid 来进行限流),配额能限制生产者和消费者的流量。

  • producer_byte_rate。发布者单位时间(每秒)内可以发布到单台 broker 的字节数。
  • consumer_byte_rate。消费者单位时间(每秒)内可以从单台 broker 拉取的字节数。
#1. 配置user+clientid。例如,user为”user1”,clientid为”clientA”。
kafka-configs --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' \
--entity-type users --entity-name user1 --entity-type clients --entity-name clientA
#2. 配置user。例如,user为”user1”
kafka-configs --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' \
--entity-type users --entity-name user1
#3. 配置client-id。例如,client-id为”clientA”
kafka-configs --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' \
--entity-type clients --entity-name clientA

参考

  1. 7.5 Incorporating Security Features in a Running Cluster
  2. Configuring SCRAM
  3. Docker Configuration Parameters
  4. Configuring SCRAM