海量数据下的EFK架构优化升级

1、数据背景

在海量数据场景下,日志管理和分析是一项重要任务。为了解决这个问题,EFK 架构(Elasticsearch + Fluentd + Kibana)已经成为流行的选择。

然而,随着数据规模的增加,传统的 EFK 架构可能面临性能瓶颈和可用性挑战。为了提升架构的性能和可伸缩性,我们可以结合 Kafka 和 Logstash 对 EFK 架构进行优化升级。

首先,引入 Kafka 作为高吞吐量的消息队列是关键的一步。Kafka 可以接收和缓冲大量的日志数据,减轻 Elasticsearch 的压力,并提供更好的可用性和容错性。

然后,我们可以使用 Fluentd 或 Logstash 将日志数据发送到 Kafka 中。将 Kafka 视为中间件层,用于处理日志数据流。这样可以解耦 Fluentd 或 Logstash 和 Elasticsearch之间的直接连接,提高整体的可靠性和灵活性。

通过 Logstash 的 Kafka 插件,我们可以将 Kafka 中的数据消费到 Logstash 中进行处理和转发。这样 Logstash 就负责从 Kafka 中获取数据,然后根据需要进行过滤、解析和转换,最终将数据发送到 Elasticsearch 进行存储和索引。

海量数据下的EFK架构优化升级

2、KAFKA部署配置

首先在 Kubernetes 集群中安装 Kafka,同样这里使用 Helm 进行安装:

$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update

首先使用helm pull拉取 Chart 并解压:

$ helm pull bitnami/kafka --untar --version 17.2.3
$ cd kafka

这里面我们指定使用一个StorageClass来提供持久化存储,在 Chart 目录下面创建用于安装的 values 文件:直接

# values.yaml
## @section Persistence parameters
persistence:
  enabled: true
  storageClass: "nfs-storage"
  accessModes:
    - ReadWriteOnce
  size: 20Gi
  mountPath: /bitnami/kafka
# 配置zk volumes zookeeper:
  enabled: true
  persistence:
    enabled: true
    storageClass: "nfs-storage"
    accessModes:
      - ReadWriteOnce
    size: 20Gi

直接使用上面的 values 文件安装 kafka:

$ helm upgrade --install kafka -f values.yaml --namespace logging .
Release "kafka" does not exist. Installing it now.
NAME: kafka
LAST DEPLOYED: Fri Jun 30 17:48:51 2023
NAMESPACE: logging
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 17.2.3
APP VERSION: 3.2.0
** Please be patient while the chart is being deployed **
Kafka can be accessed by consumers via port 9092 on the following
DNS name from within your cluster:
    kafka.logging.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the
following DNS name(s) from within your cluster:
    kafka-0.kafka-headless.logging.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following
commands:
    kubectl run kafka-client --restart='Never' --image
docker.io/bitnami/kafka:3.2.0-debian-10-r4 --namespace logging --
command -- sleep infinity
    kubectl exec --tty -i kafka-client --namespace logging -- bash
    PRODUCER:
        kafka-console-producer.sh \
            --broker-list kafka-0.kafka-
headless.logging.svc.cluster.local:9092 \
            --topic test
    CONSUMER:
        kafka-console-consumer.sh \
            --bootstrap-server kafka.logging.svc.cluster.local:9092\
            --topic test \
            --from-beginning

安装完成后我们可以使用上面的提示来检查 Kafka 是否正常运行:

$ kubectl get pods -n logging -l app.kubernetes.io/instance=kafka
kafka-0                     1/1 Running 0 7m58s
kafka-zookeeper-0           1/1 Running 0 7m58s

用下面的命令创建一个 Kafka 的测试客户端 Pod:

$ kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.2.0-debian-10-r4 --namespace logging --command -- sleep infinity
pod/kafka-client created

然后启动一个终端进入容器内部生产消息:

# 生产者
$ kubectl exec --tty -i kafka-client --namespace logging -- bash
I have no name!@kafka-client:/$ kafka-console-producer.sh --brokerlist kafka-0.kafka-headless.logging.svc.cluster.local:9092 --topic test
>hello kafka on k8s
>

启动另外一个终端进入容器内部消费消息:

# 消费者
$ kubectl exec --tty -i kafka-client --namespace logging -- bash
I have no name!@kafka-client:/$ kafka-console-consumer.sh --bootstrap-server kafka.logging.svc.cluster.local:9092 --topic test --from-beginning
hello kafka on k8s

如果在消费端看到了生产的消息数据证明我们的 Kafka 已经运行成功了。

3、Fluentd配置Kafka

现在有了 Kafka,我们就可以将 Fluentd 的日志数据输出到 Kafka 了,只需要将Fluentd 配置中的<match>更改为使用 Kafka 插件即可,但是在 Fluentd 中输出到Kafka,需要使用到fluent-plugin-kafka插件,所以需要我们自定义下 Docker 镜像,最简单的做法就是在上面 Fluentd 镜像的基础上新增 Kafka 插件即可,Dockerfile文件如下所示:

FROM quay.io/fluentd_elasticsearch/fluentd:v3.4.0
# RUN echo "source 'https://mirrors.tuna.tsinghua.edu.cn/rubygems/'" > Gemfile && gem install bundler
RUN gem sources --add https://mirrors.tuna.tsinghua.edu.cn/rubygems/ --remove https://rubygems.org/ && gem sources && gem install bundler -v 2.4.22
RUN gem install fluent-plugin-kafka -v 0.17.5 --no-document

编译:

$ docker build -t harbor-local.kubernets.cn/library/fluentdkafka:v0.17.5 .
$ docker push harbor-local.kubernets.cn/library/fluentdkafka:v0.17.5

接下来替换 Fluentd 的 Configmap 对象中的<match>部分,如下所示:

# fluentd-configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-conf
  namespace: logging
data:
  ......
  output.conf: |-
    <match **>
      @id kafka
      @type kafka2
      @log_level info

      # list of seed brokers
      brokers kafka-0.kafka-headless.logging.svc.cluster.local:9092
      use_event_time true

      # topic settings
      topic_key k8slog
      default_topic messages  # 注意,kafka中消费使用的是这个topic
      # buffer settings
      <buffer k8slog>
        @type file
        path /var/log/td-agent/buffer/td
        flush_interval 3s
      </buffer>

      # data type settings
      <format>
        @type json
      </format>

      # producer settings
      required_acks -1
      compression_codec gzip

    </match>

然后替换运行的 Fluentd 镜像:

# fluentd-daemonset.yaml
image: harbor-local.kubernets.cn/library/fluentd-kafka:v0.17.5

直接更新 Fluentd 的 Configmap 与 DaemonSet 资源对象即可:

$ kubectl apply -f fluentd-configmap.yaml
$ kubectl apply -f fluentd-daemonset.yaml

更新成功后我们可以使用上面的测试 Kafka 客户端来验证是否有日志数据:

$ kubectl exec --tty -i kafka-client --namespace logging -- bash
I have no name!@kafka-client:/$ kafka-console-consumer.sh --bootstrap-server kafka.logging.svc.cluster.local:9092 --topic messages --from-beginning
{"stream":"stdout","docker":{},"kubernetes":{"container_name":"count","namespace_name":"default","pod_name":"counter","container_image":"busybox:latest","host":"node1","labels":{"logging":"true"}},"message":"43883: Tue Jul 2 12:16:30 UTC2023\n"}
......

4、安装Logstash

虽然数据从 Kafka 到 Elasticsearch 的方式多种多样,比如可以使用Kafka ConnectElasticsearch Connector来实现,我们这里还是采用更加流行的Logstash方案,上面我们已经将日志从 Fluentd 采集输出到 Kafka 中去了,接下来我们使用 Logstash 来连接 Kafka 与 Elasticsearch 间的日志数据。

首先使用helm pull拉取 Chart 并解压:

$ helm pull elastic/logstash --untar --version 7.17.3
$ cd logstash

同样在 Chart 根目录下面创建用于安装的 Values 文件,如下所示:

# values.yaml
fullnameOverride: logstash

persistence:
  enabled: true

logstashConfig:
  logstash.yml: |
    http.host: 0.0.0.0

# 要注意下格式
logstashPipeline:
  logstash.conf: |
    input { kafka { bootstrap_servers => "kafka-0.kafka-headless.logging.svc.cluster.local:9092" codec => json consumer_threads => 3 topics => ["messages"] } }
    filter {}  # 过滤配置(比如可以删除key、添加geoip等等)
    output { elasticsearch { hosts => [ "elasticsearch-master:9200" ] index => "logstash-k8s-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } }

volumeClaimTemplate:
  accessModes: ["ReadWriteOnce"]
  storageClassName: nfs-storage
  resources:
    requests:
      storage: 30Gi

其中最重要的就是通过logstashPipeline配置 logstash 数据流的处理配置,通过input指定日志源 kafka 的配置,通过output输出到 Elasticsearch,同样直接使用上面的 Values 文件安装 logstash 即可:

$ helm upgrade --install logstash -f values.yaml --namespace logging .
Release "logstash" does not exist. Installing it now.
NAME: logstash
LAST DEPLOYED: Fri Jun 30 19:48:51 2023
NAMESPACE: logging
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
1. Watch all cluster members come up.
$ kubectl get pods --namespace=logging -l app=logstash -w

安装启动完成后可以查看 logstash 的日志:

$ kubectl get pods --namespace=logging -l app=logstash
NAME         READY   STATUS    RESTARTS   AGE
logstash-0   1/1     Running   0          2m8s
$ kubectl logs -f logstash-0 -n logging
...... {
      "@version" => "1",
        "stream" => "stdout",
    "@timestamp" => 2023-06-30T11:09:16.889Z,
       "message" => "4672: Thu Jun  30 11:09:15 UTC 2023",
    "kubernetes" => {
     "container_image" => "docker.io/library/busybox:latest",
      "container_name" => "count",
              "labels" => {
        "logging" => "true"
                },
            "pod_name" => "counter",
      "namespace_name" => "default",
              "pod_ip" => "10.244.2.118",
                "host" => "node2",
    "namespace_labels" => {
        "kubernetes_io/metadata_name" => "default"
         }
    },
    "docker" => {}
}

由于我们启用了 debug 日志调试,所以我们可以在 logstash 的日志中看到我们采集的日志消息,到这里证明我们的日志数据就获取成功了。现在我们可以登录到 Kibana 可以看到有如下所示的索引数据了:

海量数据下的EFK架构优化升级

然后同样创建索引模式,匹配上面的索引即可:

海量数据下的EFK架构优化升级

创建完成后就可以前往发现页面过滤日志数据了:

海量数据下的EFK架构优化升级

到这里我们就实现了一个使用Fluentd+Kafka+Logstash+Elasticsearch+Kibana的Kubernetes 日志收集工具栈,这里我们完整的 Pod 信息如下所示:

$ kubectl get pods -n logging
NAME                             READY   STATUS    RESTARTS   AGE
elasticsearch-master-0           1/1     Running   0          45m
elasticsearch-master-1           1/1     Running   0          45m
elasticsearch-master-2           1/1     Running   0          45m
fluentd-5mk96                    1/1     Running   0          13m
fluentd-gq79f                    1/1     Running   0          13m
fluentd-r4jkx                    1/1     Running   0          13m
fluentd-sxxp5                    1/1     Running   0          13m
fluentd-x5xr5                    1/1     Running   0          13m
fluentd-z2f2t                    1/1     Running   0          13m
kafka-0                          1/1     Running   0          52m
kafka-client                     1/1     Running   0          10m
kafka-zookeeper-0                1/1     Running   0          52m
kibana-kibana-6755f6db68-rv76q   1/1     Running   0          44m
logstash-0                       1/1     Running   0          3m51s

当然在实际的工作项目中还需要我们根据实际的业务场景来进行参数性能调优以及高可用等设置,以达到系统的最优性能。

上面我们在配置logstash的时候是将日志输出到 “logstash-k8s-%{+YYYY.MM.dd}”这个索引模式的,可能有的场景下只通过日期去区分索引不是很合理;

那么我们可以根据自己的需求去修改索引名称,比如可以根据我们的服务名称来进行区分,那么这个服务名称可以怎么来定义呢?

可以是 Pod 的名称或者通过 label 标签去指定,比如我们这里去做一个规范,要求需要收集日志的 Pod 除了需要添加logging: true这个标签之外,还需要添加一个logIndex: <索引名>的标签。

比如重新更新我们测试的 counter 应用:

apiVersion: v1
kind: Pod
metadata:
  name: counter
  labels:
    logging: "true" # 一定要具有该标签才会被采集
    logIndex: "zhdya"  # 指定索引名称
spec:
  containers:
    - name: count
      image: busybox
      args:
        [
          /bin/sh,
          -c,
          'i=0; while true; do echo "$i: $(date)"; i=$((i+1)); sleep 1; done',
        ]

然后重新更新 Logstash 的配置,修改 values 配置:

# ......
logstashPipeline:
  logstash.conf: |
    input { kafka { bootstrap_servers => "kafka-0.kafka-headless.logging.svc.cluster.local:9092" codec => json consumer_threads => 3 topics => ["messages"] } }
    filter {} # 过滤配置(比如可以删除key、添加geoip等等)
    output { elasticsearch { hosts => [ "elasticsearch-master:9200" ] index => "k8s-%{[kubernetes][labels][logIndex]}-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } }
# ......

logstash更新:

 $ helm upgrade --install logstash -f values.yaml --namespace logging .

使用上面的 values 值更新 logstash,正常更新后上面的 counter 这个 Pod 日志会输出 到一个名为k8s-zhdya-2023.07.05的索引去。

海量数据下的EFK架构优化升级

这样我们就实现了自定义索引名称,当然你也可以使用 Pod 名称、容器名称、命名空间 名称来作为索引的名称,这完全取决于你自己的需求。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

给TA打赏
共{{data.count}}人
人已打赏
EFKStack云原生

EFK日志平台部署管理

2025-7-15 4:19:43

云原生

基于Loki的日志收集系统

2025-7-15 6:16:33

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索
本站支持IPv6访问