【实用工具】用Python将Excel表格数据一键导入MySQL数据库
我来教你写一个实用的Python工具一键将Excel数据导入MySQL数据库。完整代码实现import pandas as pd import pymysql from sqlalchemy import create_engine import os import sys from datetime import datetime class ExcelToMySQL: def __init__(self, hostlocalhost, userroot, passwordyour_password, databasetest_db): 初始化数据库连接 :param host: 数据库地址 :param user: 用户名 :param password: 密码 :param database: 数据库名 self.db_config { host: host, user: user, password: password, database: database, charset: utf8mb4 } # 使用SQLAlchemy创建引擎更推荐的方式 self.engine create_engine(fmysqlpymysql://{user}:{password}{host}/{database}?charsetutf8mb4) # 传统连接方式用于执行DDL语句 self.connection None self.cursor None def connect(self): 建立数据库连接 try: self.connection pymysql.connect(**self.db_config) self.cursor self.connection.cursor() print(✅ 数据库连接成功) return True except Exception as e: print(f❌ 数据库连接失败: {e}) return False def close(self): 关闭连接 if self.cursor: self.cursor.close() if self.connection: self.connection.close() print( 数据库连接已关闭) def read_excel(self, file_path, sheet_name0): 读取Excel文件 :param file_path: Excel文件路径 :param sheet_name: 工作表名称或索引默认第一个sheet :return: DataFrame对象 try: # 判断文件类型 if file_path.endswith(.xlsx): df pd.read_excel(file_path, sheet_namesheet_name, engineopenpyxl) elif file_path.endswith(.xls): df pd.read_excel(file_path, sheet_namesheet_name, enginexlrd) elif file_path.endswith(.csv): df pd.read_csv(file_path, encodingutf-8) else: print(❌ 不支持的文件格式请使用.xlsx、.xls或.csv文件) return None print(f✅ 成功读取Excel文件: {file_path}) print(f 共 {len(df)} 行数据{len(df.columns)} 列) print(f 列名: {list(df.columns)}) return df except Exception as e: print(f❌ 读取Excel失败: {e}) return None def preview_data(self, df, rows5): 预览数据前几行 :param df: DataFrame对象 :param rows: 预览行数 if df is None or df.empty: print( 数据为空) return print(f\n 数据预览 (前{rows}行):) print( * 80) print(df.head(rows).to_string(indexFalse)) print( * 80) # 显示数据类型 print(\n 数据类型:) for col in df.columns: print(f {col}: {df[col].dtype}) def auto_create_table(self, df, table_name, drop_if_existsFalse): 根据DataFrame自动创建MySQL表 :param df: DataFrame对象 :param table_name: 表名 :param drop_if_exists: 如果表存在是否删除重建 if not self.connect(): return False try: # 映射pandas数据类型到MySQL类型 type_mapping { object: VARCHAR(255), int64: INT, float64: DECIMAL(15,2), bool: TINYINT(1), datetime64[ns]: DATETIME, timedelta[ns]: TIME } # 构建CREATE TABLE语句 columns_def [] for col, dtype in df.dtypes.items(): mysql_type type_mapping.get(str(dtype), VARCHAR(255)) # 清理列名中的特殊字符 clean_col col.replace( , _).replace(-, _).replace(., _) columns_def.append(f{clean_col} {mysql_type}) create_sql fCREATE TABLE IF NOT EXISTS {table_name} (\n create_sql id INT AUTO_INCREMENT PRIMARY KEY,\n create_sql ,\n .join(columns_def) \n create_sql ) ENGINEInnoDB DEFAULT CHARSETutf8mb4; # 如果表存在且需要删除 if drop_if_exists: self.cursor.execute(fDROP TABLE IF EXISTS {table_name}) print(f️ 已删除旧表: {table_name}) self.cursor.execute(create_sql) self.connection.commit() print(f✅ 表 {table_name} 创建成功) return True except Exception as e: print(f❌ 创建表失败: {e}) self.connection.rollback() return False finally: self.close() def import_data(self, df, table_name, if_existsappend, chunksize1000): 导入数据到MySQL :param df: DataFrame对象 :param table_name: 目标表名 :param if_exists: 表存在时的处理方式 (fail, replace, append) :param chunksize: 批量插入的每批大小 if df is None or df.empty: print( 数据为空无法导入) return False try: # 清理列名 df_clean df.copy() df_clean.columns [col.replace( , _).replace(-, _).replace(., _) for col in df_clean.columns] # 处理缺失值 df_clean df_clean.fillna() # 分批导入大数据量 total_rows len(df_clean) imported_rows 0 print(f\n⏳ 开始导入数据到表 {table_name}...) start_time datetime.now() for i in range(0, total_rows, chunksize): chunk df_clean.iloc[i:ichunksize] chunk.to_sql( nametable_name, conself.engine, if_existsif_exists if i 0 else append, indexFalse, methodmulti ) imported_rows len(chunk) progress (imported_rows / total_rows) * 100 print(f 进度: {progress:.1f}% ({imported_rows}/{total_rows})) end_time datetime.now() duration (end_time - start_time).total_seconds() print(f\n✅ 导入完成!) print(f 共导入 {imported_rows} 条数据) print(f⏱️ 耗时: {duration:.2f} 秒) print(f⚡ 速度: {imported_rows/duration:.0f} 条/秒) return True except Exception as e: print(f❌ 导入失败: {e}) return False def validate_and_clean_data(self, df): 验证和清洗数据 :param df: DataFrame对象 :return: 清洗后的DataFrame print(\n 数据验证与清洗...) issues [] # 检查空值 null_counts df.isnull().sum() if null_counts.any(): print(f⚠️ 发现空值:) for col, count in null_counts[null_counts 0].items(): print(f - {col}: {count} 个空值) # 检查重复行 duplicates df.duplicated().sum() if duplicates 0: print(f⚠️ 发现 {duplicates} 行重复数据) issues.append(重复数据) # 检查特殊字符 for col in df.select_dtypes(include[object]).columns: special_chars df[col].str.contains(r[\\\], naFalse).sum() if special_chars 0: print(f⚠️ {col} 列包含 {special_chars} 个特殊字符) # 填充或处理空值 df_clean df.fillna({ col: if df[col].dtype object else 0 for col in df.columns }) if not issues: print(✅ 数据验证通过) return df_clean def interactive_mode(): 交互式模式 print( * 50) print( Excel数据一键导入MySQL工具) print( * 50) # 获取数据库配置 print(\n 数据库配置:) host input(数据库地址 (默认 localhost): ).strip() or localhost user input(数据库用户名 (默认 root): ).strip() or root password input(数据库密码: ).strip() database input(数据库名: ).strip() # 创建导入器 importer ExcelToMySQL(hosthost, useruser, passwordpassword, databasedatabase) # 获取Excel文件路径 while True: file_path input(\n Excel文件路径: ).strip() if os.path.exists(file_path): break print(❌ 文件不存在请重新输入) # 读取Excel df importer.read_excel(file_path) if df is None: return # 预览数据 importer.preview_data(df) # 数据清洗 clean input(\n是否进行数据清洗? (y/n, 默认 y): ).strip().lower() if clean ! n: df importer.validate_and_clean_data(df) # 设置表名 table_name input(目标表名 (默认使用文件名): ).strip() if not table_name: table_name os.path.splitext(os.path.basename(file_path))[0] # 清理表名 table_name .join(c if c.isalnum() or c _ else _ for c in table_name) # 选择导入模式 print(\n 导入模式:) print(1. 追加到现有表 (append)) print(2. 替换现有表 (replace)) print(3. 如果表不存在则创建新表 (auto)) mode_map {1: append, 2: replace, 3: auto} mode_choice input(请选择 (1-3, 默认 1): ).strip() or 1 if_exists mode_map.get(mode_choice, append) # 自动建表如果需要 if if_exists auto or if_exists replace: drop_first if_exists replace importer.auto_create_table(df, table_name, drop_if_existsdrop_first) if_exists append # 确认导入 print(f\n 准备导入 {len(df)} 条数据到表 {table_name}) confirm input(确认导入? (y/n): ).strip().lower() if confirm y: importer.import_data(df, table_name, if_existsif_exists) else: print(已取消导入) def quick_import(excel_path, db_config, table_nameNone, sheet_name0): 快速导入函数 :param excel_path: Excel文件路径 :param db_config: 数据库配置字典 :param table_name: 表名默认使用文件名 :param sheet_name: 工作表名称或索引 importer ExcelToMySQL(**db_config) # 读取Excel df importer.read_excel(excel_path, sheet_name) if df is None: return False # 自动设置表名 if not table_name: table_name os.path.splitext(os.path.basename(excel_path))[0] # 自动建表并导入 importer.auto_create_table(df, table_name) result importer.import_data(df, table_name) return result # 示例用法 if __name__ __main__: # 方法1: 交互式模式推荐新手使用 interactive_mode() # 方法2: 快速导入适合脚本调用 db_config { host: localhost, user: root, password: your_password, database: test_db } quick_import(学生成绩.xlsx, db_config) 使用说明1️⃣ 安装依赖pip install pandas pymysql sqlalchemy openpyxl xlrd2️⃣ 准备Excel文件Excel文件格式示例学生成绩.xlsx姓名语文数学英语张三908592李四8891873️⃣ 运行程序交互式模式推荐python excel_to_mysql.py然后按照提示输入数据库信息和文件路径即可。快速导入模式适合脚本db_config { host: localhost, user: root, password: 123456, database: test_db } quick_import(学生成绩.xlsx, db_config)功能特点✅自动建表根据Excel列名和数据类型自动创建MySQL表✅批量导入大数据量时分批导入避免内存溢出✅数据清洗自动处理空值和特殊字符✅进度显示实时显示导入进度✅支持多种格式.xlsx、.xls、.csv✅性能统计显示导入速度和耗时常见问题解决编码问题如果中文乱码确保Excel保存为UTF-8编码大文件处理超过10万行的数据会自动分批次导入类型转换数字列会被自动识别为INT或DECIMAL类型

相关新闻