Note:
This topic has been translated from a Chinese forum by GPT and might contain errors. Original topic: Use TiCDC to replicate change records into ES for querying, to help developers troubleshoot issues.
[TiDB Usage Environment] Production Environment
[TiDB Version]
For development troubleshooting you would otherwise have to parse MySQL binlogs by hand; setting up this platform saves that effort.
[Reproduction Path] What operations were performed to encounter the issue
[Encountered Issue: Issue Phenomenon and Impact]
[Resource Configuration]
[Attachments: Screenshots/Logs/Monitoring]
First, let me explain how this is set up for TiDB.
Add the TiCDC component to the existing TiDB cluster.
Note: To use TiCDC, the TiDB version must be upgraded to v4.0.6 or above.
- TiCDC: configuring data replication to Kafka
This article continues from the previous one. Let’s first look at the current cluster status:
We are still using the CDC-server expanded in the previous article.
In the previous article, we created a task to synchronize data from TiDB to MySQL. Now, let’s create another synchronization task to Kafka:
./cdc cli changefeed create --pd=http://192.168.40.160:2379 --sink-uri='kafka://192.168.40.1:9092/tidb-cdc?kafka-version=2.6.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=canal-json' --changefeed-id="replication-task-2"
- tidb-cdc: the Kafka topic name to write to
- kafka-version: downstream Kafka version (optional; default is 2.4.0, minimum supported version is 0.11.0.2)
- kafka-client-id: Kafka client ID used by the replication task (optional; default is TiCDC_sarama_producer_<changefeed ID>)
- partition-num: number of downstream Kafka partitions (optional; must not exceed the actual number of partitions, and if omitted the partition count is fetched automatically)
- protocol: indicates the message protocol output to Kafka, optional values are default, canal, avro, maxwell, canal-json (default is default)
- max-message-bytes: maximum data amount sent to Kafka broker each time (optional, default is 64MB)
- replication-factor: number of Kafka message replicas (optional, default is 1)
- ca: path to the CA certificate file required to connect to the downstream Kafka instance (optional)
- cert: path to the certificate file required to connect to the downstream Kafka instance (optional)
- key: path to the certificate key file required to connect to the downstream Kafka instance (optional)
Successfully created.
Use the following command to see all tasks:
./cdc cli changefeed list --pd=http://192.168.40.160:2379
Or check the details of our task:
./cdc cli changefeed query --pd=http://192.168.40.160:2379 --changefeed-id=replication-task-2
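Both commands print JSON. The list output looks roughly like this (the values here are illustrative and the exact fields vary slightly by TiCDC version):
[
  {
    "id": "replication-task-2",
    "summary": {
      "state": "normal",
      "tso": 417886179132964865,
      "checkpoint": "2023-04-01 16:07:44.881",
      "error": null
    }
  }
]
A state of normal and an advancing checkpoint mean the changefeed is healthy.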
Then set up a single-node Kafka instance to receive the data that TiCDC writes:
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
tar zxf kafka_2.12-3.4.0.tgz
cd kafka_2.12-3.4.0
First, start Zookeeper:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
Then start Kafka:
nohup bin/kafka-server-start.sh config/server.properties &
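Kafka auto-creates the tidb-cdc topic on first write as long as auto.create.topics.enable is left at its default of true; if auto-creation is disabled in your environment, create the topic manually first, for example:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic tidb-cdc --partitions 1 --replication-factor 1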
You can watch the messages arriving in the topic with the console consumer:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tidb-cdc
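With protocol=canal-json, each row change arrives as one JSON document per message. A hand-written illustration of the shape (not captured output; the table and column names are made up, and the exact fields depend on the TiCDC version):
{
  "database": "test",
  "table": "orders",
  "pkNames": ["id"],
  "isDdl": false,
  "type": "INSERT",
  "es": 1679000000000,
  "ts": 1679000000123,
  "data": [{"id": "1", "amount": "9.99"}],
  "old": null
}
The database, table, and type fields are what make these documents convenient to filter on once they reach ES.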
After that, install an ELK platform with the one-click script (attached at the end of this post).
Download Logstash:
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gz
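Extract it the same way as Kafka and switch into the directory:
tar zxf logstash-7.0.0.tar.gz
cd logstash-7.0.0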
Write a Logstash configuration file (es.conf) to move the Kafka data into ES; note that ES is configured with a username and password:
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    topics => ["tidb-cdc"]
    codec => json {
      charset => "UTF-8"
    }
  }
  # If there are other data sources, add them directly below
}
output {
  # Processed logs are stored in local files
  file {
    path => "/config-dir/test.log"
    flush_interval => 0
  }
  # Processed logs are stored in ES
  elasticsearch {
    user => "elastic"
    password => "oscardba"
    hosts => "172.21.10.147:9200"
    index => "tidb-%{+YYYY.MM.dd}"
  }
}
Then start Logstash:
logstash -f es.conf
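Before moving on to Kibana, you can confirm that documents are reaching ES by listing the tidb-* indices with the account and password from the config above (the host and credentials are the ones used in this setup, so adjust them to yours):
curl -u elastic:oscardba "http://172.21.10.147:9200/_cat/indices/tidb-*?v"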
Finally, create an index pattern for the tidb-* indices in Kibana, and you can query the SQL change records there.
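Developers can also hit the index directly through the ES search API; for example, to pull recent change records for a hypothetical orders table (the table name and query are only illustrative):
curl -u elastic:oscardba "http://172.21.10.147:9200/tidb-*/_search?q=table:orders&size=10&pretty"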
Attached is a one-click ELK creation script:
#!/bin/bash
# Date: 2019-5-20 13:14:00
# Author Blog:
# Auto Install ELK log analysis platform
User="elk"
Elasticsearch_User="elastic"
Elasticsearch_Passwd="oscardba"
IPADDR=$(hostname -I | awk '{print $1}')
Elasticsearch_DIR="/data/elasticsearch"
Kafka_IP=$(hostname -I | awk '{print $1}')
Zookeeper_IP=$(hostname -I | awk '{print $1}')
Elasticsearch_IP=$(hostname -I | awk '{print $1}')
# Define JDK path variables
JDK_URL=https://mirrors.yangxingzhen.com/jdk
JDK_File=jdk-11.0.1_linux-x64_bin.tar.gz
JDK_File_Dir=jdk-11.0.1
JDK_Dir=/usr/local/jdk-11.0.1
# Define Redis path variables
Redis_URL=http://download.redis.io/releases
Redis_File=redis-5.0.7.tar.gz
Redis_File_Dir=redis-5.0.7
Redis_Prefix=/usr/local/redis
# Define Nginx path variables
Nginx_URL=http://nginx.org/download
Nginx_File=nginx-1.18.0.tar.gz
Nginx_File_Dir=nginx-1.18.0
Nginx_Dir=/usr/local/nginx
# Define Elasticsearch path variables
Elasticsearch_URL=https://artifacts.elastic.co/downloads/elasticsearch
Elasticsearch_File=elasticsearch-7.5.1-linux-x86_64.tar.gz
Elasticsearch_File_Dir=elasticsearch-7.5.1
Elasticsearch_Dir=/usr/local/elasticsearch
# Define Filebeat path variables
Filebeat_URL=https://artifacts.elastic.co/downloads/beats/filebeat
Filebeat_File=filebeat-7.5.1-linux-x86_64.tar.gz
Filebeat_File_Dir=filebeat-7.5.1-linux-x86_64
Filebeat_Dir=/usr/local/filebeat
# Define Logstash path variables
Logstash_URL=https://artifacts.elastic.co/downloads/logstash
Logstash_File=logstash-7.5.1.tar.gz
Logstash_File_Dir=logstash-7.5.1
Logstash_Dir=/usr/local/logstash
# Define Kibana path variables
Kibana_URL=https://artifacts.elastic.co/downloads/kibana
Kibana_File=kibana-7.5.1-linux-x86_64.tar.gz
Kibana_File_Dir=kibana-7.5.1-linux-x86_64
Kibana_Dir=/usr/local/kibana
# Configure resource limits and kernel parameters
cat >>/etc/security/limits.conf <<EOF
* soft nofile 65537
* hard nofile 65537
* soft nproc 65537
* hard nproc 65537
EOF
if [ $(grep -wc "4096" /etc/security/limits.d/20-nproc.conf) -eq 0 ]; then
cat >>/etc/security/limits.d/20-nproc.conf <<EOF
* soft nproc 4096
EOF
fi
cat >/etc/sysctl.conf <<EOF
net.ipv4.tcp_max_syn_backlog = 65536
net.core.netdev_max_backlog = 32768
net.core.somaxconn = 32768
net.core.wmem_default = 8388608
net.core.rmem_default = 8388608
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_timestamps = 0
net.ipv4.tcp_synack_retries = 2
net.ipv4.tcp_syn_retries = 2
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_mem = 94500000 915000000 927000000
net.ipv4.tcp_max_orphans = 3276800
net.ipv4.tcp_fin_timeout = 120
net.ipv4.tcp_keepalive_time = 120
net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_max_tw_buckets = 30000
fs.file-max=655350
vm.max_map_count = 262144
net.core.somaxconn= 65535
net.ipv4.ip_forward = 1
net.ipv6.conf.all.disable_ipv6=1
EOF
# Apply sysctl -p to make the configuration effective
sysctl -p >/dev/null
# Create elk user
[ $(grep -wc "elk" /etc/passwd) -eq 0 ] && useradd elk >/dev/null
# Install JDK environment
java -version >/dev/null 2>&1
if [ $? -ne 0 ]; then
# Install Package
[ -f /usr/bin/wget ] || yum -y install wget >/dev/null
wget -c ${JDK_URL}/${JDK_File}
tar xf ${JDK_File}
mv ${JDK_File_Dir} ${JDK_Dir}
cat >>/etc/profile <<EOF
export JAVA_HOME=${JDK_Dir}
export CLASSPATH=\$CLASSPATH:\$JAVA_HOME/lib:\$JAVA_HOME/jre/lib
export PATH=\$JAVA_HOME/bin:\$JAVA_HOME/jre/bin:\$PATH:\$HOME/bin
EOF
fi
# Load environment variables
source /etc/profile >/dev/null
# Install Redis
if [ ! -d ${Redis_Prefix} ]; then
[ -f /usr/bin/openssl ] || yum -y install openssl openssl-devel
yum -y install wget gcc gcc-c++
wget -c ${Redis_URL}/${Redis_File}
tar zxf ${Redis_File}
\mv ${Redis_File_Dir} ${Redis_Prefix}
cd ${Redis_Prefix} && make
if [ $? -eq 0 ]; then
echo -e "\033[32mThe Redis Install Success...\033[0m"
else
echo -e "\033[31mThe Redis Install Failed...\033[0m"
fi
else
echo -e "\033[31mThe Redis has been installed...\033[0m"
exit 1
fi
# Randomly generate password
Passwd=$(openssl rand -hex 12)
# Config Redis
ln -sf ${Redis_Prefix}/src/redis-* /usr/bin
sed -i "s/127.0.0.1/0.0.0.0/g" ${Redis_Prefix}/redis.conf
sed -i "/daemonize/s/no/yes/" ${Redis_Prefix}/redis.conf
sed -i "s/dir .*/dir \/data\/redis/" ${Redis_Prefix}/redis.conf
sed -i "s/logfile .*/logfile \/usr\/local\/redis\/redis.log/" ${Redis_Prefix}/redis.conf
sed -i '/appendonly/s/no/yes/' ${Redis_Prefix}/redis.conf
sed -i "s/# requirepass foobared/requirepass ${Passwd}/" ${Redis_Prefix}/redis.conf
echo never > /sys/kernel/mm/transparent_hugepage/enabled
sysctl vm.overcommit_memory=1
# Create data directory
[ -d /data/redis ] || mkdir -p /data/redis
# Create systemctl management configuration file
cat >/usr/lib/systemd/system/redis.service <<EOF
[Unit]
Description=Redis Server
After=network-online.target remote-fs.target nss-lookup.target
Wants=network-online.target
[Service]
Type=forking
ExecStart=/usr/bin/redis-server ${Redis_Prefix}/redis.conf
ExecStop=/usr/bin/redis-cli -h 127.0.0.1 -p 6379 shutdown
User=root
Group=root
[Install]
WantedBy=multi-user.target
EOF
# Add power on self start And Start Redis
systemctl daemon-reload
systemctl enable redis
systemctl start redis
# Install Elasticsearch
if [ ! -d ${Elasticsearch_Dir} ]; then
# Install Package
[ -f /usr/bin/wget ] || yum -y install wget >/dev/null
wget -c ${Elasticsearch_URL}/${Elasticsearch_File}
tar xf ${Elasticsearch_File}
mv ${Elasticsearch_File_Dir} ${Elasticsearch_Dir}
else
echo -e "\033[31mThe Elasticsearch Already Install...\033[0m"
exit 1
fi
# Install Kibana
if [ ! -d ${Kibana_Dir} ]; then
# Install Package
[ -f /usr/bin/wget ] || yum -y install wget >/dev/null
wget -c ${Kibana_URL}/${Kibana_File}
tar xf ${Kibana_File}
mv ${Kibana_File_Dir} ${Kibana_Dir}
else
echo -e "\033[31mThe Kibana Already Install...\033[0m"
exit 1
fi
# Configure Elasticsearch
mkdir -p ${Elasticsearch_DIR}/{data,logs}
cat >${Elasticsearch_Dir}/config/elasticsearch.yml <<EOF
# Node name
node.name: es-master
# Data storage directory, create this directory first
path.data: ${Elasticsearch_DIR}/data
# Log storage directory, create this directory first
path.logs: ${Elasticsearch_DIR}/logs
# Node IP
network.host: ${Elasticsearch_IP}
# TCP port
transport.tcp.port: 9300
# HTTP port
http.port: 9200
# List of eligible master nodes, if there are multiple master nodes, configure the corresponding master nodes
cluster.initial_master_nodes: ["${Elasticsearch_IP}:9300"]
# Whether to allow as a master node
node.master: true
# Whether to save data
node.data: true
node.ingest: false
node.ml: false
cluster.remote.connect: false
# Cross-domain
http.cors.enabled: true
http.cors.allow-origin: "*"
# Configure X-Pack
http.cors.allow-headers: Authorization
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
EOF
# Configure Kibana
cat >${Kibana_Dir}/config/kibana.yml <<EOF
server.port: 5601
server.host: "${Elasticsearch_IP}"
elasticsearch.hosts: ["http://${Elasticsearch_IP}:9200"]
elasticsearch.username: "${Elasticsearch_User}"
elasticsearch.password: "${Elasticsearch_Passwd}"
logging.dest: ${Kibana_Dir}/logs/kibana.log
i18n.locale: "zh-CN"
EOF
# Create Kibana log directory
[ -d ${Kibana_Dir}/logs ] || mkdir ${Kibana_Dir}/logs
# Authorize ELK user to manage Elasticsearch, Kibana
chown -R ${User}.${User} ${Elasticsearch_Dir}
chown -R ${User}.${User} ${Elasticsearch_DIR}
chown -R root.root ${Kibana_Dir}
# Start Elasticsearch
# su ${User} -c "source /etc/profile >/dev/null && ${Elasticsearch_Dir}/bin/elasticsearch -d"
# Create systemctl management configuration file
cat >/usr/lib/systemd/system/elasticsearch.service <<EOF
[Unit]
Description=elasticsearch
After=network-online.target remote-fs.target nss-lookup.target
Wants=network-online.target
[Service]
LimitCORE=infinity
LimitNOFILE=655360
LimitNPROC=655360
User=${User}
Group=${User}
PIDFile=${Elasticsearch_Dir}/logs/elasticsearch.pid
ExecStart=${Elasticsearch_Dir}/bin/elasticsearch
ExecReload=/bin/kill -s HUP \$MAINPID
ExecStop=/bin/kill -s TERM \$MAINPID
RestartSec=30
Restart=always
PrivateTmp=true
[Install]
WantedBy=multi-user.target
EOF
# Start Elasticsearch service
systemctl daemon-reload
systemctl enable elasticsearch
systemctl start elasticsearch
# Determine if the Elasticsearch service is started, and execute the following operations only if it starts successfully
Code=""
while sleep 10
do
echo -e "\033[32m$(date +'%F %T') Waiting for Elasticsearch service to start...\033[0m"
# Get Elasticsearch service port
netstat -lntup | egrep "9200|9300" >/dev/null
if [ $? -eq 0 ]; then
Code="break"
fi
${Code}
done
# Generate Elasticsearch password
cat >/tmp/config_elasticsearch_passwd.exp <<EOF
spawn su ${User} -c "source /etc/profile >/dev/null && ${Elasticsearch_Dir}/bin/elasticsearch-setup-passwords interactive"
set timeout 60
expect {
-timeout 20
"y/N" {
send "y\n"
exp_continue
}
"Enter password *:" {
send "${Elasticsearch_Passwd}\n"
exp_continue
}
"Reenter password *:" {
send "${Elasticsearch_Passwd}\n"
exp_continue
}
"Enter password *:" {
send "${Elasticsearch_Passwd}\n"
exp_continue
}
"Reenter password *:" {
send "${Elasticsearch_Passwd}\n"
exp_continue
}
"Enter password *:" {
send "${Elasticsearch_Passwd}\n"
exp_continue
}
"Reenter password *:" {
send "${Elasticsearch_Passwd}\n"
exp_continue
}
"Enter password *:" {
send "${Elasticsearch_Passwd}\n"
exp_continue
}
"Reenter password *:" {
send "${Elasticsearch_Passwd}\n"
exp_continue
}
"Enter password *:" {
send "${Elasticsearch_Passwd}\n"
exp_continue
}
"Reenter password *:" {
send "${Elasticsearch_Passwd}\n"
exp_continue