一.事件背景
在2018年项目中期阶段,有位同事误删除了某表数据,涉及数据量4w多。在业务上与其它表有依赖关系,若未匹配到数据则新增,否则修改,对业务影响不大。 近段时间前同事问我MySQL误操作 UPDATE/DELETE 后怎么恢复原有数据(感觉他们又误操作了),于是写下处理过程,给自己打一个预防针。
二.修复方法
在人工手动进行一些数据库变更操作的时候(比方说数据修正),尤其是一些不可控的批量更新或删除,通常都建议备份后操作。不过不怕万一,就怕一万,有备无患总是好的。在线上或者测试环境误操作导致数据被删除或者更新后,想要恢复,一般有两种方法。
方法1
利用最近的全量备份+增量binlog备份,恢复到误操作之前的状态,但是随着数据量的增大,binlog的增多,恢复起来很费时。
方法2
如果binlog的格式为row,那么就可以将binlog解析出来生成反向的原始SQL 以下是利用方法二写的一个python脚本rollback_sql.py,可利用此脚本生成反向的原始SQL。
三.恢复思路
- 在误操作后确认MySQL当前写binlog文件
- 解析binlog,获取操作事务的起始POSID、结束POSID
- 获取到事务的操作语句,对数据进行预筛选、解析、格式化生成sql
- 如果为动态数据,修复成本得酌情考虑,当然只是针对UPDATE的情况下,如果误操作DELETE,不管什么场景都需要进行修复。
四.恢复前提
- MySQL需要开启binlog
- MySQL binlog必须是row模式,row会记录每一条数据被修改的记录并且把修改的值体现出来
- 脚本适用于python3环境
五.测试案例(UPDATE)
1.1 原表数据
mysql> select belong,count(belong) from tb_ec_info group by belong;
+------------+---------------+
| belong | count(belong) |
+------------+---------------+
| NULL | 0 |
| 11_Plat | 1 |
| 7Plat | 2 |
| BJ_IDC | 1513 |
| FifthPlat | 855 |
| fivePlat | 1 |
| onlinePlat | 9 |
| orderPlat | 74 |
| secPlat | 125821 |
| thirdPlat | 66 |
| vipPlat01 | 11 |
+------------+---------------+
11 rows in set (0.19 sec)
1.2 修改后数据
mysql> update tb_ec_info set belong='2Plat' where belong='secPlat';
Query OK, 125821 rows affected (4.13 sec)
Rows matched: 125821 Changed: 125821 Warnings: 0
1.3 确认binary写日志
mysql> show binary logs;
+------------------+-----------+
| Log_name | File_size |
+------------------+-----------+
| mysql-bin.000001 | 177 |
| mysql-bin.000002 | 1043 |
| mysql-bin.000003 | 14284624 |
| mysql-bin.000004 | 229673622 |
+------------------+-----------+
4 rows in set (0.00 sec)
1.4 mysqlbinlog获取posid
#220121 13:12:32 server id 1 end_log_pos 161586335 CRC32 0x5824b9ee Query thread_id=304 exec_time=0 error_code=0
SET TIMESTAMP=1642741952/*!*/;
BEGIN
/*!*/;
# at 161586335
#220121 13:12:32 server id 1 end_log_pos 161586527 CRC32 0x4c3d5de5 Table_map: `wtc`.`tb_ec_info` mapped to number 234
.........................
.........................
.........................
.........................
### @45='1' /* STRING(3) meta=65027 nullable=1 is_null=0 */
### @46='w3' /* VARSTRING(30) meta=30 nullable=1 is_null=0 */
### @47='1' /* VARSTRING(27) meta=27 nullable=1 is_null=0 */
# at 229673591
#220121 13:12:32 server id 1 end_log_pos 229673622 CRC32 0x040e40b3 Xid = 235800
COMMIT/*!*/;
BEGIN -- COMMIT 完整事务,binlog过大时根据经验值来寻找posid
1.5 脚本解析binlog
[root@iZ2ze2yo6lh2hmy6pm4wafZ scripts]# python rollback_sql.py -f ../binlog/mysql-bin.000004 -u root -pwtc.com -P38383 -d wtc --start-position 161586335 --stop-position 229673591 --rollback-type UPDATE --only-primary 1
正在获取参数.....
2022-01-21 13:34:20 开始解析binlog.....
2022-01-21 13:34:38 解析完毕binlog.....
正在初始化列名.....
正在开始拼凑sql.....
done!............................
生成time_rollback.sql
1.6 数据导入
[root@iZ2ze2yo6lh2hmy6pm4wafZ scripts]# mysql -pwtc.com
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 308
Server version: 5.7.36-log MySQL Community Server (GPL)
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> source /opt/mysql/scripts/20220121133420_rollback.sql;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
必须在测试环境做好验证
1.7 数据确认
mysql> select belong,count(belong) from tb_ec_info group by belong;
+------------+---------------+
| belong | count(belong) |
+------------+---------------+
| NULL | 0 |
| 11_Plat | 1 |
| 2Plat | 1 |
| 7Plat | 2 |
| BJ_IDC | 1513 |
| FifthPlat | 855 |
| fivePlat | 1 |
| onlinePlat | 9 |
| orderPlat | 74 |
| secPlat | 125821 |
| thirdPlat | 66 |
| vipPlat01 | 11 |
+------------+---------------+
六. 脚本案例
1. 脚本容器性目前还不是很高,针对binlog "##"进行分割,若字段value带有分割字符串则会有问题,如:密码、敏感数据等。但能满足大部分业务场景
2. SQL为一批次一条,若有百万级脏数据,则耗时是个问题,需要再次处理下
#!/usr/bin/python
# -*- coding: utf-8 -*-
# author:wtc
"""
Created on 2022/1/20 15:46
"""
# !/bin/env python
# -*- coding:utf-8 -*-
import os
import re
import sys
import time
import getopt
import pymysql
# genernal
host = '127.0.0.1'
user = ''
password = ''
port = 3306
start_position = '4'
stop_position = '1362365054815265219931015'
database = ''
mysqlbinlog_bin = 'mysqlbinlog -v'
binlog = ''
fileContent = ''
output = time.strftime("%Y%m%d%H%M%S") + '_rollback.sql'
only_primary = 0
rollback_type = ''
# mysql operation
class mysqlOperation:
"""
创建mysql连接对象
"""
def __init__(self,host,user,port,password):
self.host = host
self.port = port
self.user = user
self.password = password
self.code = "True"
self.char = "utf8"
self.MysqlClient=pymysql.connect(host=self.host,user=self.user,passwd=self.password,port=self.port,use_unicode=self.code,charset=self.char)
self.MyCursor = self.MysqlClient.cursor()
def select(self,selectSQL):
task_result = self.MyCursor.execute(selectSQL)
Result=self.MyCursor.fetchall()
return Result
def update(self,updateSQL):
pass
def delete(self,deleteSQL):
pass
def insert(self,insertSQL):
pass
def close(self):
self.MyCursor.close()
# ----------------------------------------------------------------------------------------
# 功能:生成binlog解析文件
# ----------------------------------------------------------------------------------------
def getopts_parse_binlog():
global host
global user
global password
global port
global fileContent
global output
global binlog
global start_position
global stop_position
global database
global only_primary
global rollback_type
try:
options, args = getopt.getopt(sys.argv[1:], "f:o:h:u:p:P:d:", \
[ \
"help", \
"binlog=", \
"output=", \
"host=", \
"user=", \
"password=", \
"port=", \
"start-position=", \
"stop-position=", \
"database=", \
"only-primary=", \
"rollback-type=" \
])
except getopt.GetoptError:
print("参数输入有误!!!!!")
options = []
if options == [] or options[0][0] in ("--help"):
usage()
sys.exit()
print("正在获取参数.....")
for name, value in options:
if name == "-f" or name == "--binlog":
binlog = value
if name == "-o" or name == "--output":
output = value
if name == "-h" or name == "--host":
host = value
if name == "-u" or name == "--user":
user = value
if name == "-p" or name == "--password":
password = value
if name == "-P" or name == "--port":
port = value
if name == "--start-position":
start_position = value
if name == "--stop-position":
stop_position = value
if name == "-d" or name == "--database":
database = value
if name == "--only-primary":
only_primary = value
if name == "--rollback-type":
rollback_type = value
if binlog == '':
print ("错误:请指定binlog文件名!")
usage()
if user == '':
print ("错误:请指定用户名!")
usage()
if password == '':
print ("错误:请指定密码!")
usage()
if start_position == '':
print ("错误:请指定start-position!")
usage()
if stop_position == '':
print ("错误:请指定stop-position!")
usage()
if database == '':
print ("错误:请指定数据库!")
usage()
if rollback_type == '':
print ("错误:请指定回滚类型")
print(str(time.strftime("%Y-%m-%d %H:%M:%S")) + " 开始解析binlog.....")
fileContent = os.popen("\
{} {} \
--base64-output=DECODE-ROWS \
--start-position={} \
--stop-position={} | \
grep -B 2 '###' | \
sed -e 's/### //g' -e 's/^UPDATE/##UPDATE/g' -e 's/^DELETE/##DELETE/g'" \
.format(mysqlbinlog_bin, binlog, start_position, stop_position)).read()
print(str(time.strftime("%Y-%m-%d %H:%M:%S")) + " 解析完毕binlog.....")
# ----------------------------------------------------------------------------------------
# 功能:初始化binlog里的所有表名和列名,用全局字典result_dict来储存每个表有哪些列
# ----------------------------------------------------------------------------------------
def init_col_name():
global result_dict
global pri_dict
global fileContent
result_dict = {}
pri_dict= {}
table_list = re.findall('`.*`\\.`.*`', fileContent)
table_list = list(set(table_list))
print("正在初始化列名.....")
for table in table_list:
database_name = table.split('.')[0].replace('`', '')
table_name = table.split('.')[1].replace('`', '')
# 连接数据库获取列和列id
try:
conn = mysqlOperation(host, user, int(port), password)
result = conn.select("\
select \
ordinal_position,column_name \
from \
information_schema.columns \
where table_schema='{}' and table_name='{}'".format(database_name, table_name))
result_dict[database_name + '.' + table_name] = result
pri_key = conn.select("\
select \
ordinal_position,column_name \
from \
information_schema.columns \
where table_schema='{}' and table_name='{}' and column_key='PRI'".format(database_name,table_name))
pri_dict[database_name + '.' + table_name] = pri_key
conn.close()
except Exception as e:
try:
print("Error %d:%s" % (e.args[0], e.args[1]))
except IndexError:
print("MySQL Error:%s" % str(e))
sys.exit()
# ----------------------------------------------------------------------------------------
# 功能:拼凑回滚sql,逆序
# ----------------------------------------------------------------------------------------
def gen_rollback_sql():
fileOutput = open(output, 'w')
# 先将文件根据'--'分块,每块代表一个sql
area_list = fileContent.split('##')
print("正在开始拼凑sql.....")
for sql in area_list:
try:
if sql.split()[0] == 'UPDATE' and rollback_type == "UPDATE":
rollback_sql = re.sub('SET\n', '#SET#\n', sql, 1)
rollback_sql = re.sub('WHERE\n', 'SET\n', rollback_sql, 1)
rollback_sql = re.sub('#SET#\n', 'WHERE\n', rollback_sql, 1)
tablename_pos = 1
table_name = rollback_sql.split()[tablename_pos].replace('`', '')
# 因为第一个列前面没有逗号或者and,所以单独替换
rollback_sql = rollback_sql.replace('@1=', result_dict[table_name][0][1] + '=')
# 获取该sql中的所有列并替换
col_list = sorted(list(set(re.findall('@\d+=', rollback_sql))))
for col in col_list:
col = col.replace("=","")
i = int(col[1:].replace("=","")) - 1
rollback_sql = rollback_sql.replace( \
col + '=', ',' + result_dict[table_name][i][1] + '=',\
1).replace(col + '=',\
'AND ' + result_dict[table_name][i][1] + '=')
if int(only_primary) != 0 and len(pri_dict) != 0:
sub_where = ''
for primary in pri_dict[table_name]:
primary_name = primary[1]
for condition in rollback_sql.split('WHERE', 1)[1].splitlines():
if re.compile('^\s*' + primary_name).match(condition) or re.compile(
'^\s*AND\s*' + primary_name).match(condition):
sub_where = sub_where + condition + '\n'
sub_where = re.sub('^\s*AND', '', sub_where, 1)
rollback_sql = rollback_sql.split('WHERE', 1)[0] + 'WHERE\n' + sub_where
fileOutput.write(rollback_sql + ";" + "\n")
else:
pass
if sql.split()[0] == "DELETE" and rollback_type == "DELETE":
rollback_sql = re.sub('^DELETE FROM', 'INSERT INTO', sql, 1)
rollback_sql = re.sub('WHERE\n', 'SET\n', rollback_sql, 1)
tablename_pos = 2
table_name = rollback_sql.split()[tablename_pos].replace('`', '')
# 因为第一个列前面没有逗号或者and,所以单独替换
rollback_sql = rollback_sql.replace('@1=', result_dict[table_name][0][1] + '=')
# 获取该sql中的所有列
col_list = sorted(list(set(re.findall('@\d+=', rollback_sql))))
for col in col_list:
col = col.replace("=","")
i = int(col[1:]) - 1
rollback_sql = rollback_sql.replace(col + '=', ',' + result_dict[table_name][i][1] + '=', 1)
rollback_sql = re.sub('\n$', ';\n', rollback_sql)
fileOutput.write(rollback_sql + ";" + "\n")
except Exception as e:
print("Error:%s" % str(e))
sys.exit()
print ("done!............................")
def usage():
help_info = """==========================================================================================
Command line options :
--help # OUT : print help info.
-f, --binlog # IN : binlog file path. (required)
-o, --outfile # OUT : output rollback sql file. (time+rollback.sql)
-h, --host # IN : host. (default '127.0.0.1')
-u, --user # IN : user. (required)
-p, --password # IN : password. (required)
-P, --port # IN : port. (default 3306)
--start-position # IN : start position. (default '4')
--stop-position # IN : stop position. (default '1362365054815265219931015')
-d, --database # IN : database (required).
--only-primary # IN : rimary key (default 0).
--rollback-type # IN : UPDATE or DELETE (required).
Sample :
shell> python binlog_rollback.py -f 'mysql-bin.000001' -o '/tmp/rollback.sql' -h 192.168.0.1 -u 'user' -p 'pwd' -P 3307 -d dbname
=========================================================================================="""
print(help_info)
sys.exit()
if __name__ == '__main__':
getopts_parse_binlog()
init_col_name()
gen_rollback_sql()