Database Migration

Although PostgreSQL is highly recommended over MySQL as the database backend for OpenOlat, there may be commercial reasons to use MySQL anyway. For example, your web hosting package may already include a number of free MySQL databases, and you may not want to waste resources on your virtual server by deploying a PostgreSQL server on it.

If you have already put a huge amount of effort into the content of the database, you really want to be able to migrate this valuable content. However, finding a suitable tool for this job is not straightforward. MySQL Workbench together with ODBC database connections is often recommended, but it quickly became clear that this was not a suitable approach.

In the end, I modified a Python script that uses SQLAlchemy to do the job:

#!/usr/bin/env python3

import getopt
import sys
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.dialects.mysql.base import TINYINT
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import ProgrammingError
from sqlalchemy import inspect

import time 
import progressbar 

def get_table_list_from_db(metadata):
    """
    Collect the names of all tables in the ``public`` schema.

    ``metadata`` only needs an ``execute(sql)`` method that yields
    one-element rows; the single element of each row is taken as the
    table name. The names are returned as a list, in result order.
    """
    query = "select table_name from information_schema.tables where table_schema='public'"
    table_names = []
    for (table_name,) in metadata.execute(query):
        table_names.append(table_name)
    return table_names

def make_session(connection_string):
    """
    Open a database session for ``connection_string``.

    An engine is created with statement echoing disabled, and a session
    bound to that engine is created with autoflush enabled.

    Returns a ``(session, engine)`` tuple.
    """
    db_engine = create_engine(connection_string, echo=False)
    session_factory = sessionmaker(bind=db_engine, autoflush=True)
    db_session = session_factory()
    return db_session, db_engine

# Tables of the source schema that are deliberately left out of the copy.
_EXCLUDED_TABLES = (
    "hibernate_unique_key",     # mapping table; dropped and recreated after the copy
    "o_mail_to_recipient",      # mapping table
    "o_projectbroker_project",  # a column is missing
    "o_stat_lastupdated",       # "could not assemble any primary key columns for mapped table"
    "o_stat_studybranch3",      # error 1064, "You have an error in your SQL syntax"
    "o_stat_studylevel",        # error 1064, "You have an error in your SQL syntax"
)


def _new_progress_bar(total):
    """Return a started progress bar (elapsed time, bar, ETA) for ``total`` steps."""
    widgets = [
        ' [',
        progressbar.Timer(format='elapsed time: %(elapsed)s'),
        '] ', progressbar.Bar('*'), ' (',
        progressbar.ETA(), ') ',
    ]
    return progressbar.ProgressBar(max_value=total, widgets=widgets).start()


def _truncate_tables(dengine, table_names):
    """Empty every table in ``table_names`` that already exists on the destination."""
    dinspector = inspect(dengine)
    bar = _new_progress_bar(len(table_names))
    # foreign_key_checks is a per-connection (session) setting, so the SET and
    # all TRUNCATE statements are issued on one and the same connection. The
    # ``with`` block also guarantees the connection is released afterwards.
    with dengine.connect() as con:
        con.execute('SET SESSION foreign_key_checks=OFF;')
        for step, table_name in enumerate(table_names, start=1):
            bar.update(step)
            if dinspector.has_table(table_name):
                con.execute("TRUNCATE TABLE %(table_name)s;" % {'table_name': table_name})


def _copy_table(table_name, source, smeta, destination, dengine, dmeta):
    """Create ``table_name`` on the destination and merge all source rows into it."""
    # Pull the table definition from the source server.
    table = Table(table_name, smeta, autoload=True)
    print(table_name)
    # The merges below run on the destination session's own connection, so
    # the foreign key checks have to be switched off on that session as well.
    destination.execute('SET SESSION foreign_key_checks=OFF;')
    try:
        # create_all() works on the whole source MetaData, i.e. on every
        # table that has been reflected into it so far.
        table.metadata.create_all(dengine)
        record_class = quick_mapper(table)
        dest_table = Table(table_name, dmeta, autoload=True)
        dest_columns = list(dest_table.columns.keys())
        # Only columns that exist on the destination are transferred.
        for record in source.query(table).all():
            data = {}
            for column in dest_columns:
                value = getattr(record, column)
                if isinstance(table.columns[column].type, TINYINT):
                    value = bool(value)
                data[str(column)] = value
            destination.merge(record_class(**data))
    finally:
        # Always terminate the current output line, even if the copy failed.
        print()


def pull_data(from_db, to_db):
    """
    Copy the tables of the source database into the destination database.

    The table list is read from the ``public`` schema of the source; the
    tables named in ``_EXCLUDED_TABLES`` are skipped (an excluded table that
    does not exist in the source is simply ignored). The function then

    1. truncates those tables that already exist on the destination,
    2. creates missing tables and merges every source row into the
       destination, committing once after the last table,
    3. runs ``OPTIMIZE TABLE`` on every copied table,
    4. drops and recreates ``hibernate_unique_key`` with a single row
       ``next_hi = 0`` and deletes all rows from ``o_loggingtable``.

    The SQL issued against the destination (``SET SESSION
    foreign_key_checks``, ``OPTIMIZE TABLE``, ``ENGINE=InnoDB``) is MySQL
    syntax.

    :param from_db: connection string of the source database
    :param to_db: connection string of the destination database

    Errors raised by the database layer are not handled here; both sessions
    and engines are released in every case.
    """
    # NOTE: this echoes both connection strings, including any password.
    print(locals())
    source, sengine = make_session(from_db)
    smeta = MetaData(bind=sengine)
    destination, dengine = make_session(to_db)
    dmeta = MetaData(bind=dengine)
    try:
        source_tables = inspect(sengine).get_table_names(schema='public')
        dest_tables = [name for name in source_tables if name not in _EXCLUDED_TABLES]

        print('Truncating existing tables')
        _truncate_tables(dengine, dest_tables)

        print("\nMigrating PostgreSQL->MySQL", end='')
        bar = _new_progress_bar(len(dest_tables))
        for step, table_name in enumerate(dest_tables, start=1):
            bar.update(step)
            _copy_table(table_name, source, smeta, destination, dengine, dmeta)
        destination.commit()

        # To date (mysql 8.0.18) there is NO suitable function inside mysql to re-create indexes.
        print("\nOptimizing tables", end='')
        bar = _new_progress_bar(len(dest_tables))
        for step, table_name in enumerate(dest_tables, start=1):
            bar.update(step)
            dengine.execute("OPTIMIZE TABLE %(table_name)s;" % {'table_name': table_name})

        print("\nRecreating hibernate_unique_key")
        dengine.execute("DROP TABLE hibernate_unique_key;")
        dengine.execute("CREATE TABLE hibernate_unique_key ( next_hi INT(11) NOT NULL AUTO_INCREMENT, PRIMARY KEY (next_hi)) ENGINE=InnoDB DEFAULT CHARSET=utf8;")
        dengine.execute("INSERT INTO hibernate_unique_key (next_hi) values (0);")
        print("\nTruncate logging table")
        dengine.execute("DELETE FROM o_loggingtable;")
    finally:
        # Release sessions and pooled connections whether or not the run succeeded.
        source.close()
        destination.close()
        sengine.dispose()
        dengine.dispose()

def print_usage():
    """
    Print the command-line synopsis and an example invocation to stdout.

    The program name shown in both places is taken from ``sys.argv[0]``.
    """
    program = sys.argv[0]
    usage_template = """
Usage: %s -f source_server -t destination_server
    -f, -t = driver://user[:password]@host[:port]/database

Example: %s -f oracle://someuser:PaSsWd@db1/TSH1 \\
    -t mysql://root@db2:3307/reporting
    """
    print(usage_template % (program, program))

def quick_mapper(table):
    """
    Build a throwaway mapped class for ``table``.

    A fresh declarative base is created on every call, and the returned
    class uses ``table`` as its ``__table__``. Keyword arguments passed to
    the class constructor are therefore the table's column names.
    """
    declarative_root = declarative_base()

    class GenericMapper(declarative_root):
        # Map onto the already-built Table instead of declaring columns.
        __table__ = table

    return GenericMapper

if __name__ == '__main__':
    # Command-line entry point: -f names the source database and -t the
    # destination database; both are required.
    try:
        # Positional arguments are accepted but not used.
        optlist, tables = getopt.getopt(sys.argv[1:], 'f:t:')
    except getopt.GetoptError as err:
        # Unknown option or missing option argument: show the reason and the
        # usage text instead of a traceback.
        print(err)
        print_usage()
        raise SystemExit(1)

    options = dict(optlist)
    if '-f' not in options or '-t' not in options:
        print_usage()
        raise SystemExit(1)

    pull_data(
        options['-f'],
        options['-t'],
    )