Notes on a mid-to-large-scale database migration from MySQL to PostgreSQL.

2024/7/9 21:39:27  Tags: database, mysql, postgresql, java, springboot

I'll skip the decision process behind migrating from MySQL to PostgreSQL; this was my first time using PostgreSQL, so I can't say yet whether it's good or bad. The decision was made, so here is how the migration went.

1. Data overview

Server: 4-core CPU, 8 GB RAM, 1 TB disk, 8 Mbit/s network.

Database: MySQL 5.5 Community, 492 GB of data including indexes and logs.

Since the server had less than 300 GB of free disk, there was no way to run MySQL and PostgreSQL side by side on it for the migration, so I ran PostgreSQL only on my local machine and migrated the data there first.

2. Migrating with generic code

Being most familiar with Java, I decided to do the migration in Java. (To save effort, I chose to stand on the shoulders of giants.) I found an article, 自己动手写一个Mysql到PostgreSQL数据库迁移工具 ("Write your own MySQL-to-PostgreSQL migration tool"), that looked good, copied its code locally, and made some adaptations and improvements: tables with an integer primary key are migrated incrementally. The code is as follows:

java">package springDemo;


import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.Assert;

import com.zaxxer.hikari.HikariDataSource;


public class DataTableMigration{

	private static Logger LOG = LoggerFactory.getLogger(DataTableMigration.class);
	
    private final JdbcTemplate targetJdbc;
    private final JdbcTemplate sourceJdbc;
    private final String tableName;
    private final String primaryKey;
    private final String[] columnNamesInSourceDB;
    private final String[] columnNamesInTargetDB;

    private final Map<String, String> columnMappings;

    public DataTableMigration(DataSource sourceDataSource, String tableName, DataSource targetDataSource) throws SQLException {
    	
    	this.tableName = tableName.toLowerCase();
        
        if (sourceDataSource instanceof HikariDataSource) {
            HikariDataSource hikariDataSource = (HikariDataSource) sourceDataSource;
            hikariDataSource.setMaxLifetime(86400000); // 24 hours
            hikariDataSource.setConnectionTimeout(600000);
            hikariDataSource.setReadOnly(true);
        }
        if (targetDataSource instanceof HikariDataSource) {
            HikariDataSource hikariDataSource = (HikariDataSource) targetDataSource;
            hikariDataSource.setMaxLifetime(86400000); // 24 hours
            hikariDataSource.setConnectionTimeout(600000);
        }
        
        this.sourceJdbc = new JdbcTemplate(sourceDataSource);
        this.targetJdbc = new JdbcTemplate(targetDataSource);
        System.out.println(sourceDataSource);
        System.out.println(targetDataSource);
        
        this.primaryKey = MigrationUtils.getPrimaryKeyByTableName(sourceDataSource.getConnection(), this.tableName);
        this.columnNamesInSourceDB = MigrationUtils.getColumnsByTableName(sourceDataSource.getConnection(), this.tableName);
        Assert.isTrue(this.columnNamesInSourceDB != null && this.columnNamesInSourceDB.length > 0,
                "can't find column infor from source db for the table " + this.tableName);
        
        this.columnNamesInTargetDB = MigrationUtils.getColumnsByTableName(targetDataSource.getConnection(), this.tableName);
        Assert.isTrue(this.columnNamesInTargetDB != null && this.columnNamesInTargetDB.length > 0,
                "can't find column infor from target db for the table " + this.tableName);
        
        this.columnMappings = new HashMap<>();
    }

    protected JdbcTemplate getSourceJdbc() {
      return this.sourceJdbc;
    }

    protected JdbcTemplate getTargetJdbc() {
        return this.targetJdbc;
      }


    protected List<Map<String, Object>> queryForList(String querySql, long offset, long stepLength) {
        return getSourceJdbc().queryForList(querySql, offset, stepLength);
    }

    private Object[] rowToParam(Map<String, Object> row) {
        // For each target column, resolve its source column name (via the
        // mapping, defaulting to the same name) and pull the value from the row.
        return Arrays.stream(columnNamesInTargetDB)
                .map(colInTarget -> columnMappings.getOrDefault(colInTarget, colInTarget))
                .map(row::get)
                .toArray();
    }

    protected String getInsertSQL() {
        return String.format("insert into %s (%s) values(%s) ",
                this.tableName,
                String.join(",", columnNamesInTargetDB),
                IntStream.range(0, columnNamesInTargetDB.length)
                        .mapToObj(n -> "?")
                        .collect(Collectors.joining(",")));
    }
    
    protected String getInsertSQLOnConflict() {
        return String.format("insert into %s (%s) values(%s) ON CONFLICT (%s) DO NOTHING",
                this.tableName,
                String.join(",", columnNamesInTargetDB),
                IntStream.range(0, columnNamesInTargetDB.length).mapToObj(n -> "?").collect(Collectors.joining(",")),
                this.primaryKey);
    }

    protected int getStepLength() {
        return 1000000;
    }

	protected long getSourceMaxIndex() {
		long count = getSourceJdbc().queryForObject("select max(" + primaryKey + ") from " + tableName, Long.class);
		return count;
	}
	protected long getTargetMaxIndex() {
		long count = getTargetJdbc().queryForObject("select count(1) from " + tableName, Long.class);

		if (count > 0)
			count = getTargetJdbc().queryForObject("select max(" + primaryKey + ") from " + tableName, Long.class);
		else
			count = getSourceJdbc().queryForObject("select min(" + primaryKey + ") from " + tableName, Long.class) - 1;
		return count;
	}
    public void migrateIntegerIndexTable() throws Exception {

        LOG.info("start to migrate data from source db to target db");

        String sql = String.format("select %s from %s where %s > ? order by %s asc limit ?;",
        		String.join(",", columnNamesInSourceDB), this.tableName, this.primaryKey, this.primaryKey);

        long maxRecords = getSourceMaxIndex();
        long stepLength = getStepLength();
		for (long offset = getTargetMaxIndex(); offset < maxRecords; offset = getTargetMaxIndex()) {
			List<Map<String, Object>> rows = queryForList(sql, offset, stepLength);
			LOG.info("get records From source");
	        getTargetJdbc().batchUpdate(getInsertSQL(),
                	rows.stream().map(this::rowToParam).collect(Collectors.toList()));
			LOG.info("moved {} records", offset);
		}
    }
    
    public void migrateIntegerIndexTableJust1Line(long id) throws Exception {

        LOG.info("start to migrate data from source db to target db");

        String sql = String.format("select %s from %s where %s = ? limit ?;",
        		String.join(",", columnNamesInSourceDB), this.tableName, this.primaryKey);

		List<Map<String, Object>> rows = queryForList(sql, id, 1);
		LOG.info("get records From source");
	    getTargetJdbc().batchUpdate(getInsertSQL(),
	        	rows.stream().map(this::rowToParam).collect(Collectors.toList()));
		LOG.info("moved {} record", id);
    }

//	Get the total row count from the source database.
	protected int getSourceTotalRecords() {
		int count = getSourceJdbc().queryForObject("select count(1) from " + tableName, Integer.class);
		LOG.info("source db has {} records", count);
		return count;
	}
//	Get the number of rows already stored in the target database.
	protected int getTargetTotalRecords() {
		int count = getTargetJdbc().queryForObject("select count(1) from " + tableName, Integer.class);
		LOG.info("target db has {} records", count);
		return count;
	}
    public void migrateStringIndexTable() throws SQLException {

        LOG.info("start to migrate data from source db to target db");

        String sql = String.format("select %s from %s order by %s asc limit ?, ?;",
				String.join(",", columnNamesInSourceDB), this.tableName, this.primaryKey);

        int maxRecords = getSourceTotalRecords();
        int stepLength = getStepLength();
		for (int offset = 0; offset < maxRecords; offset = offset + stepLength) {
			List<Map<String, Object>> rows = queryForList(sql, offset, stepLength);
			LOG.info("get records From source, " + rows.size());
	        getTargetJdbc().batchUpdate(getInsertSQLOnConflict(),
                	rows.stream().map(this::rowToParam).collect(Collectors.toList()));
			LOG.info("moved {} records", offset);
		}    	
    }
    
    public void close() {
        // Closing a pooled connection only returns it to the pool; to actually
        // release resources, shut the Hikari pools down.
        if (sourceJdbc != null && sourceJdbc.getDataSource() instanceof HikariDataSource) {
            ((HikariDataSource) sourceJdbc.getDataSource()).close();
        }
        if (targetJdbc != null && targetJdbc.getDataSource() instanceof HikariDataSource) {
            ((HikariDataSource) targetJdbc.getDataSource()).close();
        }
    }
    
    public static void main(String[] args) {
    	Config cf = new Config();

    	System.setProperty("spring.jdbc.getParameterType.ignore","true");
    	
        try {
			DataTableMigration dtmStr = new DataTableMigration(cf.sourceDataSource(), "target", cf.targetDataSource());
			dtmStr.migrateStringIndexTable();
			dtmStr.close();

			String[] tableNames = { "dailyexchange", "movingavg", "stats" };
			for (int i = 0; i < tableNames.length; i++) {
				DataTableMigration dtmInt = new DataTableMigration(cf.sourceDataSource(), tableNames[i], cf.targetDataSource());
				dtmInt.migrateIntegerIndexTable();
				dtmInt.close();
			}

//			DataTableMigration dtmInt = new DataTableMigration(cf.sourceDataSource(), "min1", cf.targetDataSource());
//			dtmInt.migrateIntegerIndexTable();
//			dtmInt.close();
            
		} catch (Exception e) {
			e.printStackTrace();
		}
    }
}
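
The code above relies on a Config class and a MigrationUtils helper that the original post does not show. Below is a minimal sketch of what they might look like, assuming plain Hikari data sources and standard JDBC metadata lookups; the connection URLs, credentials, and field values are placeholders, not the author's actual code:

package springDemo;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.sql.DataSource;

import com.zaxxer.hikari.HikariDataSource;

class Config {

    // Source: the MySQL database being migrated away from
    // (URL and credentials are placeholders).
    public DataSource sourceDataSource() {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl("jdbc:mysql://<source-host>/cf_stock");
        ds.setUsername("<user>");
        ds.setPassword("<password>");
        return ds;
    }

    // Target: the local PostgreSQL database
    // (URL and credentials are placeholders).
    public DataSource targetDataSource() {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl("jdbc:postgresql://<target-host>/cf_stock");
        ds.setUsername("<user>");
        ds.setPassword("<password>");
        return ds;
    }
}

class MigrationUtils {

    // Look up the table's primary-key column through JDBC metadata.
    public static String getPrimaryKeyByTableName(Connection conn, String tableName) throws SQLException {
        try (ResultSet rs = conn.getMetaData().getPrimaryKeys(null, null, tableName)) {
            return rs.next() ? rs.getString("COLUMN_NAME") : null;
        }
    }

    // List the table's column names through JDBC metadata.
    public static String[] getColumnsByTableName(Connection conn, String tableName) throws SQLException {
        List<String> columns = new ArrayList<>();
        try (ResultSet rs = conn.getMetaData().getColumns(null, null, tableName, null)) {
            while (rs.next()) {
                columns.add(rs.getString("COLUMN_NAME"));
            }
        }
        return columns.toArray(new String[0]);
    }
}

One caveat: the DataTableMigration constructor calls getConnection() on both data sources without closing those connections, so with a pooled data source they stay checked out for the life of the object.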

The first few tables were small, only a few million rows each, and finished within a few hours. Then came the largest table, min1, at 3.4 billion rows, and the speed became unacceptable. Since every round trip costs time, I enlarged each batch as much as possible; after raising the batch size to 1,000,000 rows (the maximum being 1,048,576), the speed improved somewhat, to about 10 minutes per million rows:

HikariDataSource (null)
HikariDataSource (null)
2023-04-12T07:31:49.370+08:00  INFO   --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2023-04-12T07:31:50.701+08:00  INFO   --- [           main] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Added connection com.mysql.cj.jdbc.ConnectionImpl@3cce5371
2023-04-12T07:31:50.704+08:00  INFO   --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2023-04-12T07:31:51.056+08:00  INFO   --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-2 - Starting...
2023-04-12T07:31:51.148+08:00  INFO   --- [           main] com.zaxxer.hikari.pool.HikariPool        : HikariPool-2 - Added connection org.postgresql.jdbc.PgConnection@19b93fa8
2023-04-12T07:31:51.148+08:00  INFO   --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-2 - Start completed.
2023-04-12T07:31:51.164+08:00  INFO   --- [           main] springDemo.DataTableMigration            : start to migrate data from source db to target db
2023-04-12T07:40:24.912+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3016fd5e (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:29.923+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@6c45ee6e (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:34.928+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@6b3e12b5 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:39.933+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@5aac4250 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:44.936+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@1338fb5 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:49.938+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@42463763 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:54.941+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@59f63e24 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:59.947+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@7ca33c24 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:41:20.733+08:00  INFO   --- [           main] springDemo.DataTableMigration            : get records From source
2023-04-12T07:41:33.743+08:00  INFO   --- [           main] springDemo.DataTableMigration            : moved 2990509187 records

At this rate, moving all 3.4 billion rows would take about 24 days ((3.4 billion / 1 million) × 10 minutes ≈ 34,000 minutes ≈ 24 days), which was still unacceptable. A related article (which I can no longer find) pointed out that with generic migration code, a large share of the time goes into building the List<Map<String, Object>> row mappings.

3. Writing dedicated migration code

So the lazy, generic-code approach clearly doesn't work for a huge table. Left with no choice, I wrote dedicated migration code, with credit to ChatGPT. The code is as follows:

java">package pack;

import java.sql.*;
import java.time.LocalTime;
import java.io.InputStream;
import java.util.Properties;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class MysqlToPostgres {

	/*
	 * To migrate a MySQL table into PostgreSQL with batched inserts, you
	 * need a Java program built on JDBC.
	 * 
	 * The migration involves the following steps:
	 * 
	 * 1. Connect to both the MySQL and PostgreSQL databases via JDBC.
	 * 
	 * 2. Identify the table to migrate in the MySQL database.
	 * 
	 * 3. Read the rows of the MySQL table in batches.
	 * 
	 * 4. Batch-insert the rows into the corresponding PostgreSQL table.
	 * 
	 */
	
	private static final Logger logger = Logger.getLogger(Min1.class);
	private Connection mysqlConn = null;
	private Connection pgConn = null;

	public static void main(String args[]) {

		System.out.println(System.getProperty("java.class.path"));
		PropertyConfigurator.configure("log4j.properties");
		
		MysqlToPostgres M2P = new MysqlToPostgres();
		M2P.init();
		long flag = M2P.getTargetMaxIndex();
		long end = M2P.getSourceMaxIndex();
		
		logger.info("source line count:" + end);

		for (; flag < end; flag = M2P.getTargetMaxIndex()) {
			logger.info("target line count:" + flag);
			M2P.migrate(flag);
//			break;
		}
		
		M2P.uninit();
	}

	public void init() {
		Properties props = new Properties();
		InputStream input = null;
		
		try {
			String filename = "consts.properties";
			input = MysqlToPostgres.class.getClassLoader().getResourceAsStream(filename);
			if (input == null) {
				System.out.println("Sorry, unable to find " + filename);
				return;
			}

			// load the properties file
			// get the property value and print it out
			props.load(input);
			String sourceIP = props.getProperty("sourceIP");
			String targetIP = props.getProperty("targetIP");
			String username = props.getProperty("DBUserName");
			String password = props.getProperty("DBPassword");
			System.out.println(getMinute() + " " + username);

			// Connect to the MySQL database
			Class.forName("com.mysql.jdbc.Driver");
			mysqlConn = DriverManager.getConnection("jdbc:mysql://" + sourceIP + "/cf_stock?useCompression=true", username, password);

			// Connect to the PostgreSQL database
			Class.forName("org.postgresql.Driver");
			pgConn = DriverManager.getConnection("jdbc:postgresql://" + targetIP + "/cf_stock", username, password);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	protected long getSourceMaxIndex() {
		long count = 0;
		Statement mysqlStmt = null;
		try {
			mysqlStmt = mysqlConn.createStatement();

			// Read the current max primary key from the MySQL table
			ResultSet mysqlRs = mysqlStmt.executeQuery("select max(recordID) from min1;");
			if (mysqlRs.next()) {
				count = mysqlRs.getLong("max(recordID)");
			}
			mysqlStmt.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return count;
	}
	
	protected long getTargetMaxIndex() {
		long count = 0;
		Statement pgStmt = null;

		try {
			pgStmt = pgConn.createStatement();

			// Read the current max primary key from the PostgreSQL table
			ResultSet pgRs = pgStmt.executeQuery("select max(recordID) from min1;");
			if (pgRs.next()) {
				count = pgRs.getLong(1);
			}
			pgStmt.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return count;
	}

	public void migrate(long flag) {
		PreparedStatement pgStmt = null;
		PreparedStatement mysqlStmt = null;

		try {
			String sql = "INSERT INTO min1 "
					+ "(recordID, dayRecordID, targetID, date, minute, "
					+ "open, high, low, close, average, shareVolume, moneyVolume, openInterest) "
					+ "VALUES (?,?,?,?,?, ?,?,?,?, ?,?,?,?) "; 
			pgStmt = pgConn.prepareStatement(sql);
			
			// Batch-read rows from the MySQL table
			String mysqlSql = "select * from min1 where recordID > ? order by recordID asc limit 1000000;";
			mysqlStmt = mysqlConn.prepareStatement(mysqlSql);
			mysqlStmt.setLong(1, flag);
			ResultSet mysqlRs = mysqlStmt.executeQuery();
			logger.info(getMinute()+" get records from mysql.");

			int i = 0;
			while (mysqlRs.next()) {
				Min1 m1 = new Min1(mysqlRs);

				// Stage the row for batch insertion into PostgreSQL
				pgStmt.setLong		(1, m1.recordID);
				pgStmt.setLong		(2, m1.dayRecordID);
				pgStmt.setString	(3, m1.targetID);
				pgStmt.setDate		(4, m1.date);
				pgStmt.setShort		(5, m1.minute);
				
				pgStmt.setFloat	(6, m1.open);
				pgStmt.setFloat	(7, m1.high);
				pgStmt.setFloat	(8, m1.low);
				pgStmt.setFloat	(9, m1.close);
			
				pgStmt.setFloat	(10, m1.average);
				pgStmt.setLong	(11, m1.shareVolume);
				pgStmt.setLong	(12, m1.moneyVolume);
				pgStmt.setLong	(13, m1.openInterest);
								
				pgStmt.addBatch();
				
				i++;
				if (i % 500000 == 0) {
					System.out.println(i);
				}
			}

			// Execute the batched inserts
			logger.info(getMinute() + " combine all sql into a batch.");
			pgStmt.executeBatch();
			logger.info(getMinute() + " after excute batch.");
			pgStmt.clearBatch();

			mysqlRs.close();
			mysqlStmt.close();
			pgStmt.close();
		
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void uninit() {
		try {
			mysqlConn.close();
			pgConn.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public String getMinute() {
		// Wall-clock timestamp for log lines, zero-padded as H:mm:ss.
		LocalTime now = LocalTime.now();
		return String.format("%d:%02d:%02d", now.getHour(), now.getMinute(), now.getSecond());
	}
}
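
The Min1 row class and the consts.properties file referenced above are likewise not shown in the original. A minimal sketch, assuming the column names match those in the INSERT statement (the IP values are placeholders):

package pack;

import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;

// Plain value holder for one row of the min1 table.
public class Min1 {
	public long recordID;
	public long dayRecordID;
	public String targetID;
	public Date date;
	public short minute;
	public float open, high, low, close, average;
	public long shareVolume, moneyVolume, openInterest;

	public Min1(ResultSet rs) throws SQLException {
		recordID     = rs.getLong("recordID");
		dayRecordID  = rs.getLong("dayRecordID");
		targetID     = rs.getString("targetID");
		date         = rs.getDate("date");
		minute       = rs.getShort("minute");
		open         = rs.getFloat("open");
		high         = rs.getFloat("high");
		low          = rs.getFloat("low");
		close        = rs.getFloat("close");
		average      = rs.getFloat("average");
		shareVolume  = rs.getLong("shareVolume");
		moneyVolume  = rs.getLong("moneyVolume");
		openInterest = rs.getLong("openInterest");
	}
}

And a consts.properties along these lines:

sourceIP=192.168.0.10
targetIP=127.0.0.1
DBUserName=<user>
DBPassword=<password>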

This ran considerably better, at about 2 minutes per million rows, which works out to roughly 5 days for the whole table ((3.47 billion / 1 million) × 2 minutes ≈ 6,950 minutes ≈ 4.8 days):

[main] INFO pack.Min1 - source line count:3474392405
[main] INFO pack.Min1 - target line count:2991509187
[main] INFO pack.Min1 - 7:44:14 get records from mysql.
500000
1000000
[main] INFO pack.Min1 - 7:44:15 combine all sql into a batch.
[main] INFO pack.Min1 - 7:44:29 after excute batch.
[main] INFO pack.Min1 - target line count:2992509187
[main] INFO pack.Min1 - 7:45:54 get records from mysql.
500000
1000000
[main] INFO pack.Min1 - 7:45:56 combine all sql into a batch.
[main] INFO pack.Min1 - 7:46:10 after excute batch.
[main] INFO pack.Min1 - target line count:2993509187

Done.

