Migrating an Oracle database to MySQL with pure JDBC (tested with 2000 tables, more than one million rows in a single table, no OOM)

Preface

Data migration is always troublesome. When a project has to move from Oracle to MySQL, migrating the data is the big problem.
Before migrating, I searched Baidu and found a few approaches:

  1. Use Navicat's data transfer function
    When something goes wrong, it is hard to analyze the problem
  2. Use kettle
    kettle is actually very convenient to use, but its one drawback is that every table needs its own scripts. That is fine when migrating tables one by one, but this system has 2000 tables
  3. Look for an open source project on GitHub
    I found an open source project from Alibaba: yugong
    It is quite specialized, so it takes time to learn. Also, the data to be migrated has to fit some of its own logic
  4. Later I thought: why can't I just write a script for this myself? We use frameworks so much now that our thinking becomes rigid. Once you think it through, it is easy to implement with plain JDBC

Thinking before coding

  1. How to migrate the data
    Query the data from Oracle, then splice it into insert SQL statements and execute them in MySQL
  2. Which pattern to use
    The producer-consumer pattern seems particularly suitable here: one thread produces insert statements from the Oracle data, and another thread executes them in MySQL
  3. Where to keep the spliced SQL statements
    Because two threads are involved, a thread-safe BlockingQueue is used here
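
The three decisions above can be sketched without any database. This is only a minimal illustration, not the code used in the migration: the class name QueueSketch, the DONE sentinel and the in-memory "executed" list are assumptions made for the example, and the list stands in for running the statement on MySQL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueSketch {
    // Hypothetical sentinel: tells the consumer that no more statements will arrive
    static final String DONE = "__DONE__";

    // Runs one producer (the calling thread) and one consumer over a bounded queue
    // and returns the statements in the order the consumer received them
    static List<String> run(List<String> statements, int capacity) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        List<String> executed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String sql = queue.take();   // blocks while the queue is empty
                    if (DONE.equals(sql)) {
                        return;                  // producer is finished
                    }
                    executed.add(sql);           // stand-in for executing the insert on MySQL
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String sql : statements) {
            queue.put(sql);                      // blocks while the queue is full
        }
        queue.put(DONE);
        consumer.join();                         // after join, `executed` is safe to read here
        return executed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList("insert 1", "insert 2", "insert 3"), 2));
    }
}
```

The bounded capacity is what keeps memory flat: when MySQL is slower than Oracle, put blocks the producer instead of letting statements pile up on the heap.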

Implementation

Demo structure

Adding dependencies

 <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--<version>8.0.15</version>-->
            <version>5.1.38</version>
        </dependency>

        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc6</artifactId>
            <version>1.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/ojdbc6.jar</systemPath>
        </dependency>
    </dependencies>

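PS: the Oracle driver is declared here as a local (system scope) dependency, because the Oracle jar currently cannot be downloaded from the public Maven repositories. Also, the MySQL jar is deliberately kept at 5.1.x, with 8.0.15 commented out in the pom: with the 8.x driver defaults, the metadata call that fetches the table columns returns columns from every database on the server instead of the single target database (in Connector/J 8 the nullCatalogMeansCurrent property defaults to false).

Implementation classes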

  • Main
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author wh
 * @version 1.0
 * @date 2019-12-13 17:06
 */
public class Main {

    static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
    static final String MYSQL_URL = "jdbc:mysql://192.168.1.101:3306/test?useUnicode=true&autoReconnect=true&allowMultiQueries=true&useSSL=false&serverTimezone=Asia/Shanghai";
    static final String MYSQL_USERNAME = "root";
    static final String MYSQL_PASSWORD = "123456";

    static final String ORACLE_DRIVER = "oracle.jdbc.driver.OracleDriver";
    static final String ORACLE_URL = "jdbc:oracle:thin:@192.168.1.103:1601:FTSHARE";
    static final String ORACLE_USERNAME = "root";
    static final String ORACLE_PASSWORD = "123456";

    static Connection mysqlCon = null;
    static Connection oracleCon = null;

    static List<Table> tables = new ArrayList<>();

    static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(30);


    public static void main(String[] args) throws Exception {
        try {
            // Initialize the connections
            mysqlCon = init(MYSQL_DRIVER, MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
            oracleCon = init(ORACLE_DRIVER, ORACLE_URL, ORACLE_USERNAME, ORACLE_PASSWORD);
            // The table list is read from MySQL, so the target schema must already exist there
            DatabaseMetaData dbMetData = mysqlCon.getMetaData();
            // Get the metadata result set
            ResultSet rs = Utils.getRs(dbMetData);
            // Get all table names
            tables = Utils.getTables(rs);
            // Get the columns and the column count of every table
            tables = Utils.setAllColumns(dbMetData, tables);
            // Build the select statement to run against Oracle for every table
            tables = Utils.generateOracleSelectSQL(tables);
            // Consumer thread: takes insert statements from the queue and runs them in MySQL
            Consumer consumer = new Consumer(blockingQueue/*, mysqlCon, FLAG*/);
            Thread consumerThread = new Thread(consumer);
            consumerThread.start();
            // Producer: query Oracle, build the MySQL insert statements and put them into the BlockingQueue
            Utils.generateMysqlInsertSQL(tables, oracleCon, blockingQueue);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close each connection on its own, so a null or failing one does not block the other
            if (mysqlCon != null) {
                try {
                    mysqlCon.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (oracleCon != null) {
                try {
                    oracleCon.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

    }

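    /**
     * Load the driver and open a connection
     * @param driver   JDBC driver class name
     * @param url      JDBC URL
     * @param username database user
     * @param password database password
     * @return the open connection
     * @throws Exception if the driver cannot be loaded or the connection fails
     */
    public static Connection init(String driver, String url, String username, String password) throws Exception {
        Class.forName(driver);
        Connection connection = DriverManager.getConnection(url, username, password);
        // Only report success once the connection really exists
        System.out.println("Driver: " + driver + " connected successfully");
        return connection;
    }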
}

  • Table.java
import java.util.List;

/**
 * @author wh
 * @version 1.0
 * @date 2019-12-13 16:13
 */
public class Table {

    /**
     * Table name
     */
    String tablename;
    /**
     * Column names. They must be kept in a List: if they were spliced into one String up front,
     * it would be awkward later to wrap each name in backticks to avoid keyword conflicts
     */
    List<String> columns;
    /**
     * Number of columns
     */
    int columnSum;
    /**
     * Number of rows in the table
     */
    int sum;
    /**
     * SQL statement to execute in MySQL
     */
    String mysqSb;
    /**
     * SQL statement to execute in Oracle
     */
    String oracleSb;

    // Getters and setters omitted
}

  • Utils
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/**
 * @author wh
 * @version 1.0
 * @date 2019-12-13 11:53
 */
public class Utils {
    /**
     * MySQL: 50 rows per insert statement
     */
    static int BATCH = 50;
    /**
     * Oracle: 100 rows per page
     */
    static final int PAGING = 100;
    /**
     * Count the rows of a result set (the result set must be scrollable)
     * @param rs
     * @return int  number of rows in the result set
     * @throws Exception
     */
    public static int sumColumn(ResultSet rs) throws Exception {
        rs.last();
        // The row number of the last row is the total number of rows
        int sumColum = rs.getRow();
        // Rewind so the caller can still iterate from the start
        rs.beforeFirst();
        return  sumColum;
    }



    /**
     * Get all database table names
     * @param rs
     * @return List<Table>
     * @throws Exception
     */
    public static List<Table> getTables(ResultSet rs) throws Exception{
        /*
        * ResultSet rs = dbMetData.getTables(null,
                Utils.convertDatabaseCharsetType("zou", "mysql"), null,
                new String[] { "TABLE"});
        *
        * */
        List<Table> tables = new ArrayList<>();
        System.out.println("Total number of tables found: " + Utils.sumColumn(rs));
        while (rs.next()) {
            if (rs.getString(4) != null
                    && (rs.getString(4).equalsIgnoreCase("TABLE"))) {
                Table table = new Table();

                String tableName = rs.getString(3).toLowerCase();            
                table.setTablename(tableName);
                tables.add(table);

            }
        }
        return tables;

    }

    /**
     * Get columns and total number of columns for all tables
     * @param dbMetData
     * @param tables All tables
     * @return
     * @throws Exception
     */
    public static List<Table> setAllColumns(DatabaseMetaData dbMetData, List<Table> tables) throws Exception {
        tables.forEach(table -> {
            try {
                Table table1 = getColumn(table.getTablename(), dbMetData);
                table.setColumns(table1.getColumns());
                table.setColumnSum(table1.getColumnSum());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        );
        return tables;
    }


    // Get the column names and the column count of one table
    public static Table getColumn(String tableName, DatabaseMetaData dbMetData) throws Exception{
        Table table = new Table();
        String columnName;
        // Column names
        List<String> list = new ArrayList<>();
        ResultSet colRet = dbMetData.getColumns(null, "%", tableName,
                "%");
        // Total number of columns
        int sumColum = Utils.sumColumn(colRet);

        // Collect all columns
        while (colRet.next()) {
            columnName = colRet.getString("COLUMN_NAME");
            // Skip the MySQL primary key column ID_ and reduce the column count by one
            if (columnName.equals("ID_")){
                sumColum--;
                continue;
            }
            list.add(columnName);
        }
        colRet.close();
        table.setColumns(list);
        table.setColumnSum(sumColum);
        return  table;
    }

    /**
     *  Build the select statement to run against Oracle for every table
     * @param tables
     * @return
     */
    public static List<Table> generateOracleSelectSQL(List<Table> tables) {
        for (Table table : tables) {
            // A fresh builder per table, so nothing is carried over from the previous table
            StringBuilder sb = new StringBuilder("select ");
            for (String column : table.getColumns()) {
                sb.append(column).append(",");
            }
            // Remove the trailing comma left after the last column
            sb.deleteCharAt(sb.length() - 1);
            sb.append(" from ").append(table.getTablename());
            table.setOracleSb(sb.toString());
        }
        return tables;
    }


    /**
     * Get metadata rs
     * @param dbMetData
     * @return
     * @throws Exception
     */
    public static ResultSet getRs(DatabaseMetaData dbMetData) throws Exception {
        //Get metadata rs
        ResultSet rs = dbMetData.getTables(null,
                null, null,
                new String[] { "TABLE"});
        return rs;

    }

    // Query every table in Oracle and put the generated MySQL insert statements into the queue
    public static void generateMysqlInsertSQL(List<Table> tables, Connection oracleCon, BlockingQueue<String> blockingQueue) throws Exception {

        for (Table table : tables) {
            String oracleSql = table.getOracleSb();
            // Table name
            String tableName = table.getTablename();
            try {
                /*
                 * Out of memory: loading a big table in one query creates large objects
                 * that fill the old generation (2.5G). Two ways out:
                 * 1. paging
                 * 2. a larger heap
                 * Paging is used here.
                 */
                System.out.println("Table: " + tableName + "  start loading data..........");
                // Count the rows in the table
                int sum = Utils.getsum(oracleCon, tableName);
                // Store the row count on the table object
                table.setSum(sum);
                // Tables with more than PAGING rows are read page by page
                if (sum > PAGING) {
                    Utils.putblocking(sum, oracleCon, table, blockingQueue);
                    // Go on with the next table (a return here would skip all remaining tables)
                    continue;
                }

                // Small tables are read with a single query; empty tables are skipped
                if (sum > 0) {
                    // try-with-resources closes the result set and the statement, even on failure
                    try (PreparedStatement statement = oracleCon.prepareStatement(oracleSql);
                         ResultSet resultSet = statement.executeQuery()) {
                        generateMysqlInsertSQLByTable(resultSet, table, blockingQueue);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("Table: " + tableName + " could not be exported (it may not exist in Oracle)");
            }
        }
        System.out.println("All tables are exported!!");
    }

    // Build multi-row MySQL insert statements from one result set and put them into the queue
    public static void generateMysqlInsertSQLByTable(ResultSet rs, Table table, BlockingQueue<String> queue) throws Exception {

        // Backtick-quoted column list for the insert statement
        String mysqlColumn = Utils.getMysqlColum(table);

        /*
         * Rows are not inserted one statement at a time:
         * every BATCH rows are combined into one multi-row insert
         */

        // Statement prefix: insert into t( cols )values(
        String prefix = "insert into " + table.getTablename() + "( " + mysqlColumn + " )" + "values(";
        StringBuffer mysqlSql = new StringBuffer(prefix);
        int count = 0;
        while (rs.next()) {
            count++;
            // 'a','b','c','d'
            String singleValue = geneateOneSQL(rs, table);

            // prefix + 'a','b','c','d'
            mysqlSql.append(singleValue);

            if (count % BATCH == 0) {
                // The batch is full: close the last row and queue the statement
                mysqlSql.append(")");
                queue.put(mysqlSql.toString());
                mysqlSql = new StringBuffer(prefix);
            } else {
                // More rows may follow: close this row and open the next one
                mysqlSql.append("),(");
            }

        }

        if (count % BATCH != 0) {
            // Drop the dangling ",(" left after the last row of a partial batch
            mysqlSql.setLength(mysqlSql.length() - 2);
            queue.put(mysqlSql.toString());
        }

    }
    // Build the value list of one row, e.g. 'a','b',null,'d'
    public static String geneateOneSQL(ResultSet rs, Table table) throws  Exception {

        StringBuffer singleSQL = new StringBuffer("");

        for (int i = 1; i <= table.getColumnSum(); i++) {

            String value = rs.getString(i);

            if (value == null) {
                // null is written as the bare keyword, without quotes
                singleSQL.append("null,");
            } else {
                // Escape backslashes and single quotes.
                // Assumes the default MySQL sql_mode, where backslash is an escape character.
                value = value.replace("\\", "\\\\").replace("'", "''");
                singleSQL.append("'" + value + "'" + ",");
            }
        }

        // Remove the trailing comma
        singleSQL = singleSQL.deleteCharAt(singleSQL.length() - 1);

        return  singleSQL.toString();

    }

    /**
     * Build the backtick-quoted column list used in the MySQL insert statement
     * @param table
     * @return
     */
    public static String getMysqlColum(Table table) {
        StringBuffer sb = new StringBuffer();
        for (String colum : table.getColumns()) {
            sb.append("`" + colum + "`,");
        }
        //Remove the last,
        sb = sb.deleteCharAt(sb.length() - 1);
        return sb + "";
    }


    /**
     * Count the rows of a table
     * @param connection
     * @param tableName
     * @return number of rows in the table
     * @throws Exception
     */
    public static int getsum(Connection connection, String tableName) throws Exception{
        int sum = 0;

        String sql = "select count(1) from " + tableName;
        // try-with-resources closes the result set and the statement
        try (PreparedStatement ps = connection.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                sum = rs.getInt(1);
            }
        }
        System.out.println("Total rows in table: " + sum);
        return sum;

    }


    /*
    * Oracle paging
    *
    * Build the SQL that selects the rows with ROWNUM from index to lastPage (both inclusive)
    * */

    public static String getResultSet(int index, int lastPage, Table table) {
        StringBuffer sb = new StringBuffer("select ");
        for (String column : table.getColumns()) {
            sb.append(column + ",");
        }
        // Remove the trailing comma left after the last column
        sb = sb.deleteCharAt(sb.length()-1);
        sb.append(" FROM (SELECT ROWNUM AS rowno, t.* FROM " + table.getTablename() + " t "
                + "WHERE  ROWNUM <=" + lastPage + ") table_alias " +
                " WHERE table_alias.rowno >=" + index);

        return  sb.toString();
    }


    /*
    * Tables with more than PAGING rows are read and queued page by page
    * */

    public static void putblocking(int sum, Connection con, Table table, BlockingQueue<String> queue) throws Exception{
        String sql = "";
        // First row of the current page
        int index = 1;
        // Last row of the current page
        int lastPage = PAGING;
        while (sum > 0){
            sql = getResultSet(index, lastPage, table);
            PreparedStatement ps = con.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            // Turn this page into insert statements and put them into the queue
            generateMysqlInsertSQLByTable(rs, table, queue);
            rs.close();
            ps.close();

            // Rows still to be read after this page
            sum = sum - PAGING;
            index = lastPage + 1;
            if (sum > PAGING) {
                lastPage = lastPage + PAGING;
            } else {
                // Last (possibly partial) page
                lastPage = lastPage + sum;
            }

        }

    }
}
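
To make the paging in putblocking and getResultSet easier to follow, here is a small standalone sketch of the same idea. It is not part of the project: the class name PagingSketch, the method names and the table demo_table are made up for the example. It only computes the ROWNUM windows and prints the queries, without touching a database.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PagingSketch {
    // Splits `total` rows into inclusive [first, last] ROWNUM windows of at most `pageSize` rows
    static List<int[]> windows(int total, int pageSize) {
        List<int[]> result = new ArrayList<>();
        for (int first = 1; first <= total; first += pageSize) {
            int last = Math.min(first + pageSize - 1, total);
            result.add(new int[] {first, last});
        }
        return result;
    }

    // Builds a query of the same shape as getResultSet: the inner query numbers the rows,
    // the outer query keeps only the rows from `first` onwards
    static String pageSql(String table, List<String> columns, int first, int last) {
        return "select " + String.join(",", columns)
                + " FROM (SELECT ROWNUM AS rowno, t.* FROM " + table + " t"
                + " WHERE ROWNUM <= " + last + ") table_alias"
                + " WHERE table_alias.rowno >= " + first;
    }

    public static void main(String[] args) {
        // 250 rows with a page size of 100 give the windows 1-100, 101-200 and 201-250
        for (int[] w : windows(250, 100)) {
            System.out.println(pageSql("demo_table", Arrays.asList("NAME", "AGE"), w[0], w[1]));
        }
    }
}
```

One caveat worth keeping in mind: without an ORDER BY, Oracle does not promise the same row order for every page query, so this style of paging assumes the source table is not being modified while the migration runs.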

  • Consumer
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;

/**
 * @author wh
 * @version 1.0
 * @date 2019-12-16 10:08
 */

public class Consumer implements Runnable {
    private BlockingQueue<String> blockingQueue;
    private Connection mysqlCon;

    public Consumer(BlockingQueue<String> blockingQueue  /*, Connection mysqlCon, boolean FLAG*/) {
        this.blockingQueue = blockingQueue;

        try {
            // The consumer opens its own MySQL connection
            this.mysqlCon = Main.init(Main.MYSQL_DRIVER, Main.MYSQL_URL, Main.MYSQL_USERNAME, Main.MYSQL_PASSWORD);

            // Keep-alive thread: runs "select 1" every 50 seconds so the connection is not dropped while idle
            Thread keepAlive = new Thread() {
                @Override
                public void run() {
                    while (true) {
                        // try-with-resources closes the result set first, then the statement
                        try (PreparedStatement preparedStatement = mysqlCon.prepareStatement("select 1");
                             ResultSet rs = preparedStatement.executeQuery()) {
                            System.out.println("Keep-alive query executed: " + rs.next());
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                        try {
                            Thread.sleep(50 * 1000);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            };
            // Daemon thread, so it never keeps the JVM alive on its own
            keepAlive.setDaemon(true);
            keepAlive.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        System.out.println("Consumer started");
        // Note: this loop never ends, so the program has to be stopped by hand once the queue is drained
        while (true) {
            try {
                // Blocks until the producer has queued a statement
                String mysqlSql = blockingQueue.take();

                try (PreparedStatement preparedStatement = mysqlCon.prepareStatement(mysqlSql)) {
                    preparedStatement.executeUpdate();
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("Consumer exception");
            }
        }
    }
}
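
The consumer above executes SQL text that was spliced together in advance, which is why geneateOneSQL has to escape quotes by hand. One possible alternative, shown here only as a sketch and not as what this project does, is to generate a multi-row insert with ? placeholders and bind the values with PreparedStatement.setObject. The queue would then have to carry row values instead of finished SQL strings. The class name PlaceholderSketch and the table demo_table are made up for the example; only the SQL text is built here, so it runs without a database.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PlaceholderSketch {
    // Builds e.g. insert into t (`a`,`b`) values (?,?),(?,?) for the given number of rows
    static String insertSql(String table, List<String> columns, int rows) {
        // Backtick-quote every column name, as getMysqlColum does
        String cols = "`" + String.join("`,`", columns) + "`";
        // One "(?,?,...)" group per row, one "?" per column
        String oneRow = "(" + String.join(",", Collections.nCopies(columns.size(), "?")) + ")";
        return "insert into " + table + " (" + cols + ") values "
                + String.join(",", Collections.nCopies(rows, oneRow));
    }

    public static void main(String[] args) {
        System.out.println(insertSql("demo_table", Arrays.asList("NAME", "AGE"), 2));
    }
}
```

With this shape the driver takes care of quoting, and the same statement text can be reused for every full batch of a table.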

Optimization

  • Optional optimizations:
  1. Use a thread pool instead of Thread.start
  2. Put the connection information in a configuration file
  3. Run it and analyze the GC log. If full GCs appear, configure reasonable JVM parameters
  4. Add more threads
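
For point 2, a minimal sketch with java.util.Properties could look like this. The key names (mysql.url, mysql.username) and the class name ConfigSketch are assumptions for the example; in a real run the Reader would come from a properties file instead of a string.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

class ConfigSketch {
    // Reads key=value pairs from any Reader (a FileReader in a real run)
    static Properties load(Reader reader) throws IOException {
        Properties props = new Properties();
        props.load(reader);
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the content of a hypothetical db.properties file
        String text = "mysql.url=jdbc:mysql://192.168.1.101:3306/test\n"
                + "mysql.username=root\n";
        Properties props = load(new StringReader(text));
        // These values would replace the MYSQL_URL and MYSQL_USERNAME constants in Main
        System.out.println(props.getProperty("mysql.url"));
        System.out.println(props.getProperty("mysql.username"));
    }
}
```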

Source code

github

Tags: SQL MySQL Oracle JDBC

Posted on Wed, 15 Jan 2020 04:14:48 -0800 by rudibr