Skip to content

✨ add table creation order computation with FK constraints #98

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions src/mysql_to_sqlite3/mysql_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Miscellaneous MySQL utilities."""

import typing as t
from collections import defaultdict, deque

from mysql.connector import CharacterSet
from mysql.connector.abstracts import MySQLConnectionAbstract, MySQLCursorAbstract
from mysql.connector.charsets import MYSQL_CHARACTER_SETS


Expand Down Expand Up @@ -39,3 +41,126 @@
yield CharSet(index, charset, info[1])
except KeyError:
continue


def _identifier_to_str(value: t.Any) -> t.Optional[str]:
    """Return a table identifier as text, or ``None`` when it is missing.

    ``bytes``/``bytearray`` values are decoded rather than passed through
    ``str()``, which would yield the literal ``"b'name'"`` representation.
    """
    if value is None:
        return None
    if isinstance(value, (bytes, bytearray)):
        return value.decode()
    return str(value)


def fetch_schema_metadata(cursor: MySQLCursorAbstract) -> t.Tuple[t.Set[str], t.List[t.Tuple[str, str]]]:
    """Fetch table names and foreign-key edges for the current schema.

    Two queries are executed on ``cursor`` against ``information_schema``,
    both restricted to ``SCHEMA()``. Rows may be sequences (tuple/list),
    dictionaries keyed by column name/alias, or objects exposing the columns
    as attributes. Values may be ``str``, ``bytes`` or ``bytearray``.

    Rows whose table name (or child/parent name) is missing or empty are
    skipped instead of being recorded as an empty-string table.

    Args:
        cursor: an open cursor providing ``execute`` and ``fetchall``.

    Returns:
        tables: names of all base tables in the current schema
        edges: list of (child, parent) pairs, one per foreign-key column whose
            referenced table lives in the current schema (a composite key
            therefore yields repeated pairs)
    """
    # 1. all ordinary tables
    cursor.execute(
        """
        SELECT TABLE_NAME
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = SCHEMA()
          AND TABLE_TYPE = 'BASE TABLE';
        """
    )
    tables: t.Set[str] = set()
    for row in cursor.fetchall():
        raw_name: t.Any
        if isinstance(row, dict):
            raw_name = row.get("TABLE_NAME")
        elif isinstance(row, (list, tuple)):
            raw_name = row[0] if row else None
        else:
            # Attribute-style row, or a bare scalar holding the name itself.
            raw_name = getattr(row, "TABLE_NAME", row)
        table_name = _identifier_to_str(raw_name)
        if table_name:
            tables.add(table_name)

    # 2. FK edges (child -> parent)
    # REFERENCED_TABLE_SCHEMA is filtered as well: a parent living in another
    # schema is not part of `tables` and cannot take part in the ordering.
    cursor.execute(
        """
        SELECT TABLE_NAME AS child, REFERENCED_TABLE_NAME AS parent
        FROM information_schema.KEY_COLUMN_USAGE
        WHERE TABLE_SCHEMA = SCHEMA()
          AND REFERENCED_TABLE_SCHEMA = SCHEMA()
          AND REFERENCED_TABLE_NAME IS NOT NULL;
        """
    )
    edges: t.List[t.Tuple[str, str]] = []
    for row in cursor.fetchall():
        raw_child: t.Any
        raw_parent: t.Any
        if isinstance(row, dict):
            raw_child, raw_parent = row.get("child"), row.get("parent")
        elif isinstance(row, (list, tuple)):
            if len(row) < 2:
                # Malformed row: not enough columns to form an edge.
                continue
            raw_child, raw_parent = row[0], row[1]
        else:
            raw_child = getattr(row, "child", None)
            raw_parent = getattr(row, "parent", None)

        child = _identifier_to_str(raw_child)
        parent = _identifier_to_str(raw_parent)
        if child and parent:
            edges.append((child, parent))

    return tables, edges


def topo_sort_tables(
    tables: t.Set[str], edges: t.List[t.Tuple[str, str]]
) -> t.Tuple[t.List[str], t.List[t.Tuple[str, str]]]:
    """Perform a topological sort on tables based on foreign key dependencies.

    Uses Kahn's algorithm: tables without unresolved parents are emitted
    first, and each emitted table releases the children that were waiting on
    it. Ties are broken alphabetically so the result is reproducible across
    runs (set iteration order is not).

    Edges that cannot constrain the creation order are ignored:

    * self-references (``child == parent``), since a table referencing itself
      is created in a single statement;
    * edges whose child or parent is not in ``tables`` (e.g. a parent living
      in another schema), which previously raised ``KeyError`` or left the
      child blocked forever and misreported as cyclic.

    Args:
        tables: names of the tables to order.
        edges: ``(child, parent)`` pairs, one per foreign key; duplicates are
            harmless.

    Returns:
        ordered: tables in FK-safe creation order. Tables that are part of, or
            depend on, a dependency cycle are NOT included.
        cyclic_edges: the sorted ``(child, parent)`` edges still unresolved
            after the sort (empty if the graph is a pure DAG).
    """
    # dependency graph: child -> {parents}
    deps: t.Dict[str, t.Set[str]] = {tbl: set() for tbl in tables}
    # reverse edges: parent -> {children}
    rev: t.Dict[str, t.Set[str]] = defaultdict(set)

    for child, parent in edges:
        if child == parent or child not in deps or parent not in deps:
            continue
        deps[child].add(parent)
        rev[parent].add(child)

    queue: t.Deque[str] = deque(sorted(tbl for tbl, parents in deps.items() if not parents))
    ordered: t.List[str] = []

    while queue:
        table = queue.popleft()
        ordered.append(table)
        # "remove" table from the graph and release children it was blocking
        for child in sorted(rev.get(table, ())):
            pending = deps[child]
            pending.discard(table)
            if not pending:
                queue.append(child)

    # any table still having parents is in (or downstream of) a cycle
    cyclic_edges: t.List[t.Tuple[str, str]] = sorted(
        (child, parent) for child, parents in deps.items() for parent in parents
    )
    return ordered, cyclic_edges


def compute_creation_order(mysql_conn: MySQLConnectionAbstract) -> t.Tuple[t.List[str], t.List[t.Tuple[str, str]]]:
    """Work out a foreign-key-aware creation order for the connection's tables.

    A cursor is opened on ``mysql_conn``, the schema metadata is read through
    ``fetch_schema_metadata`` and the result is handed to ``topo_sort_tables``.

    Returns:
        The pair ``(ordered_tables, cyclic_edges)`` produced by
        ``topo_sort_tables``; ``cyclic_edges`` is empty for an acyclic schema.
    """
    with mysql_conn.cursor() as cursor:
        schema_tables, fk_edges = fetch_schema_metadata(cursor)
        # Sorting happens while the cursor is still open, as before.
        return topo_sort_tables(schema_tables, fk_edges)
47 changes: 42 additions & 5 deletions src/mysql_to_sqlite3/transporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from mysql.connector.types import RowItemType
from tqdm import tqdm, trange

from mysql_to_sqlite3.mysql_utils import CHARSET_INTRODUCERS
from mysql_to_sqlite3.mysql_utils import CHARSET_INTRODUCERS, compute_creation_order
from mysql_to_sqlite3.sqlite_utils import (
CollatingSequences,
Integer_Types,
Expand Down Expand Up @@ -678,14 +678,45 @@
)
tables = (row[0].decode() for row in self._mysql_cur.fetchall()) # type: ignore[union-attr]

# Convert tables iterable to a list for reuse
table_list: t.List[str] = []
for table_name in tables:
if isinstance(table_name, bytes):
table_name = table_name.decode()

Check warning on line 685 in src/mysql_to_sqlite3/transporter.py

View check run for this annotation

Codecov / codecov/patch

src/mysql_to_sqlite3/transporter.py#L685

Added line #L685 was not covered by tests
# Ensure table_name is a string
table_str = str(table_name) if table_name is not None else ""
table_list.append(table_str)

# Try to compute the table creation order to respect foreign key constraints
try:
if hasattr(self, "_mysql"):
# Compute the table creation order to respect foreign key constraints
ordered_tables: t.List[str]
cyclic_edges: t.List[t.Tuple[str, str]]
ordered_tables, cyclic_edges = compute_creation_order(self._mysql)

# Filter ordered_tables to only include tables we want to transfer
ordered_tables = [table for table in ordered_tables if table in table_list]

# Log information about cyclic dependencies
if cyclic_edges:
self._logger.warning(
"Circular foreign key dependencies detected: %s",
", ".join(f"{child} -> {parent}" for child, parent in cyclic_edges),
)
else:
# If _mysql attribute is not available (e.g., in tests), use the original table list
ordered_tables = table_list
except Exception as e: # pylint: disable=W0718
# If anything goes wrong, fall back to the original table list
self._logger.warning("Failed to compute table creation order: %s", str(e))
ordered_tables = table_list

try:
# turn off foreign key checking in SQLite while transferring data
self._sqlite_cur.execute("PRAGMA foreign_keys=OFF")

for table_name in tables:
if isinstance(table_name, bytes):
table_name = table_name.decode()

for table_name in ordered_tables:
self._logger.info(
"%s%sTransferring table %s",
"[WITHOUT DATA] " if self._without_data else "",
Expand Down Expand Up @@ -749,6 +780,12 @@
# re-enable foreign key checking once done transferring
self._sqlite_cur.execute("PRAGMA foreign_keys=ON")

# Check for any foreign key constraint violations
self._sqlite_cur.execute("PRAGMA foreign_key_check")
fk_violations: t.List[sqlite3.Row] = self._sqlite_cur.fetchall()
if fk_violations:
self._logger.warning("Foreign key constraint violations detected: %s", fk_violations)

if self._vacuum:
self._logger.info("Vacuuming created SQLite database file.\nThis might take a while.")
self._sqlite_cur.execute("VACUUM")
Expand Down
Loading