08、基于K8s构建EFK+logstash+kafka日志平台(下)

目录

一、安装收集日志组件 Fluentd

二、kibana 可视化展示查询 k8s 容器日志

三、测试 efk 收集生产环境业务 pod 日志

四、基于 EFK+logstash+kafka 构建高吞吐量的日志平台

4、 1部署fluentd;

4、 2接入kafka;

4、 3配置logstash;

4、 4启动logstash;


本篇文章所用到的资料文件下载地址:https://download.csdn.net/download/weixin_46560589/87392272

一、安装收集日志组件 Fluentd

我们使用 daemonset 控制器部署 fluentd 组件,这样可以保证集群中的每个节点都可以运行同样 fluentd 的 pod 副本,这样就可以收集 k8s 集群中每个节点的日志。在 k8s 集群中,容器应用程序的输入输出日志会重定向到 node 节点里的 json 文件中,fluentd 可以 tail 和过滤以及把日志转换成指定的格式发送到 elasticsearch 集群中。除了容器日志,fluentd 也可以采集 kubelet、kube-proxy、docker 的日志。

# 离线镜像压缩包 fluentd.tar.gz 上传到各个节点上,手动解压:
[root@k8s-master1 ~]# docker load -i fluentd.tar.gz 
[root@k8s-node1 ~]# docker load -i fluentd.tar.gz 
[root@k8s-node2 ~]# docker load -i fluentd.tar.gz 

[root@k8s-master1 efk]# vim fluentd.yaml 
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: kube-logging
  labels:
    app: fluentd
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluentd
  labels:
    app: fluentd
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - namespaces
  verbs:
  - get
  - list
  - watch
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd
roleRef:
  kind: ClusterRole
  name: fluentd
  apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
  name: fluentd
  namespace: kube-logging
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: kube-logging
  labels:
    app: fluentd
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
    spec:
      serviceAccount: fluentd
      serviceAccountName: fluentd
      tolerations:
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.4.2-debian-elasticsearch-1.1
        imagePullPolicy: IfNotPresent
        env:
          - name:  FLUENT_ELASTICSEARCH_HOST
            value: "elasticsearch.kube-logging.svc.cluster.local"
          - name:  FLUENT_ELASTICSEARCH_PORT
            value: "9200"
          - name: FLUENT_ELASTICSEARCH_SCHEME
            value: "http"
          - name: FLUENTD_SYSTEMD_CONF
            value: disable
        resources:
          limits:
            memory: 512Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers

[root@k8s-master1 efk]# kubectl apply -f fluentd.yaml 

[root@k8s-master1 efk]# kubectl get pods -n kube-logging | grep fluentd
fluentd-6vmdc             1/1     Running   0             24s
fluentd-mtgxg             1/1     Running   0             24s
fluentd-nzv4n             1/1     Running   0             24s

二、kibana 可视化展示查询 k8s 容器日志

Fluentd 启动成功后,我们可以前往 Kibana 的 Dashboard 页面中,点击 Try our sample data:

*

点击左侧的Discover

*

可以看到如下配置页面

*

在这里可以配置我们需要的 Elasticsearch 索引,前面 Fluentd 配置文件中我们采集的日志使用的是 logstash 格式,这里只需要在文本框中输入logstash-* 即可匹配到 Elasticsearch 集群中的所有日志数据。点击 Next step:

*

选择@timestamp,创建索引

*

点击左侧的 discover,可看到如下

*

三、测试 efk 收集生产环境业务 pod 日志

[root@k8s-master1 efk]# vim pod.yaml 
apiVersion: v1
kind: Pod
metadata:
  name: counter
spec:
  containers:
  - name: count
    image: busybox
    imagePullPolicy: IfNotPresent
    args: [/bin/sh, -c,'i=0; while true; do echo "$i: $(date)"; i=$((i+1)); sleep 1; done']

[root@k8s-master1 efk]# kubectl apply -f pod.yaml 

[root@k8s-master1 efk]# kubectl get pods 
NAME                               READY   STATUS    RESTARTS      AGE
counter                            1/1     Running   0             25s
nfs-provisioner-6988f7c774-f478v   1/1     Running   2 (56m ago)   138m

Kibana 查询语言 KQL 官方地址:Kibana Query Language | Kibana Guide [7.2] | Elastic

登录到 kibana 的控制面板,在 discover 处的搜索栏中输入 kubernetes.pod_name:counter,这将过滤名为 counter的 Pod 的日志数据 ,如下所示:

*

通过前面的实验,我们已经在 k8s 集群成功部署了 elasticsearch、fluentd、kibana,这里使用的 efk 系统包括 3 个 Elasticsearch Pod,一个 Kibana Pod 和一组作为 DaemonSet 部署的Fluentd Pod。

四、基于 EFK+logstash+kafka 构建高吞吐量的日志平台

fluentd --> kafka --> logstash --> elasticsearch --> kibana

适用于数据量大的场景。此处只提供思路方法,有待验证。

4.1 部署 fluentd

[root@k8s-master1 efk]# vim fluentd-configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  system.input.conf: |-
    # Logs from systemd-journal for interesting services.
    <source>
      @id journald-docker
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag docker
    </source>
    <source>
      @id journald-kubelet
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag kubelet
    </source>
  forward.input.conf: |-
    # Takes the messages sent over TCP
    <source>
      @type forward
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level info
      include_tag_key true
      host 192.168.78.143            # es 主机 ip
      port 9200
      logstash_format true
      request_timeout 30s
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>

[root@k8s-master1 efk]# kubectl apply -f fluentd-configmap.yaml 

[root@k8s-master1 efk]# vim fluentd-daemonset.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: logging
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    version: v2.0.4
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
      version: v2.0.4
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        kubernetes.io/cluster-service: "true"
        version: v2.0.4
      # This annotation ensures that fluentd does not get evicted if the node
      # supports critical pod annotation based priority scheme.
      # Note that this does not guarantee admission on the nodes (#40573).
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        image: cnych/fluentd-elasticsearch:v2.0.4
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
      nodeSelector:
        beta.kubernetes.io/fluentd-ds-ready: "true"
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-config

# 创建节点标签
[root@k8s-master1 efk]# kubectl label nodes k8s-master1 beta.kubernetes.io/fluentd-ds-ready=true
[root@k8s-master1 efk]# kubectl label nodes k8s-node1 beta.kubernetes.io/fluentd-ds-ready=true
[root@k8s-master1 efk]# kubectl label nodes k8s-node2 beta.kubernetes.io/fluentd-ds-ready=true

[root@k8s-master1 efk]# kubectl apply -f fluentd-daemonset.yaml 

4.2 接入 kafka

[root@k8s-master1 efk]# vim kafka-config.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  system.input.conf: |-
    # Logs from systemd-journal for interesting services.
    <source>
      @id journald-docker
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag docker
    </source>
    <source>
      @id journald-kubelet
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag kubelet
    </source>
  forward.input.conf: |-
    # Takes the messages sent over TCP
    <source>
      @type forward
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id kafka
      @type kafka2
      @log_level info
      include_tag_key true
      # list of seed brokers
      brokers kafka ip:9092
      use_event_time true
      # buffer settings
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
      # data type settings
      <format>
        @type json
      </format>
      # topic settings
      topic_key topic
      default_topic messages
      # producer settings
      required_acks -1
      compression_codec gzip
    </match>

[root@k8s-master1 efk]# kubectl apply -f kafka-config.yaml 

# 重启 fluentd 
[root@k8s-master1 efk]# kubectl delete -f fluentd-daemonset.yaml 
[root@k8s-master1 efk]# kubectl apply -f fluentd-daemonset.yaml 

4.3 配置 logstash

配置logstash 消费 messages 日志写入 elasticsearch

[root@k8s-master1 efk]# vim config/kafkaInput_fluentd.conf
input {
    kafka {
        bootstrap_servers => ["kafka ip:9092"]
        client_id => "fluentd"
        group_id => "fluentd"
        consumer_threads => 1
        auto_offset_reset => "latest"
        topics => ["messages"]
    }
}
 
filter {
        json{
                source => "message"
        }
        
       ruby {
       code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
       }
      ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
       }
      ruby {
        code => "event.set('find_time',event.get('@timestamp').time.localtime - 8*60*60)"
       }
     mutate {
    remove_field => ["timestamp"]
    remove_field => ["message"]
    }
     
} 
output {
          elasticsearch{
               hosts => ["es ip地址: 9200"]
               index => "kubernetes_%{+YYYY_MM_dd}"

          }
#    stdout {
#           codec => rubydebug
#           }
}

4.4 启动 logstash

[root@k8s-master1 efk]# nohup ./bin/logstash -f config/kafkaInput_fluentd.conf --config.reload.automatic --path.data=/opt/logstash/data_fluentd 2>&1 > fluentd.log &

上一篇文章:【Kubernetes 企业项目实战】04、基于 K8s 构建 EFK+logstash+kafka 日志平台(中)_Stars.Sky的博客-CSDN博客

下一篇文章:【Kubernetes 企业项目实战】05、基于云原生分布式存储 Ceph 实现 K8s 数据持久化(上)_Stars.Sky的博客-CSDN博客