Kubernetes上的Kafka,Strimzi Way(第2部分)

作者 : IT 大叔 本文共7389个字,预计阅读时间需要19分钟 发布时间: 2020-08-18

学习第二部门之前请先查阅第一部分内容:

Kubernetes上的Kafka,Strimzi方式!(第1部分)

本节内容要点:了解如何使用开源操作员在Kubernetes上运行Kafka。

我们通过设置一个单节点Kafka集群开始了本系列第一部分,该集群只有同一Kubernetes集群中的内部客户端才能访问,没有加密,认证或授权,并且使用了临时持久性。在本博客系列的过程中,我们将不断对此进行迭代/改进。

本部分将涵盖以下主题:

  • 将Kafka集群公开给外部应用程序
  • 应用TLS加密
  • 探索Kubernetes的幕后资源
  • 使用Kafka CLI和Go客户端应用程序来测试我们的集群设置

该代码可在GitHub上找到-https: //github.com/abhirockzz/kafka-kubernetes-strimzi/

我需要尝试什么?

kubectlhttps://kubernetes.io/docs/tasks/tools/install-kubectl/

我将使用Azure Kubernetes服务(AKS)来演示概念,但是总的来说,它独立于Kubernetes提供程序(例如,可以随意使用本地设置,如minikube)。如果要使用AKS,您只需一个Microsoft Azure帐户,如果您还没有帐户,则可以免费获得

在本系列的本部分或后续部分中,我将不再重复一些常见的部分(例如Helm的安装/设置,Strimzi,Azure Kubernetes服务以及Strimzi概述),并且请您参考第一部分以了解详细信息。

让我们创建一个外部可访问的Kafka集群

为此,我们只需要Kafka稍微调整一下Strimzi 资源。我在下面重点介绍关键部分- 这是第1部分的原始清单

spec:

  kafka:
    version: 2.4.0
    replicas: 1
    listeners:
      plain: {}
      external:
        type: loadbalancer
        tls: true

发生了什么变化?

为了使Kafka可供外部客户端应用程序访问,我们添加了external类型为的侦听器loadbalancer。由于我们将应用程序公开到公共Internet,因此我们需要其他保护层,例如传输级别(TLS/SSL加密)和应用程序级别安全性(身份验证和授权)。在这一部分中,我们将仅配置加密并在另一个博客中探讨其他方面。要配置端到端TLS加密,我们添加tls: true

tls: true config实际上是默认设置,但为了清楚起见,我已明确添加它

要创建集群:

kubectl apply -f https://github.com/abhirockzz/kafka-kubernetes-strimzi/raw/master/part-2/kafka.yaml

Kubernetes Magic!

Strimzi操作员开始行动,为我们完成所有繁重的工作:

  • 它创建一个Kubernetes LoadBalancer服务。
  • ..并将适当的Kafka服务器配置植入 ConfigMap

我将重点介绍与外部侦听器和TLS加密相对应的资源。有关在Kafka集群中创建的所有资源的逐步浏览请参阅第1部分

如果您寻找Service,则会看到类似以下内容:

kubectl get svc
my-kafka-cluster-kafka-0                    LoadBalancer   10.0.162.98    40.119.233.2    9094:31860/TCP               60s
my-kafka-cluster-kafka-bootstrap            ClusterIP      10.0.200.20    <none>          9091/TCP,9092/TCP            60s
my-kafka-cluster-kafka-brokers              ClusterIP      None           <none>          9091/TCP,9092/TCP            60s
my-kafka-cluster-kafka-external-bootstrap   LoadBalancer   10.0.122.211   20.44.239.202   9094:32267/TCP               60
my-kafka-cluster-zookeeper-client           ClusterIP      10.0.137.33    <none>          2181/TCP                     82s
my-kafka-cluster-zookeeper-nodes            ClusterIP      None           <none>          2181/TCP,2888/TCP,3888/TCP   82s

注意my-kafka-cluster-kafka-external-bootstrap Service类型的LoadBalancer?由于我使用的是Azure Kubernetes服务,因此由Azure负载均衡器提供支持,该负载均衡器具有公共IP(20.44.239.202在此示例中),并通过port将Kafka暴露给外部客户端9094。您应该能够使用Azure的CLI(或者,如果你喜欢在Azure门户)来找到它通过使用az network lb list命令

export AKS_RESOURCE_GROUP=[replace with resource group name]
export AKS_CLUSTER_NAME=[replace with AKS cluster name]
export AKS_LOCATION=[replace with region e.g. southeastasia]
az network lb list -g MC_${AKS_RESOURCE_GROUP}_${AKS_CLUSTER_NAME}_${AKS_LOCATION}

加密部分呢?

为了弄清楚这一点,让我们来反思一下Kafka服务器的配置:

此文件存储在ConfigMap

export CLUSTER_NAME=my-kafka-cluster
kubectl get configmap/${CLUSTER_NAME}-kafka-config -o yaml

这是什么样Common listener configurationserver.config揭示:

listeners=REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092,EXTERNAL-9094://0.0.0.0:9094
advertised.listeners=REPLICATION-9091://my-kafka-cluster-kafka-${STRIMZI_BROKER_ID}.my-kafka-cluster-kafka-brokers.default.svc:9091,PLAIN-9092://my-kafka-cluster-kafka-${STRIMZI_BROKER_ID}.my-kafka-cluster-kafka-brokers.default.svc:9092,EXTERNAL-9094://${STRIMZI_EXTERNAL_9094_ADVERTISED_HOSTNAME}:${STRIMZI_EXTERNAL_9094_ADVERTISED_PORT}
listener.security.protocol.map=REPLICATION-9091:SSL,PLAIN-9092:PLAINTEXT,EXTERNAL-9094:SSL

请注意,除了代理间复制(通过port 9091)和未加密的内部(在Kubernetes集群内)客户端通过非TLS端口的访问之外9092,还为端口上的TLS加密访问添加了适当的侦听器配置。9094

关键时刻...

为了确认,让我们尝试一些客户端应用程序,这些应用程序将与我们在Kubernetes上刚创建的Kafka集群通信!我们将使用以下方式生成和使用消息:

与我们Kafka群集的通信必须进行加密(非TLS客户端连接将被拒绝)。TLS/SSL隐式暗示一种身份验证,其中客户端验证Kafka经纪人身份。为此,客户端应用程序需要信任群集CA证书。请记住,集群CA证书存储在Kubernetes中Secret(请参阅第1部分中的详细信息)。默认情况下,这些是由Strimzi自动生成的,但是您也可以提供自己的证书(请参阅https://strimzi.io/docs/operators/master/using.html#kafka-listener-certificates-str

首先提取群集CA证书和密码:

export CLUSTER_NAME=my-kafka-cluster
kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt
kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > ca.password

您应该有两个文件:ca.crtca.password。随时检查他们的内容

有些Kafka客户端(例如Confluent Go客户端)直接使用CA证书,而其他(例如Java客户端,Kafka CLI等)则要求通过来访问CA证书truststore。我正在使用truststoreJDK(Java)安装随附的内置程序-但这只是为了方便起见,您可以自由使用其他选项(例如创建自己的选项)

export CERT_FILE_PATH=ca.crt
export CERT_PASSWORD_FILE_PATH=ca.password
# replace this with the path to your truststore
export KEYSTORE_LOCATION=/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/lib/security/cacerts
export PASSWORD=`cat $CERT_PASSWORD_FILE_PATH`
export CA_CERT_ALIAS=strimzi-kafka-cert
# you will prompted for the truststore password. for JDK truststore, the default password is "changeit"
# Type yes in response to the 'Trust this certificate? [no]:' prompt
sudo keytool -importcert -alias $CA_CERT_ALIAS -file $CERT_FILE_PATH -keystore $KEYSTORE_LOCATION -keypass $PASSWORD
sudo keytool -list -alias $CA_CERT_ALIAS -keystore $KEYSTORE_LOCATION

基本设置就是这样-您可以尝试使用Kafka CLI客户端!

请注意,以下详述的Kafka CLI的配置步骤也将适用于Java客户端-试试看!

提取LoadBalancerKafka集群的公共IP

export KAFKA_CLUSTER_NAME=my-kafka-cluster
kubectl get service/${KAFKA_CLUSTER_NAME}-kafka-external-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].ip}

创建一个client-ssl.properties具有以下内容的文件:

bootstrap.servers=[LOADBALANCER_PUBLIC_IP]:9094
security.protocol=SSL
ssl.truststore.location=[TRUSTSTORE_LOCATION]
//for JDK truststore, the default password is "changeit"
ssl.truststore.password=changeit

要使用Kafka CLI,请下载(Kafka如果尚未安装的话)-https: //kafka.apache.org/downloads

您需要做的就是使用kafka-console-producer和并将kafka-console-consumer其指向client-ssl.properties您刚创建的文件

export KAFKA_HOME=[replace with Kafka installation path] e.g. /Users/foobar/kafka_2.12-2.3.0
export LOADBALANCER_PUBLIC_IP=[replace with public IP of Load Balancer]
export TOPIC_NAME=test-strimzi-topic
# on a terminal, start producer and send a few messages
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --producer.config client-ssl.properties
# on another terminal, start consumer
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --consumer.config client-ssl.properties --from-beginning

您应该看到生产者和消费者协同工作。大!

如果遇到SSL握手错误,请检查是否已正确导入CA证书及其正确密码。如果无法访问Kafka群集,请确保对公共IP使用正确的值

现在,让我们尝试一个编程客户端。由于Java客户端行为(必需的配置属性)与CLI相同,因此我正在使用Go客户端尝试不同的操作。不用担心,如果您不是Go程序员,那么应该很容易理解-我将不遍历整个程序,仅介绍我们创建与连接相关的配置的那一部分。

这是代码段:

bootstrapServers = os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
    caLocation = os.Getenv("CA_CERT_LOCATION")
    topic = os.Getenv("KAFKA_TOPIC")
    config := &kafka.ConfigMap{"bootstrap.servers": bootstrapServers, "security.protocol": "SSL", "ssl.ca.location": caLocation}

请注意,bootstrap.serverssecurity.protocol与您在Kafka CLI客户端中使用的相同(Java也相同)。唯一的区别是,ssl.ca.location它用来直接指向CA证书,而不是truststore

如果已Go安装,可以尝试一下。克隆Git仓库...

git clone https://github.com/abhirockzz/kafka-kubernetes-strimzi
cd part-2/go-client-app

运行程序

export KAFKA_BOOTSTRAP_SERVERS=[replace with loadbalancer_ip:9094] e.g. 42.42.424.424:9094
export CA_CERT_LOCATION=[replace with path to ca.crt file which you downloaded]
export KAFKA_TOPIC=test-strimzi-topic
go run kafka-client.go

您应该看到与此类似的日志,并确认正在生成和使用消息

按下ctrl+c即可退出应用

started consumer
started producer delivery goroutine
started producer goroutine
delivered messaged test-strimzi-topic[0]@122
delivered messaged test-strimzi-topic[0]@123
delivered messaged test-strimzi-topic[0]@124
received message from test-strimzi-topic[0]@122: value-2020-06-08 16:23:05.913303 +0530 IST m=+0.020529419
received message from test-strimzi-topic[0]@123: value-2020-06-08 16:23:07.915252 +0530 IST m=+2.022455867
received message from test-strimzi-topic[0]@124: value-2020-06-08 16:23:09.915875 +0530 IST m=+4.023055601
received message from test-strimzi-topic[0]@125: value-2020-06-08 16:23:11.915977 +0530 IST m=+6.023134961
....

现在就这些了,但是还有更多!

所以我们取得了一些进展!现在,我们在Kubernetes上拥有一个Kafka集群,该集群可公开访问,但由于使用了TLS加密,因此(部分)安全。我们还使用了一个(而不是)两个(不同的)客户端应用程序进行了一些健全性测试。在下一部分中,我们将对此进行进一步的改进...敬请期待!

免责声明:
1. 本站资源转自互联网,源码资源分享仅供交流学习,下载后切勿用于商业用途,否则开发者追究责任与本站无关!
2. 本站使用「署名 4.0 国际」创作协议,可自由转载、引用,但需署名原版权作者且注明文章出处
3. 未登录无法下载,登录使用金币下载所有资源。
IT小站 » Kubernetes上的Kafka,Strimzi Way(第2部分)

常见问题FAQ

没有金币/金币不足 怎么办?
本站已开通每日签到送金币,每日签到赠送五枚金币,金币可累积。
所有资源普通会员都能下载吗?
本站所有资源普通会员都可以下载,需要消耗金币下载的白金会员资源,通过每日签到,即可获取免费金币,金币可累积使用。

发表评论