API Reference

Constants

DatabaseConnectionType

Bases: Enum

Enum for database connection types.

Source code in supabase_pydantic/util/constants.py
class DatabaseConnectionType(Enum):
    """Enum for database connection types."""

    LOCAL = 'local'
    DB_URL = 'db_url'

FrameWorkType

Bases: Enum

Enum for framework types.

Source code in supabase_pydantic/util/constants.py
class FrameWorkType(Enum):
    """Enum for framework types."""

    FASTAPI = 'fastapi'

OrmType

Bases: Enum

Enum for ORM types.

Source code in supabase_pydantic/util/constants.py
class OrmType(Enum):
    """Enum for file types."""

    PYDANTIC = 'pydantic'
    SQLALCHEMY = 'sqlalchemy'

WriterClassType

Bases: Enum

Enum for writer class types.

Source code in supabase_pydantic/util/constants.py
class WriterClassType(Enum):
    """Enum for writer class types."""

    BASE = 'base'  # The main Row model with all fields
    BASE_WITH_PARENT = 'base_with_parent'
    PARENT = 'parent'
    INSERT = 'insert'  # Model for insert operations - auto-generated fields optional
    UPDATE = 'update'  # Model for update operations - all fields optional

WriterConfig dataclass

Source code in supabase_pydantic/util/constants.py
@dataclass
class WriterConfig:
    file_type: OrmType
    framework_type: FrameWorkType
    filename: str
    directory: str
    enabled: bool

    def ext(self) -> str:
        """Get the file extension based on the file name."""
        return self.filename.split('.')[-1]

    def name(self) -> str:
        """Get the file name without the extension."""
        return self.filename.split('.')[0]

    def fpath(self) -> str:
        """Get the full file path."""
        return os.path.join(self.directory, self.filename)

    def to_dict(self) -> dict[str, str]:
        """Convert the WriterConfig object to a dictionary."""
        return {
            'file_type': str(self.file_type),
            'framework_type': str(self.framework_type),
            'filename': self.filename,
            'directory': self.directory,
            'enabled': str(self.enabled),
        }

ext()

Get the file extension based on the file name.

Source code in supabase_pydantic/util/constants.py
def ext(self) -> str:
    """Get the file extension based on the file name."""
    return self.filename.split('.')[-1]

fpath()

Get the full file path.

Source code in supabase_pydantic/util/constants.py
def fpath(self) -> str:
    """Get the full file path."""
    return os.path.join(self.directory, self.filename)

name()

Get the file name without the extension.

Source code in supabase_pydantic/util/constants.py
def name(self) -> str:
    """Get the file name without the extension."""
    return self.filename.split('.')[0]

to_dict()

Convert the WriterConfig object to a dictionary.

Source code in supabase_pydantic/util/constants.py
def to_dict(self) -> dict[str, str]:
    """Convert the WriterConfig object to a dictionary."""
    return {
        'file_type': str(self.file_type),
        'framework_type': str(self.framework_type),
        'filename': self.filename,
        'directory': self.directory,
        'enabled': str(self.enabled),
    }
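
Example

A minimal usage sketch, assuming supabase_pydantic is installed and the import path below matches your version:

from supabase_pydantic.util.constants import FrameWorkType, OrmType, WriterConfig

config = WriterConfig(
    file_type=OrmType.PYDANTIC,
    framework_type=FrameWorkType.FASTAPI,
    filename='schemas.py',
    directory='entities/fastapi',
    enabled=True,
)

print(config.ext())    # 'py'
print(config.name())   # 'schemas'
print(config.fpath())  # 'entities/fastapi/schemas.py' (os.path.join, so separators are OS-specific)
print(config.to_dict())  # {'file_type': 'OrmType.PYDANTIC', 'framework_type': 'FrameWorkType.FASTAPI', ...}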

Dataclasses

ColumnInfo dataclass

Bases: AsDictParent

Column information.

Source code in supabase_pydantic/util/dataclasses.py
@dataclass
class ColumnInfo(AsDictParent):
    """Column information."""

    name: str
    post_gres_datatype: str
    datatype: str
    user_defined_values: list[str] | None = field(default_factory=list)
    unique_partners: list[str] | None = field(default_factory=list)
    alias: str | None = None
    default: str | None = None
    max_length: int | None = None
    is_nullable: bool | None = True
    primary: bool = False
    is_unique: bool = False
    is_foreign_key: bool = False
    constraint_definition: str | None = None
    is_identity: bool = False  # For auto-generated identity columns

    @property
    def has_default(self) -> bool:
        """Check if the column has a default value."""
        return self.default is not None

    @property
    def is_generated(self) -> bool:
        """Check if the column is auto-generated (identity or serial)."""
        return self.is_identity or (self.default is not None and 'nextval' in str(self.default).lower())

    def orm_imports(self, orm_type: OrmType = OrmType.PYDANTIC) -> set[str | None]:
        """Get the unique import statements for a column."""
        imports = set()  # future proofing in case multiple imports are needed
        if orm_type == OrmType.SQLALCHEMY:
            i = get_sqlalchemy_type(self.post_gres_datatype, ('Any', 'from sqlalchemy import Column'))[1]
        else:
            i = get_pydantic_type(self.post_gres_datatype)[1]
        imports.add(i)
        return imports

    def orm_datatype(self, orm_type: OrmType = OrmType.PYDANTIC) -> str:
        """Get the datatype for a column."""
        if orm_type == OrmType.SQLALCHEMY:
            return get_sqlalchemy_type(self.post_gres_datatype)[0]

        return get_pydantic_type(self.post_gres_datatype)[0]

    def is_user_defined_type(self) -> bool:
        """Check if the column is a user-defined type."""
        return self.post_gres_datatype == 'USER-DEFINED'

    def nullable(self) -> bool:
        """Check if the column is nullable."""
        return self.is_nullable if self.is_nullable is not None else False

has_default property

Check if the column has a default value.

is_generated property

Check if the column is auto-generated (identity or serial).

is_user_defined_type()

Check if the column is a user-defined type.

Source code in supabase_pydantic/util/dataclasses.py
def is_user_defined_type(self) -> bool:
    """Check if the column is a user-defined type."""
    return self.post_gres_datatype == 'USER-DEFINED'

nullable()

Check if the column is nullable.

Source code in supabase_pydantic/util/dataclasses.py
def nullable(self) -> bool:
    """Check if the column is nullable."""
    return self.is_nullable if self.is_nullable is not None else False

orm_datatype(orm_type=OrmType.PYDANTIC)

Get the datatype for a column.

Source code in supabase_pydantic/util/dataclasses.py
def orm_datatype(self, orm_type: OrmType = OrmType.PYDANTIC) -> str:
    """Get the datatype for a column."""
    if orm_type == OrmType.SQLALCHEMY:
        return get_sqlalchemy_type(self.post_gres_datatype)[0]

    return get_pydantic_type(self.post_gres_datatype)[0]

orm_imports(orm_type=OrmType.PYDANTIC)

Get the unique import statements for a column.

Source code in supabase_pydantic/util/dataclasses.py
def orm_imports(self, orm_type: OrmType = OrmType.PYDANTIC) -> set[str | None]:
    """Get the unique import statements for a column."""
    imports = set()  # future proofing in case multiple imports are needed
    if orm_type == OrmType.SQLALCHEMY:
        i = get_sqlalchemy_type(self.post_gres_datatype, ('Any', 'from sqlalchemy import Column'))[1]
    else:
        i = get_pydantic_type(self.post_gres_datatype)[1]
    imports.add(i)
    return imports
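
Example

A short sketch of ColumnInfo in isolation; the exact orm_datatype output depends on the package's internal type maps, so the final comment is indicative only:

from supabase_pydantic.util.dataclasses import ColumnInfo

col = ColumnInfo(
    name='id',
    post_gres_datatype='integer',
    datatype='int',
    default="nextval('users_id_seq'::regclass)",
    is_nullable=False,
    primary=True,
)

print(col.has_default)   # True
print(col.is_generated)  # True -- the serial default contains 'nextval'
print(col.nullable())    # False
print(col.orm_datatype())  # 'int' under the default Pydantic mapping (indicative)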

ConstraintInfo dataclass

Bases: AsDictParent

Source code in supabase_pydantic/util/dataclasses.py
@dataclass
class ConstraintInfo(AsDictParent):
    constraint_name: str
    raw_constraint_type: str
    constraint_definition: str
    columns: list[str] = field(default_factory=list)

    def constraint_type(self) -> str:
        """Get the constraint type."""
        return CONSTRAINT_TYPE_MAP.get(self.raw_constraint_type.lower(), 'OTHER')

constraint_type()

Get the constraint type.

Source code in supabase_pydantic/util/dataclasses.py
def constraint_type(self) -> str:
    """Get the constraint type."""
    return CONSTRAINT_TYPE_MAP.get(self.raw_constraint_type.lower(), 'OTHER')
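
Example

A small sketch; the raw type 'p' is assumed to map to 'PRIMARY KEY' in CONSTRAINT_TYPE_MAP, consistent with the primary key checks in the marshaling code below:

from supabase_pydantic.util.dataclasses import ConstraintInfo

pk = ConstraintInfo(
    constraint_name='users_pkey',
    raw_constraint_type='p',  # assumed to map to 'PRIMARY KEY'
    constraint_definition='PRIMARY KEY (id)',
    columns=['id'],
)
print(pk.constraint_type())  # 'PRIMARY KEY'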

TableInfo dataclass

Bases: AsDictParent

Source code in supabase_pydantic/util/dataclasses.py
@dataclass
class TableInfo(AsDictParent):
    name: str
    schema: str = 'public'
    table_type: Literal['BASE TABLE', 'VIEW'] = 'BASE TABLE'
    is_bridge: bool = False  # whether the table is a bridge table
    columns: list[ColumnInfo] = field(default_factory=list)
    foreign_keys: list[ForeignKeyInfo] = field(default_factory=list)
    constraints: list[ConstraintInfo] = field(default_factory=list)
    relationships: list[RelationshipInfo] = field(default_factory=list)
    generated_data: list[dict] = field(default_factory=list)

    def add_column(self, column: ColumnInfo) -> None:
        """Add a column to the table."""
        self.columns.append(column)

    def add_foreign_key(self, fk: ForeignKeyInfo) -> None:
        """Add a foreign key to the table."""
        self.foreign_keys.append(fk)

    def add_constraint(self, constraint: ConstraintInfo) -> None:
        """Add a constraint to the table."""
        self.constraints.append(constraint)

    def aliasing_in_columns(self) -> bool:
        """Check if any column within a table has an alias."""
        return any(c.alias is not None for c in self.columns)

    def table_dependencies(self) -> set[str]:
        """Get the table dependencies (foreign tables) for a table."""
        return {fk.foreign_table_name for fk in self.foreign_keys}

    def primary_key(self) -> list[str]:
        """Get the primary key for a table."""
        if self.table_type == 'BASE TABLE':
            for constraint in self.constraints:
                if constraint.constraint_type() == 'PRIMARY KEY':
                    return constraint.columns
        return []  # Return an empty list if no primary key is found

    def primary_is_composite(self) -> bool:
        """Check if the primary key is composite."""
        return len(self.primary_key()) > 1

    def get_primary_columns(self, sort_results: bool = False) -> list[ColumnInfo]:
        """Get the primary columns for a table."""
        return self._get_columns(is_primary=True, sort_results=sort_results)

    def get_secondary_columns(self, sort_results: bool = False) -> list[ColumnInfo]:
        """Get the secondary columns for a table."""
        return self._get_columns(is_primary=False, sort_results=sort_results)

    def _get_columns(self, is_primary: bool = True, sort_results: bool = False) -> list[ColumnInfo]:
        """Private function to get the primary or secondary columns for a table."""
        if is_primary:
            res = [c for c in self.columns if c.name in self.primary_key()]
        else:
            res = [c for c in self.columns if c.name not in self.primary_key()]

        if sort_results:
            res.sort(key=lambda x: x.name)

        return res

    def sort_and_separate_columns(
        self, separate_nullable: bool = False, separate_primary_key: bool = False
    ) -> SortedColumns:
        """Sort and combine columns based on is_nullable attribute.

        Args:
            separate_nullable: Whether to separate nullable and non-nullable columns.
            separate_primary_key: Whether to separate primary key and secondary columns.

        Returns:
            A SortedColumns object with primary_keys, nullable, non_nullable,
            and remaining lists of ColumnInfo objects.
        """
        result: SortedColumns = SortedColumns([], [], [], [])
        if separate_primary_key:
            result.primary_keys = self.get_primary_columns(sort_results=True)
            result.remaining = self.get_secondary_columns(sort_results=True)
        else:
            result.remaining = sorted(self.columns, key=lambda x: x.name)

        if separate_nullable:
            nullable_columns = [column for column in result.remaining if column.is_nullable]  # already sorted
            non_nullable_columns = [column for column in result.remaining if not column.is_nullable]

            # Combine them with non-nullable first
            result.nullable = nullable_columns
            result.non_nullable = non_nullable_columns
            result.remaining = []

        return result

    def has_unique_constraint(self) -> bool:
        """Check if the table has unique constraints."""
        return any(c.constraint_type() == 'UNIQUE' for c in self.constraints)

_get_columns(is_primary=True, sort_results=False)

Private function to get the primary or secondary columns for a table.

Source code in supabase_pydantic/util/dataclasses.py
def _get_columns(self, is_primary: bool = True, sort_results: bool = False) -> list[ColumnInfo]:
    """Private function to get the primary or secondary columns for a table."""
    if is_primary:
        res = [c for c in self.columns if c.name in self.primary_key()]
    else:
        res = [c for c in self.columns if c.name not in self.primary_key()]

    if sort_results:
        res.sort(key=lambda x: x.name)

    return res

add_column(column)

Add a column to the table.

Source code in supabase_pydantic/util/dataclasses.py
def add_column(self, column: ColumnInfo) -> None:
    """Add a column to the table."""
    self.columns.append(column)

add_constraint(constraint)

Add a constraint to the table.

Source code in supabase_pydantic/util/dataclasses.py
def add_constraint(self, constraint: ConstraintInfo) -> None:
    """Add a constraint to the table."""
    self.constraints.append(constraint)

add_foreign_key(fk)

Add a foreign key to the table.

Source code in supabase_pydantic/util/dataclasses.py
def add_foreign_key(self, fk: ForeignKeyInfo) -> None:
    """Add a foreign key to the table."""
    self.foreign_keys.append(fk)

aliasing_in_columns()

Check if any column within a table has an alias.

Source code in supabase_pydantic/util/dataclasses.py
def aliasing_in_columns(self) -> bool:
    """Check if any column within a table has an alias."""
    return any(c.alias is not None for c in self.columns)

get_primary_columns(sort_results=False)

Get the primary columns for a table.

Source code in supabase_pydantic/util/dataclasses.py
def get_primary_columns(self, sort_results: bool = False) -> list[ColumnInfo]:
    """Get the primary columns for a table."""
    return self._get_columns(is_primary=True, sort_results=sort_results)

get_secondary_columns(sort_results=False)

Get the secondary columns for a table.

Source code in supabase_pydantic/util/dataclasses.py
def get_secondary_columns(self, sort_results: bool = False) -> list[ColumnInfo]:
    """Get the secondary columns for a table."""
    return self._get_columns(is_primary=False, sort_results=sort_results)

has_unique_constraint()

Check if the table has unique constraints.

Source code in supabase_pydantic/util/dataclasses.py
def has_unique_constraint(self) -> bool:
    """Check if the table has unique constraints."""
    return any(c.constraint_type() == 'UNIQUE' for c in self.constraints)

primary_is_composite()

Check if the primary key is composite.

Source code in supabase_pydantic/util/dataclasses.py
def primary_is_composite(self) -> bool:
    """Check if the primary key is composite."""
    return len(self.primary_key()) > 1

primary_key()

Get the primary key for a table.

Source code in supabase_pydantic/util/dataclasses.py
def primary_key(self) -> list[str]:
    """Get the primary key for a table."""
    if self.table_type == 'BASE TABLE':
        for constraint in self.constraints:
            if constraint.constraint_type() == 'PRIMARY KEY':
                return constraint.columns
    return []  # Return an empty list if no primary key is found

sort_and_separate_columns(separate_nullable=False, separate_primary_key=False)

Sort and combine columns based on is_nullable attribute.

Parameters:

separate_nullable (bool, default False): Whether to separate nullable and non-nullable columns.
separate_primary_key (bool, default False): Whether to separate primary key and secondary columns.

Returns:

SortedColumns: A SortedColumns object with primary_keys, nullable, non_nullable, and remaining lists of ColumnInfo objects.

Source code in supabase_pydantic/util/dataclasses.py
def sort_and_separate_columns(
    self, separate_nullable: bool = False, separate_primary_key: bool = False
) -> SortedColumns:
    """Sort and combine columns based on is_nullable attribute.

    Args:
        separate_nullable: Whether to separate nullable and non-nullable columns.
        separate_primary_key: Whether to separate primary key and secondary columns.

    Returns:
        A SortedColumns object with primary_keys, nullable, non_nullable,
        and remaining lists of ColumnInfo objects.
    """
    result: SortedColumns = SortedColumns([], [], [], [])
    if separate_primary_key:
        result.primary_keys = self.get_primary_columns(sort_results=True)
        result.remaining = self.get_secondary_columns(sort_results=True)
    else:
        result.remaining = sorted(self.columns, key=lambda x: x.name)

    if separate_nullable:
        nullable_columns = [column for column in result.remaining if column.is_nullable]  # already sorted
        non_nullable_columns = [column for column in result.remaining if not column.is_nullable]

        # Combine them with non-nullable first
        result.nullable = nullable_columns
        result.non_nullable = non_nullable_columns
        result.remaining = []

    return result

table_dependencies()

Get the table dependencies (foreign tables) for a table.

Source code in supabase_pydantic/util/dataclasses.py
def table_dependencies(self) -> set[str]:
    """Get the table dependencies (foreign tables) for a table."""
    return {fk.foreign_table_name for fk in self.foreign_keys}
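
Example

A sketch that assembles a TableInfo by hand and exercises the key-related helpers; it assumes CONSTRAINT_TYPE_MAP resolves 'p' to 'PRIMARY KEY':

from supabase_pydantic.util.dataclasses import ColumnInfo, ConstraintInfo, TableInfo

table = TableInfo(name='users')
table.add_column(ColumnInfo(name='id', post_gres_datatype='integer', datatype='int', is_nullable=False))
table.add_column(ColumnInfo(name='email', post_gres_datatype='text', datatype='str', is_nullable=True))
table.add_constraint(ConstraintInfo(
    constraint_name='users_pkey',
    raw_constraint_type='p',  # assumed to map to 'PRIMARY KEY'
    constraint_definition='PRIMARY KEY (id)',
    columns=['id'],
))

print(table.primary_key())           # ['id']
print(table.primary_is_composite())  # False
sorted_cols = table.sort_and_separate_columns(separate_nullable=True, separate_primary_key=True)
print([c.name for c in sorted_cols.primary_keys])  # ['id']
print([c.name for c in sorted_cols.nullable])      # ['email']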

Abstract Writer Classes

AbstractClassWriter

Bases: ABC

Source code in supabase_pydantic/util/writers/abstract_classes.py
class AbstractClassWriter(ABC):
    def __init__(
        self, table: TableInfo, class_type: WriterClassType = WriterClassType.BASE, null_defaults: bool = False
    ):
        self.table = table
        self.class_type = class_type
        self._null_defaults = null_defaults
        self.name = to_pascal_case(self.table.name)

    @staticmethod
    def _proper_name(name: str, use_base: bool = False) -> str:
        return to_pascal_case(name) + (BASE_CLASS_POSTFIX if use_base else '')

    def write_class(
        self,
        add_fk: bool = False,
    ) -> str:
        """Method to write the complete class definition."""
        return self.write_definition() + self.write_docs() + self.write_columns(add_fk)

    @abstractmethod
    def write_operational_class(self) -> str | None:
        """Method to generate operational class definitions."""
        return None

    @abstractmethod
    def write_name(self) -> str:
        """Method to generate the header for the base class."""
        raise NotImplementedError('write_name not implemented')

    @abstractmethod
    def write_metaclass(self, metaclasses: list[str] | None = None) -> str | None:
        """Method to generate the metaclasses for the class."""
        raise NotImplementedError('write_metaclass not implemented')

    @abstractmethod
    def write_docs(self) -> str:
        """Method to generate the docstrings for the class."""
        raise NotImplementedError('write_docs not implemented')

    def write_definition(self) -> str:
        """Method to generate the class definition for the class."""
        name = self.write_name()
        metaclass = self.write_metaclass()
        result = f'class {name}({metaclass}):' if metaclass else f'class {name}:'
        return result

    @abstractmethod
    def write_primary_keys(self) -> str | None:
        """Method to generate primary key definitions for the class."""
        raise NotImplementedError('write_primary_keys not implemented')

    @abstractmethod
    def write_primary_columns(self) -> str | None:
        """Method to generate column definitions for the class."""
        raise NotImplementedError('write_primary_columns not implemented')

    @abstractmethod
    def write_foreign_columns(self, use_base: bool = False) -> str | None:
        """Method to generate foreign column definitions for the class."""
        raise NotImplementedError('write_foreign_columns not implemented')

    @staticmethod
    def column_section(comment_title: str, columns: list[str]) -> str:
        """Method to generate a section of columns."""
        return f'\t# {comment_title}\n' + '\n'.join([f'\t{c}' for c in columns])

    def write_columns(self, add_fk: bool = False) -> str:
        """Method to generate column definitions for the class."""
        keys = self.write_primary_keys()
        cols = self.write_primary_columns()
        fcols = self.write_foreign_columns() if add_fk else None

        columns = [x for x in [keys, cols, fcols] if x is not None]
        return '\n\n'.join(columns)

column_section(comment_title, columns) staticmethod

Method to generate a section of columns.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@staticmethod
def column_section(comment_title: str, columns: list[str]) -> str:
    """Method to generate a section of columns."""
    return f'\t# {comment_title}\n' + '\n'.join([f'\t{c}' for c in columns])

write_class(add_fk=False)

Method to write the complete class definition.

Source code in supabase_pydantic/util/writers/abstract_classes.py
def write_class(
    self,
    add_fk: bool = False,
) -> str:
    """Method to write the complete class definition."""
    return self.write_definition() + self.write_docs() + self.write_columns(add_fk)

write_columns(add_fk=False)

Method to generate column definitions for the class.

Source code in supabase_pydantic/util/writers/abstract_classes.py
def write_columns(self, add_fk: bool = False) -> str:
    """Method to generate column definitions for the class."""
    keys = self.write_primary_keys()
    cols = self.write_primary_columns()
    fcols = self.write_foreign_columns() if add_fk else None

    columns = [x for x in [keys, cols, fcols] if x is not None]
    return '\n\n'.join(columns)

write_definition()

Method to generate the class definition for the class.

Source code in supabase_pydantic/util/writers/abstract_classes.py
def write_definition(self) -> str:
    """Method to generate the class definition for the class."""
    name = self.write_name()
    metaclass = self.write_metaclass()
    result = f'class {name}({metaclass}):' if metaclass else f'class {name}:'
    return result

write_docs() abstractmethod

Method to generate the docstrings for the class.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_docs(self) -> str:
    """Method to generate the docstrings for the class."""
    raise NotImplementedError('write_docs not implemented')

write_foreign_columns(use_base=False) abstractmethod

Method to generate foreign column definitions for the class.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_foreign_columns(self, use_base: bool = False) -> str | None:
    """Method to generate foreign column definitions for the class."""
    raise NotImplementedError('write_foreign_columns not implemented')

write_metaclass(metaclasses=None) abstractmethod

Method to generate the metaclasses for the class.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_metaclass(self, metaclasses: list[str] | None = None) -> str | None:
    """Method to generate the metaclasses for the class."""
    raise NotImplementedError('write_metaclass not implemented')

write_name() abstractmethod

Method to generate the header for the base class.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_name(self) -> str:
    """Method to generate the header for the base class."""
    raise NotImplementedError('write_name not implemented')

write_operational_class() abstractmethod

Method to generate operational class definitions.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_operational_class(self) -> str | None:
    """Method to generate operational class definitions."""
    return None

write_primary_columns() abstractmethod

Method to generate column definitions for the class.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_primary_columns(self) -> str | None:
    """Method to generate column definitions for the class."""
    raise NotImplementedError('write_primary_columns not implemented')

write_primary_keys() abstractmethod

Method to generate primary key definitions for the class.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_primary_keys(self) -> str | None:
    """Method to generate primary key definitions for the class."""
    raise NotImplementedError('write_primary_keys not implemented')
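
Example

A toy concrete subclass, shown only to illustrate how the abstract hooks compose in write_class(); it assumes the import paths below and that the base __init__ sets self.name to the PascalCase table name:

from supabase_pydantic.util.dataclasses import ColumnInfo, TableInfo
from supabase_pydantic.util.writers.abstract_classes import AbstractClassWriter

class PlainClassWriter(AbstractClassWriter):
    """Render a bare Python class for a table."""

    def write_operational_class(self) -> str | None:
        return None

    def write_name(self) -> str:
        return self.name

    def write_metaclass(self, metaclasses: list[str] | None = None) -> str | None:
        return None  # no base class -> 'class Users:' style definition

    def write_docs(self) -> str:
        return f'\n\t"""{self.table.name} table."""\n\n'

    def write_primary_keys(self) -> str | None:
        return None

    def write_primary_columns(self) -> str | None:
        return self.column_section('Columns', [f'{c.name}: {c.datatype}' for c in self.table.columns])

    def write_foreign_columns(self, use_base: bool = False) -> str | None:
        return None

table = TableInfo(name='users')
table.add_column(ColumnInfo(name='id', post_gres_datatype='integer', datatype='int'))
print(PlainClassWriter(table).write_class())
# class Users:
# 	"""users table."""
#
# 	# Columns
# 	id: int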

AbstractFileWriter

Bases: ABC

Source code in supabase_pydantic/util/writers/abstract_classes.py
class AbstractFileWriter(ABC):
    def __init__(
        self,
        tables: list[TableInfo],
        file_path: str,
        writer: type[AbstractClassWriter],
        add_null_parent_classes: bool = False,
    ):
        self.tables = tables
        self.file_path = file_path
        self.add_null_parent_classes = add_null_parent_classes
        self.writer = writer
        self.jstr = '\n\n\n'

    def write(self) -> str:
        """Method to write the complete file."""
        # order is important here
        parts = [
            self.write_imports(),
            self.write_custom_classes(),
            self.write_base_classes(),
            self.write_operational_classes(),
        ]

        # filter None and join parts
        return self.jstr.join(p for p in parts if p is not None) + '\n'

    def save(self, overwrite: bool = False) -> tuple[str, str | None]:
        """Method to save the file."""
        fp = Path(self.file_path)
        base, ext, directory = fp.stem, fp.suffix, str(fp.parent)
        latest_file = os.path.join(directory, f'{base}_latest{ext}')
        with open(latest_file, 'w') as f:
            f.write(self.write())

        if not overwrite:
            versioned_file = generate_unique_filename(base, ext, directory)
            with open(versioned_file, 'w') as f:
                f.write(self.write())

            return latest_file, versioned_file

        return latest_file, None

    def join(self, strings: list[str]) -> str:
        """Method to join strings."""
        return self.jstr.join(strings)

    @abstractmethod
    def write_imports(self) -> str:
        """Method to generate import statements for the file."""
        raise NotImplementedError('write_imports not implemented')

    @abstractmethod
    def write_custom_classes(self) -> str | None:
        """Method to generate custom class definitions for the file."""
        raise NotImplementedError('write_custom_classes not implemented')

    @abstractmethod
    def write_base_classes(self) -> str:
        """Method to generate class definitions for the file."""
        raise NotImplementedError('write_base_classes not implemented')

    @abstractmethod
    def write_operational_classes(self) -> str | None:
        """Method to generate operational class definitions for the file."""
        raise NotImplementedError('write_operational_classes not implemented')

join(strings)

Method to join strings.

Source code in supabase_pydantic/util/writers/abstract_classes.py
def join(self, strings: list[str]) -> str:
    """Method to join strings."""
    return self.jstr.join(strings)

save(overwrite=False)

Method to save the file.

Source code in supabase_pydantic/util/writers/abstract_classes.py
def save(self, overwrite: bool = False) -> tuple[str, str | None]:
    """Method to save the file."""
    fp = Path(self.file_path)
    base, ext, directory = fp.stem, fp.suffix, str(fp.parent)
    latest_file = os.path.join(directory, f'{base}_latest{ext}')
    with open(latest_file, 'w') as f:
        f.write(self.write())

    if not overwrite:
        versioned_file = generate_unique_filename(base, ext, directory)
        with open(versioned_file, 'w') as f:
            f.write(self.write())

        return latest_file, versioned_file

    return latest_file, None

write()

Method to write the complete file.

Source code in supabase_pydantic/util/writers/abstract_classes.py
def write(self) -> str:
    """Method to write the complete file."""
    # order is important here
    parts = [
        self.write_imports(),
        self.write_custom_classes(),
        self.write_base_classes(),
        self.write_operational_classes(),
    ]

    # filter None and join parts
    return self.jstr.join(p for p in parts if p is not None) + '\n'

write_base_classes() abstractmethod

Method to generate class definitions for the file.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_base_classes(self) -> str:
    """Method to generate class definitions for the file."""
    raise NotImplementedError('write_base_classes not implemented')

write_custom_classes() abstractmethod

Method to generate custom class definitions for the file.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_custom_classes(self) -> str | None:
    """Method to generate custom class definitions for the file."""
    raise NotImplementedError('write_custom_classes not implemented')

write_imports() abstractmethod

Method to generate import statements for the file.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_imports(self) -> str:
    """Method to generate import statements for the file."""
    raise NotImplementedError('write_imports not implemented')

write_operational_classes() abstractmethod

Method to generate operational class definitions for the file.

Source code in supabase_pydantic/util/writers/abstract_classes.py
@abstractmethod
def write_operational_classes(self) -> str | None:
    """Method to generate operational class definitions for the file."""
    raise NotImplementedError('write_operational_classes not implemented')
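
Example

A toy file writer sketch; the unused writer argument is satisfied with the abstract base itself, and the output comments are indicative:

from supabase_pydantic.util.dataclasses import TableInfo
from supabase_pydantic.util.writers.abstract_classes import AbstractClassWriter, AbstractFileWriter

class StubFileWriter(AbstractFileWriter):
    """Emit a header comment plus one empty class per table."""

    def write_imports(self) -> str:
        return '# auto-generated models'

    def write_custom_classes(self) -> str | None:
        return None

    def write_base_classes(self) -> str:
        return self.join([f'class {t.name.title()}:\n\tpass' for t in self.tables])

    def write_operational_classes(self) -> str | None:
        return None

writer = StubFileWriter([TableInfo(name='users')], 'out/models.py', AbstractClassWriter)
print(writer.write())  # header comment, then 'class Users: ...'
# writer.save() would write out/models_latest.py and, unless overwrite=True, a versioned copy too.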

Marshaling Logic

add_constraints_to_table_details(tables, schema, constraints)

Add constraints to the table details.

Source code in supabase_pydantic/util/marshalers.py
def add_constraints_to_table_details(tables: dict, schema: str, constraints: list) -> None:
    """Add constraints to the table details."""
    for row in constraints:
        (constraint_name, table_name, columns, constraint_type, constraint_definition) = row

        # Remove schema from the beginning of table_name if present
        if table_name.startswith(f'{schema}.'):
            table_name = table_name[len(schema) + 1 :]  # Remove schema and the dot  # noqa: E203
        table_name = table_name.lstrip('.')  # Remove any leading dots
        table_key = (schema, table_name)

        # Create the constraint and add it to the table
        if table_key in tables:
            constraint = ConstraintInfo(
                constraint_name=constraint_name,
                columns=[standardize_column_name(c) or str(c) for c in columns],
                raw_constraint_type=constraint_type,
                constraint_definition=constraint_definition,
            )
            tables[table_key].add_constraint(constraint)

add_foreign_key_info_to_table_details(tables, fk_details)

Add foreign key information to the table details.

Skips foreign keys where either the source or target table is missing. This ensures that all foreign keys have valid relationship types.

Source code in supabase_pydantic/util/marshalers.py
def add_foreign_key_info_to_table_details(tables: dict, fk_details: list) -> None:
    """Add foreign key information to the table details.

    Skips foreign keys where either the source or target table is missing.
    This ensures that all foreign keys have valid relationship types.
    """
    for row in fk_details:
        (
            table_schema,
            table_name,
            column_name,
            foreign_table_schema,
            foreign_table_name,
            foreign_column_name,
            constraint_name,
        ) = row
        table_key = (table_schema, table_name)
        foreign_table_key = (foreign_table_schema, foreign_table_name)

        # Skip if either table is missing
        if table_key not in tables or foreign_table_key not in tables:
            missing_source = table_key not in tables
            missing_target = foreign_table_key not in tables
            if missing_target and not missing_source:
                logging.debug(
                    f'Foreign key {constraint_name} references table {foreign_table_schema}.{foreign_table_name} '
                    f'which is not in the current analysis. If you need complete relationship information, '
                    f'consider including the {foreign_table_schema} schema in your analysis.'
                )
            else:
                logging.debug(
                    f'Skipping foreign key {constraint_name} - missing source table {table_schema}.{table_name}'
                )
            continue

        # Determine relationship type
        relation_type = None
        if table_key in tables and foreign_table_key in tables:
            logging.debug(
                f'Analyzing relationship for {table_key[1]}.{column_name} '
                f'-> {foreign_table_key[1]}.{foreign_column_name}'
            )

            # First check if this is a one-to-one relationship
            # This happens when the foreign key is the only primary key in either table
            is_one_to_one = False
            found_composite_key = False

            # Check if foreign key is the only primary key in source table
            logging.debug(f'Checking constraints in source table {table_key[1]}:')
            for constraint in tables[table_key].constraints:
                logging.debug(f'  - Constraint: {constraint.raw_constraint_type}, columns: {constraint.columns}')
                if constraint.raw_constraint_type == 'p':  # primary key
                    # Check if the foreign key column is part of the primary key
                    if column_name in constraint.columns:
                        # If it's part of a composite key, it should be many-to-one
                        if len(constraint.columns) > 1:
                            logging.debug(f'    Found composite primary key including {column_name}')
                            # Found a composite key, so this must be many-to-one
                            found_composite_key = True
                            break
                        else:
                            logging.debug(f'    Found single primary key constraint on {column_name}')
                            is_one_to_one = True

            # If we found a composite key, it's definitely many-to-one
            if found_composite_key:
                is_one_to_one = False
            # Otherwise check the target table
            elif not is_one_to_one:
                logging.debug(f'Checking constraints in target table {foreign_table_key[1]}:')
                for constraint in tables[foreign_table_key].constraints:
                    logging.debug(f'  - Constraint: {constraint.raw_constraint_type}, columns: {constraint.columns}')
                    if constraint.raw_constraint_type == 'p':  # primary key
                        # Check if the foreign key column is part of the primary key
                        if foreign_column_name in constraint.columns:
                            # If it's part of a composite key, it should be many-to-one
                            if len(constraint.columns) > 1:
                                logging.debug(f'    Found composite primary key including {foreign_column_name}')
                                # Found a composite key, so this must be many-to-one
                                found_composite_key = True
                                break
                            else:
                                logging.debug(f'    Found single primary key constraint on {foreign_column_name}')
                                is_one_to_one = True

            # If we found a composite key in either table, it's many-to-one
            if found_composite_key:
                is_one_to_one = False

            if is_one_to_one:
                relation_type = RelationType.ONE_TO_ONE
                logging.debug('Detected ONE_TO_ONE relationship')
            else:
                # If not one-to-one, check if it's many-to-many
                fk_columns = [
                    fk for fk in tables[table_key].foreign_keys if fk.foreign_table_name == foreign_table_name
                ]
                if len(fk_columns) > 1:
                    relation_type = RelationType.MANY_TO_MANY
                    logging.debug('Detected MANY_TO_MANY relationship (multiple foreign keys to same table)')
                else:
                    # If not one-to-one or many-to-many, then it's a many-to-one relationship
                    # from the perspective of the table with the foreign key
                    relation_type = RelationType.MANY_TO_ONE
                    logging.debug('Detected MANY_TO_ONE relationship (default case)')

        if table_key in tables:
            fk_info = ForeignKeyInfo(
                constraint_name=constraint_name,
                column_name=standardize_column_name(column_name) or column_name,
                foreign_table_name=foreign_table_name,
                foreign_column_name=standardize_column_name(foreign_column_name) or foreign_column_name,
                relation_type=relation_type,
                foreign_table_schema=foreign_table_schema,
            )
            tables[table_key].add_foreign_key(fk_info)

add_relationships_to_table_details(tables, fk_details)

Add relationships to the table details.

Source code in supabase_pydantic/util/marshalers.py
def add_relationships_to_table_details(tables: dict, fk_details: list) -> None:
    """Add relationships to the table details."""
    # Process relationships
    for row in fk_details:
        (
            table_schema,
            table_name,
            column_name,
            foreign_table_schema,
            foreign_table_name,
            foreign_column_name,
            constraint_name,
        ) = row
        table_key = (table_schema, table_name)
        foreign_table_key = (foreign_table_schema, foreign_table_name)

        # Skip if either table doesn't exist
        if table_key not in tables or foreign_table_key not in tables:
            continue

        table = tables[table_key]
        foreign_table = tables[foreign_table_key]

        # If this is a bridge table, create MANY_TO_MANY relationships between the tables it connects
        if table.is_bridge:
            # Get all foreign keys in the bridge table
            bridge_fks = table.foreign_keys
            # For each pair of tables connected by the bridge table
            for i, fk1 in enumerate(bridge_fks):
                for fk2 in bridge_fks[i + 1 :]:  # noqa: E203
                    # Add MANY_TO_MANY relationships between the connected tables
                    # Use the bridge table's schema since all tables are in the same schema
                    table1_key = (table.schema, fk1.foreign_table_name)
                    table2_key = (table.schema, fk2.foreign_table_name)
                    if table1_key in tables and table2_key in tables:
                        # Add relationship from table1 to table2
                        tables[table1_key].relationships.append(
                            RelationshipInfo(
                                table_name=table1_key[1],
                                related_table_name=fk2.foreign_table_name,
                                relation_type=RelationType.MANY_TO_MANY,
                            )
                        )
                        # Add relationship from table2 to table1
                        tables[table2_key].relationships.append(
                            RelationshipInfo(
                                table_name=table2_key[1],
                                related_table_name=fk1.foreign_table_name,
                                relation_type=RelationType.MANY_TO_MANY,
                            )
                        )

        # For non-bridge tables, determine the relationship type
        fk_columns = [fk for fk in table.foreign_keys if fk.foreign_table_name == foreign_table_name]
        if len(fk_columns) == 1:
            # One-to-Many or One-to-One
            is_source_unique = any(col.name == column_name and (col.is_unique or col.primary) for col in table.columns)
            is_target_unique = any(
                col.name == foreign_column_name and (col.is_unique or col.primary) for col in foreign_table.columns
            )

            if is_source_unique and is_target_unique:
                relation_type = RelationType.ONE_TO_ONE
            else:
                relation_type = RelationType.ONE_TO_MANY
        else:
            # Many-to-Many
            relation_type = RelationType.MANY_TO_MANY

        # Add relationship to both tables
        if table_key in tables:
            tables[table_key].relationships.append(
                RelationshipInfo(
                    table_name=table_key[1],
                    related_table_name=foreign_table_key[1],
                    relation_type=relation_type,
                )
            )
        if foreign_table_key in tables:
            tables[foreign_table_key].relationships.append(
                RelationshipInfo(
                    table_name=foreign_table_key[1],
                    related_table_name=table_key[1],
                    relation_type=relation_type,
                )
            )

add_user_defined_types_to_tables(tables, schema, enum_types, enum_type_mapping)

Get user defined types and add them to ColumnInfo.

Source code in supabase_pydantic/util/marshalers.py
def add_user_defined_types_to_tables(
    tables: dict[tuple[str, str], TableInfo], schema: str, enum_types: list, enum_type_mapping: list
) -> None:
    """Get user defined types and add them to ColumnInfo."""
    enums = get_enum_types(enum_types, schema)
    mappings = get_user_type_mappings(enum_type_mapping, schema)

    for mapping in mappings:
        table_key = (schema, mapping.table_name)
        enum_values = next((e.enum_values for e in enums if e.type_name == mapping.type_name), None)
        if table_key in tables:
            if mapping.column_name in [c.name for c in tables[table_key].columns]:
                for col in tables[table_key].columns:
                    if col.name == mapping.column_name:
                        col.user_defined_values = enum_values
                        break
            else:
                print('Column name not found in table columns for adding user defined values: ', mapping.column_name)
        else:
            print('Table key not found in tables for adding user defined values: ', table_key)

analyze_bridge_tables(tables)

Analyze if each table is a bridge table.

Source code in supabase_pydantic/util/marshalers.py
def analyze_bridge_tables(tables: dict) -> None:
    """Analyze if each table is a bridge table."""
    for table in tables.values():
        table.is_bridge = is_bridge_table(table)
        if table.is_bridge:
            # Update all foreign key relationships to MANY_TO_MANY
            for fk in table.foreign_keys:
                logging.debug(
                    f'Setting {table.name}.{fk.column_name} -> {fk.foreign_table_name}.{fk.foreign_column_name} to MANY_TO_MANY'  # noqa: E501
                )
                fk.relation_type = RelationType.MANY_TO_MANY

analyze_table_relationships(tables)

Analyze table relationships.

Source code in supabase_pydantic/util/marshalers.py
def analyze_table_relationships(tables: dict) -> None:
    """Analyze table relationships."""
    # Keep track of processed relationships to avoid duplicate analysis
    processed_constraints = set()

    for table in tables.values():
        for fk in table.foreign_keys:
            # Skip if we've already processed this constraint
            if fk.constraint_name in processed_constraints:
                continue

            # Get the foreign table
            foreign_table = next(
                (t for t in tables.values() if t.name == fk.foreign_table_name and t.schema == fk.foreign_table_schema),
                None,
            )
            if not foreign_table:
                continue

            # Determine relationship types for both directions
            forward_type, reverse_type = determine_relationship_type(table, foreign_table, fk)

            # Set the forward relationship type
            fk.relation_type = forward_type

            # Handle the reverse relationship
            existing_fk = next((f for f in foreign_table.foreign_keys if f.constraint_name == fk.constraint_name), None)

            if existing_fk:
                # Update existing reverse foreign key
                existing_fk.relation_type = reverse_type
            else:
                # Create new reverse foreign key
                reverse_fk = ForeignKeyInfo(
                    constraint_name=fk.constraint_name,
                    column_name=fk.foreign_column_name,
                    foreign_table_name=table.name,
                    foreign_column_name=fk.column_name,
                    relation_type=reverse_type,
                )
                foreign_table.foreign_keys.append(reverse_fk)

            # Mark this constraint as processed
            processed_constraints.add(fk.constraint_name)

column_name_is_reserved(column_name)

Check whether the column name is a reserved keyword, a built-in name, or a name starting with model_.

Source code in supabase_pydantic/util/marshalers.py
def column_name_is_reserved(column_name: str) -> bool:
    """Check if the column name is a reserved keyword or built-in name or starts with model_."""
    return column_name in dir(builtins) or column_name in keyword.kwlist or column_name.startswith('model_')

column_name_reserved_exceptions(column_name)

Check for select exceptions to the reserved column name check.

Source code in supabase_pydantic/util/marshalers.py
def column_name_reserved_exceptions(column_name: str) -> bool:
    """Check for select exceptions to the reserved column name check."""
    exceptions = ['id']
    return column_name.lower() in exceptions

construct_table_info(column_details, fk_details, constraints, enum_types, enum_type_mapping, schema='public')

Construct TableInfo objects from column and foreign key details.

Source code in supabase_pydantic/util/marshalers.py
def construct_table_info(
    column_details: list,
    fk_details: list,
    constraints: list,
    enum_types: list,
    enum_type_mapping: list,
    schema: str = 'public',
) -> list[TableInfo]:
    """Construct TableInfo objects from column and foreign key details."""
    # Construct table information
    tables = get_table_details_from_columns(column_details)
    add_foreign_key_info_to_table_details(tables, fk_details)
    add_constraints_to_table_details(tables, schema, constraints)
    add_relationships_to_table_details(tables, fk_details)
    add_user_defined_types_to_tables(tables, schema, enum_types, enum_type_mapping)

    # Update columns with constraints
    update_columns_with_constraints(tables)
    update_column_constraint_definitions(tables)
    analyze_bridge_tables(tables)
    for _ in range(2):
        # TODO: update this fn to avoid running twice.
        analyze_table_relationships(tables)  # run twice to ensure all relationships are captured

    return list(tables.values())
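
Example

A sketch feeding hand-written rows whose shapes mirror the tuple unpacking in the helpers on this page; real rows come from the introspection queries, and 'p' is assumed to map to 'PRIMARY KEY':

from supabase_pydantic.util.marshalers import construct_table_info

column_details = [
    # (schema, table, column, default, is_nullable, data_type, max_length, table_type, identity_generation)
    ('public', 'users', 'id', None, 'NO', 'integer', None, 'BASE TABLE', 'ALWAYS'),
    ('public', 'users', 'email', None, 'YES', 'text', None, 'BASE TABLE', None),
]
constraints = [
    # (constraint_name, table_name, columns, constraint_type, constraint_definition)
    ('users_pkey', 'users', ['id'], 'p', 'PRIMARY KEY (id)'),
]

tables = construct_table_info(column_details, [], constraints, [], [])
print(tables[0].name, tables[0].primary_key())  # users ['id']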

determine_relationship_type(source_table, target_table, fk)

Determine the relationship type between two tables based on their constraints.

Parameters:

source_table (TableInfo, required): The table containing the foreign key.
target_table (TableInfo, required): The table being referenced by the foreign key.
fk (ForeignKeyInfo, required): The foreign key information.

Returns:

tuple[RelationType, RelationType]: A tuple of (forward_type, reverse_type) representing the relationship in both directions.

Source code in supabase_pydantic/util/marshalers.py
def determine_relationship_type(
    source_table: TableInfo, target_table: TableInfo, fk: ForeignKeyInfo
) -> tuple[RelationType, RelationType]:
    """Determine the relationship type between two tables based on their constraints.

    Args:
        source_table: The table containing the foreign key
        target_table: The table being referenced by the foreign key
        fk: The foreign key information

    Returns:
        A tuple of (forward_type, reverse_type) representing the relationship in both directions
    """
    # Check primary key constraints
    source_primary_constraints = [
        c for c in source_table.constraints if c.raw_constraint_type == 'p' and fk.column_name in c.columns
    ]
    target_primary_constraints = [
        c for c in target_table.constraints if c.raw_constraint_type == 'p' and fk.foreign_column_name in c.columns
    ]

    # Check if columns are sole primary keys
    is_source_sole_primary = any(len(c.columns) == 1 for c in source_primary_constraints)
    is_target_sole_primary = any(len(c.columns) == 1 for c in target_primary_constraints)

    # Check uniqueness constraints
    is_source_unique = is_source_sole_primary or any(
        col.name == fk.column_name and col.is_unique for col in source_table.columns
    )
    is_target_unique = is_target_sole_primary or any(
        col.is_unique and col.name == fk.foreign_column_name for col in target_table.columns
    )

    # Log the analysis
    logging.debug(
        f'Analyzing relationship: {source_table.name}.{fk.column_name} -> {target_table.name}.{fk.foreign_column_name}'
    )
    logging.debug(f'Source uniqueness: {is_source_unique}, Target uniqueness: {is_target_unique}')

    # Determine relationship type
    if is_source_unique and is_target_unique:
        # If both sides are unique, it's a one-to-one relationship
        logging.debug('ONE_TO_ONE: Both sides are unique')
        return RelationType.ONE_TO_ONE, RelationType.ONE_TO_ONE
    elif is_target_unique:
        # If only target is unique, it's many-to-one from source to target
        logging.debug('MANY_TO_ONE: Target is unique, source is not')
        return RelationType.MANY_TO_ONE, RelationType.ONE_TO_MANY
    elif is_source_unique:
        # If only source is unique, it's one-to-many from source to target
        logging.debug('ONE_TO_MANY: Source is unique, target is not')
        return RelationType.ONE_TO_MANY, RelationType.MANY_TO_ONE
    else:
        # If neither side is unique, it's many-to-many
        logging.debug('MANY_TO_MANY: Neither side is unique')
        return RelationType.MANY_TO_MANY, RelationType.MANY_TO_MANY
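
Example

A sketch with two hand-built tables; ForeignKeyInfo is assumed importable from supabase_pydantic.util.dataclasses, where TableInfo references it:

from supabase_pydantic.util.dataclasses import ColumnInfo, ConstraintInfo, ForeignKeyInfo, TableInfo
from supabase_pydantic.util.marshalers import determine_relationship_type

orders = TableInfo(name='orders')
orders.add_column(ColumnInfo(name='user_id', post_gres_datatype='integer', datatype='int'))

users = TableInfo(name='users')
users.add_column(ColumnInfo(name='id', post_gres_datatype='integer', datatype='int', primary=True))
users.add_constraint(ConstraintInfo(
    constraint_name='users_pkey',
    raw_constraint_type='p',
    constraint_definition='PRIMARY KEY (id)',
    columns=['id'],
))

fk = ForeignKeyInfo(
    constraint_name='orders_user_id_fkey',
    column_name='user_id',
    foreign_table_name='users',
    foreign_column_name='id',
    relation_type=None,
)
print(determine_relationship_type(orders, users, fk))
# (RelationType.MANY_TO_ONE, RelationType.ONE_TO_MANY) -- only the target side is unique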

get_alias(column_name)

Provide the original column name as an alias for Pydantic.

Source code in supabase_pydantic/util/marshalers.py
def get_alias(column_name: str) -> str | None:
    """Provide the original column name as an alias for Pydantic."""
    return (
        column_name
        if column_name_is_reserved(column_name) and not column_name_reserved_exceptions(column_name)
        else None
    )

get_enum_types(enum_types, schema)

Get enum types.

Source code in supabase_pydantic/util/marshalers.py
def get_enum_types(enum_types: list, schema: str) -> list[UserEnumType]:
    """Get enum types."""
    enums = []
    for row in enum_types:
        (
            type_name,
            namespace,
            owner,
            category,
            is_defined,
            t,  # type, typtype
            enum_values,
        ) = row
        if t == 'e' and namespace == schema:
            enums.append(
                UserEnumType(
                    type_name,
                    namespace,
                    owner,
                    category,
                    is_defined,
                    t,
                    enum_values,
                )
            )
    return enums

get_table_details_from_columns(column_details)

Get the table details from the column details.

Source code in supabase_pydantic/util/marshalers.py
def get_table_details_from_columns(column_details: list) -> dict[tuple[str, str], TableInfo]:
    """Get the table details from the column details."""
    tables = {}
    for row in column_details:
        (
            schema,
            table_name,
            column_name,
            default,
            is_nullable,
            data_type,
            max_length,
            table_type,
            identity_generation,
        ) = row
        table_key: tuple[str, str] = (schema, table_name)
        if table_key not in tables:
            tables[table_key] = TableInfo(name=table_name, schema=schema, table_type=table_type)
        column_info = ColumnInfo(
            name=standardize_column_name(column_name) or column_name,
            alias=get_alias(column_name),
            post_gres_datatype=data_type,
            datatype=PYDANTIC_TYPE_MAP.get(data_type, ('Any', 'from typing import Any'))[0],
            default=default,
            is_nullable=is_nullable == 'YES',
            max_length=max_length,
            is_identity=identity_generation is not None,
        )
        tables[table_key].add_column(column_info)

    return tables

get_unique_columns_from_constraints(constraint)

Get unique columns from constraints.

Source code in supabase_pydantic/util/marshalers.py
def get_unique_columns_from_constraints(constraint: ConstraintInfo) -> list[str | Any]:
    """Get unique columns from constraints."""
    unique_columns = []
    if constraint.constraint_type() == 'UNIQUE':
        match = re.match(r'UNIQUE \(([^)]+)\)', constraint.constraint_definition)
        if match:
            columns = match.group(1).split(',')
            unique_columns = [c.strip() for c in columns]
    return unique_columns
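
Example

A sketch of parsing a multi-column UNIQUE definition; 'u' is assumed to be the raw type that CONSTRAINT_TYPE_MAP resolves to 'UNIQUE':

from supabase_pydantic.util.dataclasses import ConstraintInfo
from supabase_pydantic.util.marshalers import get_unique_columns_from_constraints

uq = ConstraintInfo(
    constraint_name='users_email_tenant_key',
    raw_constraint_type='u',  # assumed to map to 'UNIQUE'
    constraint_definition='UNIQUE (email, tenant_id)',
    columns=['email', 'tenant_id'],
)
print(get_unique_columns_from_constraints(uq))  # ['email', 'tenant_id']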

get_user_type_mappings(enum_type_mapping, schema)

Get user type mappings.

Source code in supabase_pydantic/util/marshalers.py
def get_user_type_mappings(enum_type_mapping: list, schema: str) -> list[UserTypeMapping]:
    """Get user type mappings."""
    mappings = []
    for row in enum_type_mapping:
        (
            column_name,
            table_name,
            namespace,
            type_name,
            type_category,
            type_description,
        ) = row
        if namespace == schema:
            mappings.append(
                UserTypeMapping(
                    column_name,
                    table_name,
                    namespace,
                    type_name,
                    type_category,
                    type_description,
                )
            )
    return mappings

is_bridge_table(table)

Check if the table is a bridge table.

Source code in supabase_pydantic/util/marshalers.py
def is_bridge_table(table: TableInfo) -> bool:
    """Check if the table is a bridge table."""
    logging.debug(f'Analyzing if {table.name} is a bridge table')
    logging.debug(f'Foreign keys: {[fk.column_name for fk in table.foreign_keys]}')

    # Check for at least two foreign keys
    if len(table.foreign_keys) < 2:
        logging.debug('Not a bridge table: Less than 2 foreign keys')
        return False

    # Identify columns that are both primary keys and part of foreign keys
    primary_foreign_keys = [
        col.name
        for col in table.columns
        if col.primary and any(fk.column_name == col.name for fk in table.foreign_keys)
    ]
    logging.debug(f'Primary foreign keys: {primary_foreign_keys}')

    # Check if there are at least two such columns
    if len(primary_foreign_keys) < 2:
        logging.debug('Not a bridge table: Less than 2 primary foreign keys')
        return False

    # Get all primary key columns
    primary_keys = [col.name for col in table.columns if col.primary]
    logging.debug(f'All primary keys: {primary_keys}')

    # Consider the table a bridge table when its composite primary key consists entirely of foreign key columns
    if len(primary_foreign_keys) == len(primary_keys):
        logging.debug('Is bridge table: All primary keys are foreign keys')
        return True

    logging.debug('Not a bridge table: Some primary keys are not foreign keys')
    return False
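
Example

A sketch of a classic join table whose composite primary key is made entirely of foreign keys; ForeignKeyInfo is assumed importable as below:

from supabase_pydantic.util.dataclasses import ColumnInfo, ForeignKeyInfo, TableInfo
from supabase_pydantic.util.marshalers import is_bridge_table

link = TableInfo(name='user_roles')
for col, target in (('user_id', 'users'), ('role_id', 'roles')):
    link.add_column(ColumnInfo(name=col, post_gres_datatype='integer', datatype='int', primary=True))
    link.add_foreign_key(ForeignKeyInfo(
        constraint_name=f'user_roles_{col}_fkey',
        column_name=col,
        foreign_table_name=target,
        foreign_column_name='id',
        relation_type=None,
    ))

print(is_bridge_table(link))  # True -- every primary key column is also a foreign key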

parse_constraint_definition_for_fk(constraint_definition)

Parse the foreign key definition from the constraint.

Source code in supabase_pydantic/util/marshalers.py
def parse_constraint_definition_for_fk(constraint_definition: str) -> tuple[str, str, str] | None:
    """Parse the foreign key definition from the constraint."""
    match = re.match(r'FOREIGN KEY \(([^)]+)\) REFERENCES (\S+)\(([^)]+)\)', constraint_definition)
    if match:
        column_name = match.group(1)
        foreign_table_name = match.group(2)
        foreign_column_name = match.group(3)

        return column_name, foreign_table_name, foreign_column_name
    return None
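
Example

A quick check of the regex, which only recognizes definitions in the FOREIGN KEY (...) REFERENCES table(...) shape:

from supabase_pydantic.util.marshalers import parse_constraint_definition_for_fk

print(parse_constraint_definition_for_fk('FOREIGN KEY (user_id) REFERENCES users(id)'))
# ('user_id', 'users', 'id')
print(parse_constraint_definition_for_fk('CHECK (price > 0)'))
# None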

standardize_column_name(column_name)

Check if the column name is a reserved keyword or built-in name and replace it if necessary.

Source code in supabase_pydantic/util/marshalers.py
def standardize_column_name(column_name: str) -> str | None:
    """Check if the column name is a reserved keyword or built-in name and replace it if necessary."""
    return (
        f'field_{column_name}'
        if column_name_is_reserved(column_name) and not column_name_reserved_exceptions(column_name)
        else column_name
    )
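
Example

standardize_column_name and get_alias work as a pair: reserved names are rewritten with a field_ prefix while the original name is preserved as the Pydantic alias:

from supabase_pydantic.util.marshalers import get_alias, standardize_column_name

print(standardize_column_name('class'))  # 'field_class' -- 'class' is a Python keyword
print(get_alias('class'))                # 'class' -- original name kept as the alias
print(standardize_column_name('id'))     # 'id' -- explicitly allowed exception
print(get_alias('email'))                # None -- nothing to alias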

update_column_constraint_definitions(tables)

Update columns with their CHECK constraint definitions.

Source code in supabase_pydantic/util/marshalers.py
def update_column_constraint_definitions(tables: dict) -> None:
    """Update columns with their CHECK constraint definitions."""
    for table in tables.values():
        if table.columns is None or len(table.columns) == 0:
            continue
        if table.constraints is None or len(table.constraints) == 0:
            continue

        # iterate through columns and constraints
        for column in table.columns:
            for constraint in table.constraints:
                # Only process CHECK constraints that affect this column
                if constraint.constraint_type() == 'CHECK' and len(constraint.columns) == 1:
                    if column.name == constraint.columns[0]:
                        column.constraint_definition = constraint.constraint_definition

update_columns_with_constraints(tables)

Update columns with constraints.

Source code in supabase_pydantic/util/marshalers.py
def update_columns_with_constraints(tables: dict) -> None:
    """Update columns with constraints."""
    for table in tables.values():
        if table.columns is None or len(table.columns) == 0:
            continue
        if table.constraints is None or len(table.constraints) == 0:
            continue

        # iterate through columns and constraints
        for column in table.columns:
            for constraint in table.constraints:
                for col in constraint.columns:
                    if column.name == col:
                        if constraint.constraint_type() == 'PRIMARY KEY':
                            column.primary = True
                        if constraint.constraint_type() == 'UNIQUE':
                            column.is_unique = True
                            column.unique_partners = get_unique_columns_from_constraints(constraint)
                        if constraint.constraint_type() == 'FOREIGN KEY':
                            column.is_foreign_key = True
                        if constraint.constraint_type() == 'CHECK' and len(constraint.columns) == 1:
                            column.constraint_definition = constraint.constraint_definition