一.事件背景

在2018年项目中期阶段,有位同事误删除了某表数据,涉及数据量4w多。在业务上与其它表有依赖关系,若未匹配到数据则新增,否则修改,对业务影响不大。 近段时间前同事问我MySQL误操作 UPDATE/DELETE 后怎么恢复原有数据(感觉他们又误操作了),于是写下处理过程,给自己打一个预防针。

二.修复方法

在人工手动进行一些数据库变更操作的时候(比方说数据修正),尤其是一些不可控的批量更新或删除,通常都建议备份后操作。不过不怕万一,就怕一万,有备无患总是好的。在线上或者测试环境误操作导致数据被删除或者更新后,想要恢复,一般有两种方法。

  • 方法1

    利用最近的全量备份+增量binlog备份,恢复到误操作之前的状态,但是随着数据量的增大,binlog的增多,恢复起来很费时。

  • 方法2

    如果binlog的格式为row,那么就可以将binlog解析出来生成反向的原始SQL 以下是利用方法二写的一个python脚本rollback_sql.py,可利用此脚本生成反向的原始SQL。

三.恢复思路

  1. 在误操作后确认MySQL当前写binlog文件
  2. 解析binlog,获取操作事务的起始POSID、结束POSID
  3. 获取到事务的操作语句,对数据进行预筛选、解析、格式化生成sql
  4. 如果为动态数据,修复成本得酌情考虑,当然只是针对UPDATE的情况下,如果误操作DELETE,不管什么场景都需要进行修复。

四.恢复前提

  1. MySQL需要开启binlog
  2. MySQL binlog必须是row模式,row会记录每一条数据被修改的记录并且把修改的值体现出来
  3. 脚本适用于python3环境

五.测试案例(UPDATE)

1.1 原表数据

mysql> select belong,count(belong) from tb_ec_info group by belong;
+------------+---------------+
| belong     | count(belong) |
+------------+---------------+
| NULL       |             0 |
| 11_Plat    |             1 |
| 7Plat      |             2 |
| BJ_IDC     |          1513 |
| FifthPlat  |           855 |
| fivePlat   |             1 |
| onlinePlat |             9 |
| orderPlat  |            74 |
| secPlat    |        125821 |
| thirdPlat  |            66 |
| vipPlat01  |            11 |
+------------+---------------+
11 rows in set (0.19 sec)

1.2 修改后数据

mysql> update tb_ec_info set belong='2Plat' where belong='secPlat';
Query OK, 125821 rows affected (4.13 sec)
Rows matched: 125821  Changed: 125821  Warnings: 0

1.3 确认binary写日志

mysql> show binary logs;
+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.000001 |       177 |
| mysql-bin.000002 |      1043 |
| mysql-bin.000003 |  14284624 |
| mysql-bin.000004 | 229673622 |
+------------------+-----------+
4 rows in set (0.00 sec)

1.4 mysqlbinlog获取posid

#220121 13:12:32 server id 1  end_log_pos 161586335 CRC32 0x5824b9ee    Query   thread_id=304   exec_time=0     error_code=0
SET TIMESTAMP=1642741952/*!*/;
BEGIN
/*!*/;
# at 161586335
#220121 13:12:32 server id 1  end_log_pos 161586527 CRC32 0x4c3d5de5    Table_map: `wtc`.`tb_ec_info` mapped to number 234
.........................
.........................
.........................
.........................
###   @45='1' /* STRING(3) meta=65027 nullable=1 is_null=0 */
###   @46='w3' /* VARSTRING(30) meta=30 nullable=1 is_null=0 */
###   @47='1' /* VARSTRING(27) meta=27 nullable=1 is_null=0 */
# at 229673591
#220121 13:12:32 server id 1  end_log_pos 229673622 CRC32 0x040e40b3    Xid = 235800
COMMIT/*!*/;
BEGIN -- COMMIT 完整事务,binlog过大时根据经验值来寻找posid

1.5 脚本解析binlog

[root@iZ2ze2yo6lh2hmy6pm4wafZ scripts]# python rollback_sql.py -f ../binlog/mysql-bin.000004 -u root -pwtc.com -P38383 -d wtc --start-position 161586335 --stop-position 229673591 --rollback-type UPDATE --only-primary 1
正在获取参数.....
2022-01-21 13:34:20 开始解析binlog.....
2022-01-21 13:34:38 解析完毕binlog.....
正在初始化列名.....
正在开始拼凑sql.....
done!............................
生成time_rollback.sql

1.6 数据导入

[root@iZ2ze2yo6lh2hmy6pm4wafZ scripts]# mysql -pwtc.com
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 308
Server version: 5.7.36-log MySQL Community Server (GPL)

Copyright (c) 2000, 2021, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> source /opt/mysql/scripts/20220121133420_rollback.sql;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0
必须在测试环境做好验证

1.7 数据确认

mysql> select belong,count(belong) from tb_ec_info group by belong;
+------------+---------------+
| belong     | count(belong) |
+------------+---------------+
| NULL       |             0 |
| 11_Plat    |             1 |
| 2Plat      |             1 |
| 7Plat      |             2 |
| BJ_IDC     |          1513 |
| FifthPlat  |           855 |
| fivePlat   |             1 |
| onlinePlat |             9 |
| orderPlat  |            74 |
| secPlat    |        125821 |
| thirdPlat  |            66 |
| vipPlat01  |            11 |
+------------+---------------+

六. 脚本案例

1. 脚本容器性目前还不是很高,针对binlog "##"进行分割,若字段value带有分割字符串则会有问题,如:密码、敏感数据等。但能满足大部分业务场景

2. SQL为一批次一条,若有百万级脏数据,则耗时是个问题,需要再次处理下

#!/usr/bin/python
# -*- coding: utf-8 -*-
# author:wtc
"""
    Created on 2022/1/20 15:46
"""
# !/bin/env python
# -*- coding:utf-8 -*-

import os
import re
import sys
import time
import getopt
import pymysql

# genernal
host = '127.0.0.1'
user = ''
password = ''
port = 3306
start_position = '4'
stop_position = '1362365054815265219931015'
database = ''
mysqlbinlog_bin = 'mysqlbinlog -v'
binlog = ''
fileContent = ''
output = time.strftime("%Y%m%d%H%M%S") + '_rollback.sql'
only_primary = 0
rollback_type = ''

# mysql operation
class mysqlOperation:
    """
        创建mysql连接对象
    """
    def __init__(self,host,user,port,password):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.code = "True"
        self.char = "utf8"
        self.MysqlClient=pymysql.connect(host=self.host,user=self.user,passwd=self.password,port=self.port,use_unicode=self.code,charset=self.char)
        self.MyCursor = self.MysqlClient.cursor()

    def select(self,selectSQL):
        task_result = self.MyCursor.execute(selectSQL)
        Result=self.MyCursor.fetchall()
        return Result

    def update(self,updateSQL):
        pass

    def delete(self,deleteSQL):
        pass

    def insert(self,insertSQL):
        pass

    def close(self):
        self.MyCursor.close()

# ----------------------------------------------------------------------------------------
# 功能:生成binlog解析文件
# ----------------------------------------------------------------------------------------
def getopts_parse_binlog():
    global host
    global user
    global password
    global port
    global fileContent
    global output
    global binlog
    global start_position
    global stop_position
    global database
    global only_primary
    global rollback_type
    try:
        options, args = getopt.getopt(sys.argv[1:], "f:o:h:u:p:P:d:", \
            [ \
                "help", \
                "binlog=", \
                "output=", \
                "host=", \
                "user=", \
                "password=", \
                "port=", \
                "start-position=", \
                "stop-position=", \
                "database=", \
                "only-primary=", \
                "rollback-type=" \
            ])
    except getopt.GetoptError:
        print("参数输入有误!!!!!")
        options = []
    if options == [] or options[0][0] in ("--help"):
        usage()
        sys.exit()
    print("正在获取参数.....")
    for name, value in options:
        if name == "-f" or name == "--binlog":
            binlog = value
        if name == "-o" or name == "--output":
            output = value
        if name == "-h" or name == "--host":
            host = value
        if name == "-u" or name == "--user":
            user = value
        if name == "-p" or name == "--password":
            password = value
        if name == "-P" or name == "--port":
            port = value
        if name == "--start-position":
            start_position = value
        if name == "--stop-position":
            stop_position = value
        if name == "-d" or name == "--database":
            database = value
        if name == "--only-primary":
            only_primary = value
        if name == "--rollback-type":
            rollback_type = value

    if binlog == '':
        print ("错误:请指定binlog文件名!")
        usage()
    if user == '':
        print ("错误:请指定用户名!")
        usage()
    if password == '':
        print ("错误:请指定密码!")
        usage()
    if start_position == '':
        print ("错误:请指定start-position!")
        usage()
    if stop_position == '':
        print ("错误:请指定stop-position!")
        usage()
    if database == '':
        print ("错误:请指定数据库!")
        usage()
    if rollback_type == '':
        print ("错误:请指定回滚类型")

    print(str(time.strftime("%Y-%m-%d %H:%M:%S")) + " 开始解析binlog.....")
    fileContent = os.popen("\
        {} {} \
        --base64-output=DECODE-ROWS \
        --start-position={} \
        --stop-position={} | \
        grep -B 2 '###' | \
        sed -e 's/### //g' -e 's/^UPDATE/##UPDATE/g' -e 's/^DELETE/##DELETE/g'" \
        .format(mysqlbinlog_bin, binlog, start_position, stop_position)).read()
    print(str(time.strftime("%Y-%m-%d %H:%M:%S")) + " 解析完毕binlog.....")

# ----------------------------------------------------------------------------------------
# 功能:初始化binlog里的所有表名和列名,用全局字典result_dict来储存每个表有哪些列
# ----------------------------------------------------------------------------------------
def init_col_name():
    global result_dict
    global pri_dict
    global fileContent
    result_dict = {}
    pri_dict= {}
    table_list = re.findall('`.*`\\.`.*`', fileContent)
    table_list = list(set(table_list))
    print("正在初始化列名.....")
    for table in table_list:
        database_name = table.split('.')[0].replace('`', '')
        table_name = table.split('.')[1].replace('`', '')
        # 连接数据库获取列和列id
        try:
            conn = mysqlOperation(host, user, int(port), password)
            result = conn.select("\
                select \
                    ordinal_position,column_name \
                from \
                    information_schema.columns \
                where table_schema='{}' and table_name='{}'".format(database_name, table_name))
            result_dict[database_name + '.' + table_name] = result
            pri_key = conn.select("\
                select \
                    ordinal_position,column_name \
                from \
                    information_schema.columns \
                where table_schema='{}' and table_name='{}' and column_key='PRI'".format(database_name,table_name))
            pri_dict[database_name + '.' + table_name] = pri_key
            conn.close()
        except Exception as e:
            try:
                print("Error %d:%s" % (e.args[0], e.args[1]))
            except IndexError:
                print("MySQL Error:%s" % str(e))
            sys.exit()

# ----------------------------------------------------------------------------------------
# 功能:拼凑回滚sql,逆序
# ----------------------------------------------------------------------------------------
def gen_rollback_sql():
    fileOutput = open(output, 'w')
    # 先将文件根据'--'分块,每块代表一个sql
    area_list = fileContent.split('##')
    print("正在开始拼凑sql.....")
    for sql in area_list:
        try:
            if sql.split()[0] == 'UPDATE' and rollback_type == "UPDATE":
                rollback_sql = re.sub('SET\n', '#SET#\n', sql, 1)
                rollback_sql = re.sub('WHERE\n', 'SET\n', rollback_sql, 1)
                rollback_sql = re.sub('#SET#\n', 'WHERE\n', rollback_sql, 1)
                tablename_pos = 1
                table_name = rollback_sql.split()[tablename_pos].replace('`', '')
                # 因为第一个列前面没有逗号或者and,所以单独替换
                rollback_sql = rollback_sql.replace('@1=', result_dict[table_name][0][1] + '=')
                # 获取该sql中的所有列并替换
                col_list = sorted(list(set(re.findall('@\d+=', rollback_sql))))
                for col in col_list:
                    col = col.replace("=","")
                    i = int(col[1:].replace("=","")) - 1
                    rollback_sql = rollback_sql.replace( \
                        col + '=', ',' + result_dict[table_name][i][1] + '=',\
                        1).replace(col + '=',\
                        'AND ' + result_dict[table_name][i][1] + '=')
                if int(only_primary) != 0 and len(pri_dict) != 0:
                    sub_where = ''
                    for primary in pri_dict[table_name]:
                        primary_name = primary[1]
                        for condition in rollback_sql.split('WHERE', 1)[1].splitlines():
                            if re.compile('^\s*' + primary_name).match(condition) or re.compile(
                                '^\s*AND\s*' + primary_name).match(condition):
                                sub_where = sub_where + condition + '\n'
                        sub_where = re.sub('^\s*AND', '', sub_where, 1)
                        rollback_sql = rollback_sql.split('WHERE', 1)[0] + 'WHERE\n' + sub_where
                fileOutput.write(rollback_sql + ";" + "\n")
            else:
                pass
            if sql.split()[0] == "DELETE" and rollback_type == "DELETE":
                rollback_sql = re.sub('^DELETE FROM', 'INSERT INTO', sql, 1)
                rollback_sql = re.sub('WHERE\n', 'SET\n', rollback_sql, 1)
                tablename_pos = 2
                table_name = rollback_sql.split()[tablename_pos].replace('`', '')
                # 因为第一个列前面没有逗号或者and,所以单独替换
                rollback_sql = rollback_sql.replace('@1=', result_dict[table_name][0][1] + '=')
                # 获取该sql中的所有列
                col_list = sorted(list(set(re.findall('@\d+=', rollback_sql))))
                for col in col_list:
                    col = col.replace("=","")
                    i = int(col[1:]) - 1
                    rollback_sql = rollback_sql.replace(col + '=', ',' + result_dict[table_name][i][1] + '=', 1)
                    rollback_sql = re.sub('\n$', ';\n', rollback_sql)
                fileOutput.write(rollback_sql + ";" + "\n")
        except Exception as e:
            print("Error:%s" % str(e))
            sys.exit()
    print ("done!............................")


def usage():
    help_info = """==========================================================================================
Command line options :
    --help                  # OUT : print help info.
    -f, --binlog            # IN  : binlog file path. (required)
    -o, --outfile           # OUT : output rollback sql file. (time+rollback.sql)
    -h, --host              # IN  : host. (default '127.0.0.1')
    -u, --user              # IN  : user. (required)
    -p, --password          # IN  : password. (required)
    -P, --port              # IN  : port. (default 3306)
    --start-position        # IN  : start position. (default '4')
    --stop-position         # IN  : stop position. (default '1362365054815265219931015')
    -d, --database          # IN  : database (required).
    --only-primary          # IN  : rimary key (default 0).
    --rollback-type         # IN  : UPDATE or DELETE (required).

Sample :
   shell> python binlog_rollback.py -f 'mysql-bin.000001' -o '/tmp/rollback.sql' -h 192.168.0.1 -u 'user' -p 'pwd' -P 3307 -d dbname
=========================================================================================="""
    print(help_info)
    sys.exit()

if __name__ == '__main__':
    getopts_parse_binlog()
    init_col_name()
    gen_rollback_sql()

Copyright & TianCiwang 2021 all right reserved,powered by Gitbook修改时间: 2022-01-21 14:25:41

results matching ""

    No results matching ""

    results matching ""

      No results matching ""