Use TiCDC to synchronize change records to Elasticsearch for developers to troubleshoot issues

Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.

Original topic: tidb用cdc同步变更记录到es里面去查询。给开发排查问题

| username: tidb狂热爱好者

[TiDB Usage Environment] Production Environment
For development troubleshooting, you would otherwise need to parse MySQL-style binlog records by hand. Setting up the platform described below saves that effort.

First, let me explain how the search pipeline is built on TiDB.
Add the TiCDC component to the existing TiDB cluster.
Note: To use TiCDC, the TiDB version must be upgraded to v4.0.6 or above.

  1. Configure TiCDC to synchronize data to Kafka
    This article continues from the previous one. Let’s first look at the current cluster status:
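A quick way to check, assuming the cluster is managed with TiUP (the cluster name here is illustrative):

tiup cluster display tidb-test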

We are still using the CDC-server expanded in the previous article.

In the previous article, we created a task to synchronize data from TiDB to MySQL. Now, let’s create another synchronization task to Kafka:

./cdc cli changefeed create --pd=http://192.168.40.160:2379 --sink-uri='kafka://192.168.40.1:9092/tidb-cdc?kafka-version=2.6.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=canal-json' --changefeed-id="replication-task-2"
  • tidb-cdc: the Kafka topic that the changefeed writes to
  • kafka-version: downstream Kafka version number (optional, default is 2.4.0, currently supports a minimum version of 0.11.0.2)
  • kafka-client-id: specifies the Kafka client ID for the synchronization task (optional, default is TiCDC_sarama_producer_sync_task_ID)
  • partition-num: number of downstream Kafka partitions (optional, cannot exceed the actual number of partitions. If not filled, it will automatically fetch the partition number)
  • protocol: indicates the message protocol output to Kafka, optional values are default, canal, avro, maxwell, canal-json (default is default)
  • max-message-bytes: maximum data amount sent to Kafka broker each time (optional, default is 64MB)
  • replication-factor: number of Kafka message replicas (optional, default is 1)
  • ca: path to the CA certificate file required to connect to the downstream Kafka instance (optional)
  • cert: path to the certificate file required to connect to the downstream Kafka instance (optional)
  • key: path to the certificate key file required to connect to the downstream Kafka instance (optional)
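If the downstream Kafka requires TLS, the ca/cert/key options above are appended to the sink URI as query parameters. A sketch, with illustrative certificate paths:

./cdc cli changefeed create --pd=http://192.168.40.160:2379 --sink-uri='kafka://192.168.40.1:9092/tidb-cdc?protocol=canal-json&ca=/path/to/ca.pem&cert=/path/to/client.pem&key=/path/to/client-key.pem' --changefeed-id="replication-task-tls"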

Our replication-task-2 changefeed is now created successfully.

Use the following command to see all tasks:

./cdc cli changefeed list --pd=http://192.168.40.160:2379

Or check the details of our task:

./cdc cli changefeed query --pd=http://192.168.40.160:2379 --changefeed-id=replication-task-2
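The query output is JSON. Abridged, it looks roughly like this (exact fields vary by TiCDC version; values are illustrative):

{
  "info": {
    "sink-uri": "kafka://192.168.40.1:9092/tidb-cdc?...",
    "state": "normal"
  },
  "status": {
    "resolved-ts": 434716089136185345,
    "checkpoint-ts": 434716089136185345
  }
}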

Then start a single-node Kafka instance to receive the data that TiCDC writes:

wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
tar zxf kafka_2.12-3.4.0.tgz
cd kafka_2.12-3.4.0

First, start Zookeeper:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

Then start Kafka:

nohup bin/kafka-server-start.sh config/server.properties &

To watch the change events as they arrive, run a console consumer on the topic:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tidb-cdc
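With protocol=canal-json, each row change arrives as one JSON document. An INSERT looks roughly like the following (fields abridged; table and values are illustrative):

{
  "database": "test",
  "table": "orders",
  "pkNames": ["id"],
  "isDdl": false,
  "type": "INSERT",
  "es": 1670000000000,
  "ts": 1670000000100,
  "data": [{"id": "1", "amount": "9.90"}],
  "old": null
}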

Once data is flowing into Kafka, set up an ELK stack (a one-click install script is attached at the end of this post).

Download and unpack Logstash:

wget https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gz
tar zxf logstash-7.0.0.tar.gz

Write a Logstash configuration file (es.conf) to ship the Kafka data into ES. Note that ES must be configured with a username and password:

input {
    kafka {
        bootstrap_servers => "127.0.0.1:9092"
        topics => ["tidb-cdc"]
        codec => json {
            charset => "UTF-8"
        }
    }
    # If there are other data sources, add them directly below
}

output {
    # Processed logs are stored in local files
    file {
        path => "/config-dir/test.log"
        flush_interval => 0
    }
    # Processed logs are stored in ES
    elasticsearch {
        user => "elastic"
        password => "oscardba"
        hosts => "172.21.10.147:9200"
        index => "tidb-%{+YYYY.MM.dd}"
    }
}
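Optionally, a filter block can map the canal-json commit time (the es field, in epoch milliseconds) onto @timestamp, so Kibana sorts records by when the change actually happened. A minimal sketch, assuming the es field is present in every event:

filter {
    date {
        # "es" is the commit time in epoch milliseconds (assumption based on the canal-json payload above)
        match => ["es", "UNIX_MS"]
        target => "@timestamp"
    }
}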

Then start Logstash:

logstash-7.0.0/bin/logstash -f es.conf
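Once Logstash is running, it is worth confirming that documents are landing in ES, using the account configured above:

curl -u elastic:oscardba "http://172.21.10.147:9200/_cat/indices/tidb-*?v"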

Finally, create an index pattern for the TiDB data on the Kibana platform (for example, tidb-*) so the SQL change records can be queried.
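In Kibana’s Discover view, developers can then filter change events with KQL. For example, to find deletes against one table (field names follow the canal-json payload above; adjust to what is actually indexed):

table : "orders" and type : "DELETE"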

Attached is a one-click ELK creation script:

#!/bin/bash
# Date: 2019-5-20 13:14:00

# Auto Install ELK log analysis platform

User="elk"
Elasticsearch_User="elastic"
Elasticsearch_Passwd="oscardba"
IPADDR=$(hostname -I | awk '{print $1}')
Elasticsearch_DIR="/data/elasticsearch"
Kafka_IP=$(hostname -I | awk '{print $1}')
Zookeeper_IP=$(hostname -I | awk '{print $1}')
Elasticsearch_IP=$(hostname -I | awk '{print $1}')

# Define JDK path variables
JDK_URL=https://mirrors.yangxingzhen.com/jdk
JDK_File=jdk-11.0.1_linux-x64_bin.tar.gz
JDK_File_Dir=jdk-11.0.1
JDK_Dir=/usr/local/jdk-11.0.1

# Define Redis path variables
Redis_URL=http://download.redis.io/releases
Redis_File=redis-5.0.7.tar.gz
Redis_File_Dir=redis-5.0.7
Redis_Prefix=/usr/local/redis

# Define Nginx path variables
Nginx_URL=http://nginx.org/download
Nginx_File=nginx-1.18.0.tar.gz
Nginx_File_Dir=nginx-1.18.0
Nginx_Dir=/usr/local/nginx

# Define Elasticsearch path variables
Elasticsearch_URL=https://artifacts.elastic.co/downloads/elasticsearch
Elasticsearch_File=elasticsearch-7.5.1-linux-x86_64.tar.gz
Elasticsearch_File_Dir=elasticsearch-7.5.1
Elasticsearch_Dir=/usr/local/elasticsearch

# Define Filebeat path variables
Filebeat_URL=https://artifacts.elastic.co/downloads/beats/filebeat
Filebeat_File=filebeat-7.5.1-linux-x86_64.tar.gz
Filebeat_File_Dir=filebeat-7.5.1-linux-x86_64
Filebeat_Dir=/usr/local/filebeat

# Define Logstash path variables
Logstash_URL=https://artifacts.elastic.co/downloads/logstash
Logstash_File=logstash-7.5.1.tar.gz
Logstash_File_Dir=logstash-7.5.1
Logstash_Dir=/usr/local/logstash

# Define Kibana path variables
Kibana_URL=https://artifacts.elastic.co/downloads/kibana
Kibana_File=kibana-7.5.1-linux-x86_64.tar.gz
Kibana_File_Dir=kibana-7.5.1-linux-x86_64
Kibana_Dir=/usr/local/kibana

# Configure user resource limits
cat >>/etc/security/limits.conf <<EOF
* soft nofile 65537
* hard nofile 65537
* soft nproc 65537
* hard nproc 65537
EOF

if [ $(grep -wc "4096" /etc/security/limits.d/20-nproc.conf) -eq 0 ]; then
cat >>/etc/security/limits.d/20-nproc.conf <<EOF
*          soft    nproc     4096
EOF
fi

cat >/etc/sysctl.conf <<EOF
net.ipv4.tcp_max_syn_backlog = 65536
net.core.netdev_max_backlog = 32768
net.core.somaxconn = 32768
net.core.wmem_default = 8388608
net.core.rmem_default = 8388608
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_timestamps = 0
net.ipv4.tcp_synack_retries = 2
net.ipv4.tcp_syn_retries = 2
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_mem = 94500000 915000000 927000000
net.ipv4.tcp_max_orphans = 3276800
net.ipv4.tcp_fin_timeout = 120
net.ipv4.tcp_keepalive_time = 120
net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_max_tw_buckets = 30000
fs.file-max=655350
vm.max_map_count = 262144
net.core.somaxconn= 65535
net.ipv4.ip_forward = 1
net.ipv6.conf.all.disable_ipv6=1
EOF

# Apply sysctl -p to make the configuration effective
sysctl -p >/dev/null

# Create elk user
[ $(grep -wc "elk" /etc/passwd) -eq 0 ] && useradd elk >/dev/null

# Install JDK environment
java -version >/dev/null 2>&1
if [ $? -ne 0 ]; then
    # Install Package
    [ -f /usr/bin/wget ] || yum -y install wget >/dev/null
    wget -c ${JDK_URL}/${JDK_File}
    tar xf ${JDK_File}
    mv ${JDK_File_Dir} ${JDK_Dir}
cat >>/etc/profile <<EOF
export JAVA_HOME=${JDK_Dir}
export CLASSPATH=\$CLASSPATH:\$JAVA_HOME/lib:\$JAVA_HOME/jre/lib
export PATH=\$JAVA_HOME/bin:\$JAVA_HOME/jre/bin:\$PATH:\$HOME/bin
EOF
fi

# Load environment variables
source /etc/profile >/dev/null

# Install Redis
if [ ! -d ${Redis_Prefix} ]; then
    [ -f /usr/bin/openssl ] || yum -y install openssl openssl-devel
    yum -y install wget gcc gcc-c++
    wget -c ${Redis_URL}/${Redis_File}
    tar zxf ${Redis_File}
    \mv ${Redis_File_Dir} ${Redis_Prefix}
    cd ${Redis_Prefix} && make
    if [ $? -eq 0 ]; then
        echo -e "\033[32mThe Redis Install Success...\033[0m"
    else
        echo -e "\033[31mThe Redis Install Failed...\033[0m"
    fi
else
    echo -e "\033[31mThe Redis has been installed...\033[0m"
    exit 1
fi

# Randomly generate password
Passwd=$(openssl rand -hex 12)

# Config Redis
ln -sf ${Redis_Prefix}/src/redis-* /usr/bin
sed -i "s/127.0.0.1/0.0.0.0/g" ${Redis_Prefix}/redis.conf
sed -i "/daemonize/s/no/yes/" ${Redis_Prefix}/redis.conf
sed -i "s/dir .*/dir \/data\/redis/" ${Redis_Prefix}/redis.conf
sed -i "s/logfile .*/logfile \/usr\/local\/redis\/redis.log/" ${Redis_Prefix}/redis.conf
sed -i '/appendonly/s/no/yes/' ${Redis_Prefix}/redis.conf
sed -i "s/# requirepass foobared/requirepass ${Passwd}/" ${Redis_Prefix}/redis.conf
echo never > /sys/kernel/mm/transparent_hugepage/enabled
sysctl vm.overcommit_memory=1

# Create data directory
[ -d /data/redis ] || mkdir -p /data/redis

# Create systemctl management configuration file
cat >/usr/lib/systemd/system/redis.service <<EOF
[Unit]
Description=Redis Server
After=network-online.target remote-fs.target nss-lookup.target
Wants=network-online.target

[Service]
Type=forking
ExecStart=/usr/bin/redis-server ${Redis_Prefix}/redis.conf
ExecStop=/usr/bin/redis-cli -h 127.0.0.1 -p 6379 shutdown
User=root
Group=root

[Install]
WantedBy=multi-user.target
EOF

# Add power on self start And Start Redis
systemctl daemon-reload
systemctl enable redis
systemctl start redis

# Install Elasticsearch
if [ ! -d ${Elasticsearch_Dir} ]; then
    # Install Package
    [ -f /usr/bin/wget ] || yum -y install wget >/dev/null
    wget -c ${Elasticsearch_URL}/${Elasticsearch_File}
    tar xf ${Elasticsearch_File}
    mv ${Elasticsearch_File_Dir} ${Elasticsearch_Dir}
else
    echo -e "\033[31mThe Elasticsearch Already Install...\033[0m"
    exit 1
fi

# Install Kibana
if [ ! -d ${Kibana_Dir} ]; then
    # Install Package
    [ -f /usr/bin/wget ] || yum -y install wget >/dev/null
    wget -c ${Kibana_URL}/${Kibana_File}
    tar xf ${Kibana_File}
    mv ${Kibana_File_Dir} ${Kibana_Dir}
else
    echo -e "\033[31mThe Kibana Already Install...\033[0m"
    exit 1
fi

# Configure Elasticsearch
mkdir -p ${Elasticsearch_DIR}/{data,logs}
cat >${Elasticsearch_Dir}/config/elasticsearch.yml <<EOF
# Node name
node.name: es-master
# Data storage directory, create this directory first
path.data: ${Elasticsearch_DIR}/data
# Log storage directory, create this directory first
path.logs: ${Elasticsearch_DIR}/logs
# Node IP
network.host: ${Elasticsearch_IP}
# TCP port
transport.tcp.port: 9300
# HTTP port
http.port: 9200
# List of eligible master nodes, if there are multiple master nodes, configure the corresponding master nodes
cluster.initial_master_nodes: ["${Elasticsearch_IP}:9300"]
# Whether to allow as a master node
node.master: true
# Whether to save data
node.data: true
node.ingest: false
node.ml: false
cluster.remote.connect: false
# Cross-domain
http.cors.enabled: true
http.cors.allow-origin: "*"
# Configure X-Pack
http.cors.allow-headers: Authorization
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
EOF

# Configure Kibana
cat >${Kibana_Dir}/config/kibana.yml <<EOF
server.port: 5601
server.host: "${Elasticsearch_IP}"
elasticsearch.hosts: ["http://${Elasticsearch_IP}:9200"]
elasticsearch.username: "${Elasticsearch_User}"
elasticsearch.password: "${Elasticsearch_Passwd}"
logging.dest: ${Kibana_Dir}/logs/kibana.log
i18n.locale: "zh-CN"
EOF

# Create Kibana log directory
[ -d ${Kibana_Dir}/logs ] || mkdir ${Kibana_Dir}/logs

# Authorize ELK user to manage Elasticsearch, Kibana
chown -R ${User}.${User} ${Elasticsearch_Dir}
chown -R ${User}.${User} ${Elasticsearch_DIR}
chown -R root.root ${Kibana_Dir}

# Start Elasticsearch
# su ${User} -c "source /etc/profile >/dev/null && ${Elasticsearch_Dir}/bin/elasticsearch -d"

# Create systemctl management configuration file
cat >/usr/lib/systemd/system/elasticsearch.service <<EOF
[Unit]
Description=elasticsearch
After=network-online.target remote-fs.target nss-lookup.target
Wants=network-online.target

[Service]
LimitCORE=infinity
LimitNOFILE=655360
LimitNPROC=655360
User=${User}
Group=${User}
PIDFile=${Elasticsearch_Dir}/logs/elasticsearch.pid
ExecStart=${Elasticsearch_Dir}/bin/elasticsearch
ExecReload=/bin/kill -s HUP \$MAINPID
ExecStop=/bin/kill -s TERM \$MAINPID
RestartSec=30
Restart=always
PrivateTmp=true

[Install]
WantedBy=multi-user.target
EOF

# Start Elasticsearch service
systemctl daemon-reload
systemctl enable elasticsearch
systemctl start elasticsearch

# Wait until the Elasticsearch service is listening; continue only after it has started successfully
while sleep 10
do
    echo -e "\033[32m$(date +'%F %T') Waiting for Elasticsearch service to start...\033[0m"
    # Check whether the Elasticsearch HTTP/transport ports are open
    if netstat -lntup | egrep "9200|9300" >/dev/null; then
        break
    fi
done

# Set the Elasticsearch built-in users' passwords non-interactively via expect
[ -f /usr/bin/expect ] || yum -y install expect >/dev/null
cat >/tmp/config_elasticsearch_passwd.exp <<EOF
spawn su ${User} -c "source /etc/profile >/dev/null && ${Elasticsearch_Dir}/bin/elasticsearch-setup-passwords interactive"
set timeout 60
expect {
        -timeout 20
        "y/N" {
                send "y\n"
                exp_continue
                }
        "Enter password *:" {
                send "${Elasticsearch_Passwd}\n"
                exp_continue
                }
        "Reenter password *:" {
                send "${Elasticsearch_Passwd}\n"
                exp_continue
                }
        eof
}
EOF

# exp_continue loops the expect block, so one Enter/Reenter pair answers the
# prompts for every built-in user (elastic, kibana, logstash_system, ...)
expect /tmp/config_elasticsearch_passwd.exp
| username: Billmay表妹 | Original post link

Is this an article? You can post it to the column~