
API docs: reference documentation for the classes and functions of the datarepo project.

Databases catalog module

Catalog

A catalog that manages multiple databases and provides access to their tables.

Source code in src/datarepo/core/catalog/catalog.py
class Catalog:
    """A catalog that manages multiple databases and provides access to their tables."""

    def __init__(self, dbs: dict[str, Database]):
        """Initialize the Catalog.

        Args:
            dbs (dict[str, Database]): A dictionary of database names and their corresponding Database objects.
        """
        self._dbs = dbs
        self._global_args: dict[str, Any] | None = None

    def set_global_args(self, global_args: dict[str, Any]) -> None:
        """Set global arguments for all database queries.

        Args:
            global_args (dict[str, Any]): A dictionary of global arguments to apply to all database queries.
        """
        self._global_args = global_args

    def db(self, db_name: str) -> Database:
        """Get a database from the catalog.

        Args:
            db_name (str): The name of the database.

        Raises:
            KeyError: If the database is not found.

        Returns:
            Database: The requested database.
        """
        db = self._dbs.get(db_name)

        if db is None:
            raise KeyError(
                f"Database '{db_name}' not found. Available databases: {self.dbs()}"
            )

        if self._global_args is None:
            return db

        return DatabaseWithGlobalArgs(db, self._global_args)

    def dbs(self) -> list[str]:
        """Get a list of database names in the catalog.

        Returns:
            list[str]: A list of database names.
        """
        return list(self._dbs.keys())
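
A minimal usage sketch for Catalog. The import path below mirrors the source location shown above and may differ from how the package actually re-exports these names; my_database_module is a hypothetical module that defines datarepo tables.

``` py
# Sketch only: import paths are assumed from the source layout shown above.
from datarepo.core.catalog.catalog import Catalog, ModuleDatabase

import my_database_module  # hypothetical module that defines table objects

catalog = Catalog(dbs={"main": ModuleDatabase(my_database_module)})

# Optionally forward shared keyword arguments to every table query.
# The keys here are placeholders; valid keys depend on your tables.
catalog.set_global_args({"endpoint_url": "http://localhost:9000"})

print(catalog.dbs())          # ["main"]
db = catalog.db("main")       # raises KeyError for unknown names
frame = db.table("my_table")  # NlkDataFrame for the requested table
```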

__init__(dbs)

Initialize the Catalog.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dbs` | `dict[str, Database]` | A dictionary of database names and their corresponding Database objects. | *required* |
Source code in src/datarepo/core/catalog/catalog.py
def __init__(self, dbs: dict[str, Database]):
    """Initialize the Catalog.

    Args:
        dbs (dict[str, Database]): A dictionary of database names and their corresponding Database objects.
    """
    self._dbs = dbs
    self._global_args: dict[str, Any] | None = None

db(db_name)

Get a database from the catalog.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `db_name` | `str` | The name of the database. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `KeyError` | If the database is not found. |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Database` | `Database` | The requested database. |

Source code in src/datarepo/core/catalog/catalog.py
def db(self, db_name: str) -> Database:
    """Get a database from the catalog.

    Args:
        db_name (str): The name of the database.

    Raises:
        KeyError: If the database is not found.

    Returns:
        Database: The requested database.
    """
    db = self._dbs.get(db_name)

    if db is None:
        raise KeyError(
            f"Database '{db_name}' not found. Available databases: {self.dbs()}"
        )

    if self._global_args is None:
        return db

    return DatabaseWithGlobalArgs(db, self._global_args)

dbs()

Get a list of database names in the catalog.

Returns:

| Type | Description |
| --- | --- |
| `list[str]` | A list of database names. |

Source code in src/datarepo/core/catalog/catalog.py
def dbs(self) -> list[str]:
    """Get a list of database names in the catalog.

    Returns:
        list[str]: A list of database names.
    """
    return list(self._dbs.keys())

set_global_args(global_args)

Set global arguments for all database queries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `global_args` | `dict[str, Any]` | A dictionary of global arguments to apply to all database queries. | *required* |
Source code in src/datarepo/core/catalog/catalog.py
def set_global_args(self, global_args: dict[str, Any]) -> None:
    """Set global arguments for all database queries.

    Args:
        global_args (dict[str, Any]): A dictionary of global arguments to apply to all database queries.
    """
    self._global_args = global_args

Database

Bases: Protocol

A protocol for a database that provides access to tables.

Source code in src/datarepo/core/catalog/catalog.py
class Database(Protocol):
    """A protocol for a database that provides access to tables."""

    def get_tables(self, show_deprecated: bool = False) -> dict[str, TableProtocol]:
        """Get a dictionary of tables in the database.

        Args:
            show_deprecated (bool, optional): Whether to include deprecated tables. Defaults to False.
        """
        ...

    def tables(self, show_deprecated: bool = False) -> list[str]:
        """Get a list of table names in the database.

        Args:
            show_deprecated (bool, optional): Whether to include deprecated tables. Defaults to False.

        Returns:
            list[str]: A list of table names.
        """
        return list(self.get_tables(show_deprecated).keys())

    def table(self, name: str, *args: Any, **kwargs: Any) -> NlkDataFrame:
        """Get a table from the database.

        Args:
            name (str): The name of the table.

        Returns:
            NlkDataFrame: The requested table.
        """
        ...
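
Because Database is a Protocol and tables() already has a default implementation in terms of get_tables(), a custom backend only needs get_tables and table. A minimal in-memory sketch, assuming table objects are callables that return an NlkDataFrame (as ModuleDatabase below does):

``` py
from typing import Any


class DictDatabase:
    """Hypothetical Database implementation backed by a plain dict of tables."""

    def __init__(self, tables: dict[str, Any]) -> None:
        self._tables = tables

    def get_tables(self, show_deprecated: bool = False) -> dict[str, Any]:
        # No deprecation metadata is tracked here, so show_deprecated is ignored.
        return dict(self._tables)

    def tables(self, show_deprecated: bool = False) -> list[str]:
        return list(self.get_tables(show_deprecated).keys())

    def table(self, name: str, *args: Any, **kwargs: Any):
        # Delegate to the stored table object, mirroring ModuleDatabase.table.
        return self._tables[name](*args, **kwargs)
```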

get_tables(show_deprecated=False)

Get a dictionary of tables in the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `show_deprecated` | `bool` | Whether to include deprecated tables. Defaults to False. | `False` |
Source code in src/datarepo/core/catalog/catalog.py
def get_tables(self, show_deprecated: bool = False) -> dict[str, TableProtocol]:
    """Get a dictionary of tables in the database.

    Args:
        show_deprecated (bool, optional): Whether to include deprecated tables. Defaults to False.
    """
    ...

table(name, *args, **kwargs)

Get a table from the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the table. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `NlkDataFrame` | `NlkDataFrame` | The requested table. |

Source code in src/datarepo/core/catalog/catalog.py
def table(self, name: str, *args: Any, **kwargs: Any) -> NlkDataFrame:
    """Get a table from the database.

    Args:
        name (str): The name of the table.

    Returns:
        NlkDataFrame: The requested table.
    """
    ...

tables(show_deprecated=False)

Get a list of table names in the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `show_deprecated` | `bool` | Whether to include deprecated tables. Defaults to False. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `list[str]` | A list of table names. |

Source code in src/datarepo/core/catalog/catalog.py
def tables(self, show_deprecated: bool = False) -> list[str]:
    """Get a list of table names in the database.

    Args:
        show_deprecated (bool, optional): Whether to include deprecated tables. Defaults to False.

    Returns:
        list[str]: A list of table names.
    """
    return list(self.get_tables(show_deprecated).keys())

ModuleDatabase

Bases: Database

A database that is implemented as a Python module.

Source code in src/datarepo/core/catalog/catalog.py
class ModuleDatabase(Database):
    """A database that is implemented as a Python module."""

    def __init__(self, db: ModuleType) -> None:
        """Initialize the ModuleDatabase.

        Example usage:
            ``` py
            import my_database_module
            db = ModuleDatabase(my_database_module)
            ```

        Args:
            db (ModuleType): The database module.
        """
        self.db = db

    def __getattr__(self, name: str):
        # HACK: to maintain backwards compatibility when accessing module attributes
        return self._get_table(name)

    def get_tables(self, show_deprecated: bool = False) -> dict[str, TableProtocol]:
        """Get a dictionary of tables in the database.

        Example usage:
            ``` py
            db = ModuleDatabase(my_database_module)
            tables = db.get_tables(show_deprecated=True)
            ```

        Args:
            show_deprecated (bool, optional): Whether to include deprecated tables. Defaults to False.

        Returns:
            dict[str, TableProtocol]: A dictionary of table names and their corresponding TableProtocol objects.
        """
        methods = dir(self.db)

        tables = {}
        for name in methods:
            table = self._get_table(name)
            if table is None:
                continue

            if table.table_metadata.is_deprecated and not show_deprecated:
                continue

            tables[name] = table

        return tables

    def table(self, name: str, *args: Any, **kwargs: Any) -> NlkDataFrame:
        """Get a table from the database.

        Example usage:
            ``` py
            db = ModuleDatabase(my_database_module)
            table = db.table("my_table")
            ```

        Args:
            name (str): The name of the table.

        Raises:
            KeyError: If the table is not found.

        Returns:
            NlkDataFrame: The requested table.
        """
        tbl = self._get_table(name)
        if tbl is None:
            raise KeyError(f"Table '{name}' not found in database")

        if tbl.table_metadata.is_deprecated:
            warnings.warn(f"The table '{name}' is deprecated", DeprecationWarning)

        return tbl(*args, **kwargs)

    def _get_table(self, name: str) -> TableProtocol | None:
        """Get a table from the database.

        Args:
            name (str): The name of the table.

        Returns:
            TableProtocol | None: The requested table or None if not found.
        """
        table = getattr(self.db, name)
        if not hasattr(table, "table_metadata"):
            return None

        return cast(TableProtocol, table)
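
A sketch of the intended pattern: a module defines table objects (anything carrying a table_metadata attribute, such as the DeltalakeTable documented below), and ModuleDatabase exposes them. Import paths and the table definition are assumptions for illustration.

``` py
# my_tables.py -- hypothetical module that defines datarepo tables
import pyarrow as pa
from datarepo.core.tables.deltalake_table import DeltalakeTable  # assumed import path

events = DeltalakeTable(
    name="events",
    uri="s3://my-bucket/events",  # placeholder location
    schema=pa.schema([("id", pa.int64()), ("ts", pa.timestamp("us"))]),
    description="Example event table",
)

# elsewhere
import my_tables
from datarepo.core.catalog.catalog import ModuleDatabase  # assumed import path

db = ModuleDatabase(my_tables)
print(db.tables())          # ["events"]
frame = db.table("events")  # calls the table object and returns an NlkDataFrame
```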

__init__(db)

Initialize the ModuleDatabase.

Example usage:

``` py
import my_database_module
db = ModuleDatabase(my_database_module)
```

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `db` | `ModuleType` | The database module. | *required* |
Source code in src/datarepo/core/catalog/catalog.py
def __init__(self, db: ModuleType) -> None:
    """Initialize the ModuleDatabase.

    Example usage:
        ``` py
        import my_database_module
        db = ModuleDatabase(my_database_module)
        ```

    Args:
        db (ModuleType): The database module.
    """
    self.db = db

get_tables(show_deprecated=False)

Get a dictionary of tables in the database.

Example usage:

``` py
db = ModuleDatabase(my_database_module)
tables = db.get_tables(show_deprecated=True)
```

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `show_deprecated` | `bool` | Whether to include deprecated tables. Defaults to False. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, TableProtocol]` | A dictionary of table names and their corresponding TableProtocol objects. |

Source code in src/datarepo/core/catalog/catalog.py
def get_tables(self, show_deprecated: bool = False) -> dict[str, TableProtocol]:
    """Get a dictionary of tables in the database.

    Example usage:
        ``` py
        db = ModuleDatabase(my_database_module)
        tables = db.get_tables(show_deprecated=True)
        ```

    Args:
        show_deprecated (bool, optional): Whether to include deprecated tables. Defaults to False.

    Returns:
        dict[str, TableProtocol]: A dictionary of table names and their corresponding TableProtocol objects.
    """
    methods = dir(self.db)

    tables = {}
    for name in methods:
        table = self._get_table(name)
        if table is None:
            continue

        if table.table_metadata.is_deprecated and not show_deprecated:
            continue

        tables[name] = table

    return tables

table(name, *args, **kwargs)

Get a table from the database.

Example usage:

``` py
db = ModuleDatabase(my_database_module)
table = db.table("my_table")
```

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the table. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `KeyError` | If the table is not found. |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `NlkDataFrame` | `NlkDataFrame` | The requested table. |

Source code in src/datarepo/core/catalog/catalog.py
def table(self, name: str, *args: Any, **kwargs: Any) -> NlkDataFrame:
    """Get a table from the database.

    Example usage:
        ``` py
        db = ModuleDatabase(my_database_module)
        table = db.table("my_table")
        ```

    Args:
        name (str): The name of the table.

    Raises:
        KeyError: If the table is not found.

    Returns:
        NlkDataFrame: The requested table.
    """
    tbl = self._get_table(name)
    if tbl is None:
        raise KeyError(f"Table '{name}' not found in database")

    if tbl.table_metadata.is_deprecated:
        warnings.warn(f"The table '{name}' is deprecated", DeprecationWarning)

    return tbl(*args, **kwargs)

Dataframes module

Tables module

DeltaCacheOptions dataclass

Source code in src/datarepo/core/tables/deltalake_table.py
@dataclass
class DeltaCacheOptions:
    # Path to the directory where files are cached. This can be the same for all tables
    # since the S3 table prefix is included in the cached paths.
    file_cache_path: str
    # Duration for which the _last_checkpoint file is cached. A longer duration means
    # we can reuse cached checkpoints for longer, which could improve loading performance
    # from not downloading new checkpoint parquets. However, we also need to load more
    # individual transaction jsons, so this should not be set too high.
    # A reasonable value for a table with frequent updates (e.g. binned spikes) is 30m.
    file_cache_last_checkpoint_valid_duration: str | None = None

    def to_storage_options(self) -> dict[str, Any]:
        """Convert the cache options to a dictionary of storage options.

        Returns:
            dict[str, Any]: A dictionary of storage options that can be used with DeltaTable.
        """
        opts = {
            "file_cache_path": os.path.expanduser(self.file_cache_path),
        }
        if self.file_cache_last_checkpoint_valid_duration is not None:
            opts["file_cache_last_checkpoint_valid_duration"] = (
                self.file_cache_last_checkpoint_valid_duration
            )
        return opts
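
A short sketch of how the options translate into storage options; the import path and the values are placeholders, and the checkpoint duration is optional:

``` py
from datarepo.core.tables.deltalake_table import DeltaCacheOptions  # assumed import path

cache = DeltaCacheOptions(
    file_cache_path="~/.cache/datarepo",              # expanded via os.path.expanduser
    file_cache_last_checkpoint_valid_duration="30m",  # omit to always refetch _last_checkpoint
)

print(cache.to_storage_options())
# e.g. {'file_cache_path': '/home/user/.cache/datarepo',
#       'file_cache_last_checkpoint_valid_duration': '30m'}
```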

to_storage_options()

Convert the cache options to a dictionary of storage options.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | A dictionary of storage options that can be used with DeltaTable. |

Source code in src/datarepo/core/tables/deltalake_table.py
def to_storage_options(self) -> dict[str, Any]:
    """Convert the cache options to a dictionary of storage options.

    Returns:
        dict[str, Any]: A dictionary of storage options that can be used with DeltaTable.
    """
    opts = {
        "file_cache_path": os.path.expanduser(self.file_cache_path),
    }
    if self.file_cache_last_checkpoint_valid_duration is not None:
        opts["file_cache_last_checkpoint_valid_duration"] = (
            self.file_cache_last_checkpoint_valid_duration
        )
    return opts

DeltalakeTable

Bases: TableProtocol

A table that is backed by a Delta Lake table.

Source code in src/datarepo/core/tables/deltalake_table.py
class DeltalakeTable(TableProtocol):
    """A table that is backed by a Delta Lake table."""

    def __init__(
        self,
        name: str,
        uri: str,
        schema: pa.Schema,
        description: str = "",
        docs_filters: list[Filter] = [],
        docs_columns: list[str] | None = None,
        roapi_opts: RoapiOptions | None = None,
        unique_columns: list[str] | None = None,
        table_metadata_args: dict[str, Any] | None = None,
        stats_cols: list[str] | None = None,
        extra_cols: list[tuple[pl.Expr, str]] | None = None,
    ):
        """Initialize the DeltalakeTable.

        Args:
            name (str): table name, used as the table identifier in the DeltaTable
            uri (str): uri of the table, e.g. "s3://bucket/path/to/table"
            schema (pa.Schema): schema of the table, used to define the table structure
            description (str, optional): description of the table, used for documentation. Defaults to "".
            docs_filters (list[Filter], optional): documentation filters, used to filter the table in the documentation. Defaults to [].
            docs_columns (list[str] | None, optional): documentation columns, used to define the columns in the documentation. Defaults to None.
            roapi_opts (RoapiOptions | None, optional): ROAPI options, used to configure the ROAPI for the table. Defaults to None.
            unique_columns (list[str] | None, optional): unique columns in the table, used to optimize the read performance. Defaults to None.
            table_metadata_args (dict[str, Any] | None, optional): table metadata arguments, used to configure the table metadata. Defaults to None.
            stats_cols (list[str] | None, optional): statistics columns, used to define the columns that have statistics. Defaults to None.
            extra_cols (list[tuple[pl.Expr, str]] | None, optional): extra columns to add to the table, where each tuple contains a Polars expression and its type annotation. Defaults to None.
        """
        self.name = name
        self.uri = uri
        self.schema = schema
        self.unique_columns = unique_columns
        self.stats_cols = stats_cols or []
        self.extra_cols = extra_cols or []

        self.table_metadata = TableMetadata(
            table_type="DELTA_LAKE",
            description=description,
            docs_args={"filters": docs_filters, "columns": docs_columns},
            roapi_opts=roapi_opts or DeltaRoapiOptions(),
            **(table_metadata_args or {}),
        )

    def get_schema(self) -> TableSchema:
        """Generate and return the schema of the table, including partitions and columns.

        Returns:
            TableSchema: table schema containing partition and column information.
        """
        dt = self.delta_table()
        schema = self.schema
        partition_cols = dt.metadata().partition_columns
        filters = {
            f.column: f.value
            for f in self.table_metadata.docs_args.get("filters", [])
            if isinstance(f, Filter)
        }
        partitions = [
            {
                "column_name": col,
                "type_annotation": str(schema.field(col).type),
                "value": filters.get(col),
            }
            for col in partition_cols
        ]
        columns = [
            {
                "name": name,
                "type": str(schema.field(name).type),
                "has_stats": name in partition_cols or name in self.stats_cols,
            }
            for name in schema.names
        ]
        columns += [
            {
                "name": expr.meta.output_name(),
                "type": expr_type,
                "readonly": True,
            }
            for expr, expr_type in self.extra_cols
        ]
        return TableSchema(
            partitions=partitions,
            columns=columns,
        )

    def __call__(
        self,
        filters: DeltaInputFilters | None = None,
        columns: list[str] | None = None,
        boto3_session: boto3.Session | None = None,
        endpoint_url: str | None = None,
        timeout: str | None = None,
        cache_options: DeltaCacheOptions | None = None,
        **kwargs: Any,
    ) -> NlkDataFrame:
        """Fetch a dataframe from the Delta Lake table.

        Args:
            filters (DeltaInputFilters | None, optional): filters to apply to the table. Defaults to None.
            columns (list[str] | None, optional): columns to select from the table. Defaults to None.
            boto3_session (boto3.Session | None, optional): boto3 session to use for S3 access. Defaults to None.
            endpoint_url (str | None, optional): endpoint URL for S3 access. Defaults to None.
            timeout (str | None, optional): timeout for S3 access. Defaults to None.
            cache_options (DeltaCacheOptions | None, optional): cache options for the Delta Lake table. Defaults to None.

        Returns:
            NlkDataFrame: a dataframe containing the data from the Delta Lake table, filtered and selected according to the provided parameters.
        """
        storage_options = {
            "timeout": timeout or DEFAULT_TIMEOUT,
            **get_storage_options(
                boto3_session=boto3_session, endpoint_url=endpoint_url
            ),
        }
        if cache_options is not None:
            storage_options = {
                **storage_options,
                **cache_options.to_storage_options(),
            }
        dt = self.delta_table(storage_options=storage_options)

        return self.construct_df(dt=dt, filters=filters, columns=columns)

    def construct_df(
        self,
        dt: DeltaTable,
        filters: DeltaInputFilters | None = None,
        columns: list[str] | None = None,
    ) -> NlkDataFrame:
        """Construct a dataframe from the Delta Lake table.

        Args:
            dt (DeltaTable): The DeltaTable object representing the Delta Lake table.
            filters (DeltaInputFilters | None, optional): filters to apply to the table. Defaults to None.
            columns (list[str] | None, optional): columns to select from the table. Defaults to None.

        Returns:
            NlkDataFrame: a dataframe containing the data from the Delta Lake table, filtered and selected according to the provided parameters.
        """
        # Use schema defined on this table, the physical schema in deltalake metadata might be different
        schema = self.schema

        predicate_str = datafusion_predicate_from_filters(schema, filters)

        # These should not be read because they don't exist in the delta table
        extra_col_exprs = [expr for expr, _ in self.extra_cols]
        extra_column_names = set(expr.meta.output_name() for expr in extra_col_exprs)

        columns_to_read = None
        unique_column_names = set(self.unique_columns or [])
        if columns:
            columns_to_read = list(
                (set(columns) | unique_column_names) - extra_column_names
            )

        # TODO(peter): consider a sql builder for more complex queries?
        select_cols = (
            ", ".join([f'"{col}"' for col in columns_to_read])
            if columns_to_read
            else "*"
        )
        condition = f"WHERE {predicate_str}" if predicate_str else ""
        query_string = f"""
            SELECT {select_cols}
            FROM "{self.name}"
            {condition}
        """
        with warnings.catch_warnings():
            # Ignore ExperimentalWarning emitted from QueryBuilder
            warnings.filterwarnings("ignore", category=ExperimentalWarning)
            batches = (
                QueryBuilder().register(self.name, dt).execute(query_string).fetchall()
            )

        # Since we might cast unique string columns to categoricals, use a string cache to
        # improve performance when combining multiple dataframes
        with pl.StringCache():
            if batches:
                frame = pl.from_arrow(batches, rechunk=False)
                frame = _normalize_df(frame, self.schema, columns=columns_to_read)
            else:
                # If dataset is empty, the returned dataframe will have no columns
                frame = _empty_normalized_df(schema)

            if self.extra_cols:
                frame = frame.with_columns(extra_col_exprs)

            if self.unique_columns:
                # Cast unique string columns to categoricals first
                # In some cases, this reduces peak memory usage up to 50%
                curr_schema = frame.schema
                cat_schema = {
                    col: pl.Categorical
                    for col in curr_schema
                    if curr_schema[col] == pl.String
                }
                frame = (
                    frame.cast(cat_schema)
                    .unique(subset=self.unique_columns, maintain_order=True)
                    .cast(curr_schema)
                )

        if columns:
            frame = frame.select(columns)

        return frame.lazy()

    def delta_table(
        self, storage_options: dict[str, Any] | None = None, version: int | None = None
    ) -> DeltaTable:
        """Get the DeltaTable object for this table.

        Args:
            storage_options (dict[str, Any] | None, optional): Storage options for the DeltaTable, such as S3 access credentials. Defaults to None.
            version (int | None, optional): Version of the Delta table to read. If None, the latest version is used. Defaults to None.

        Returns:
            DeltaTable: The DeltaTable object representing the Delta Lake table.
        """
        return DeltaTable(
            table_uri=self.uri, storage_options=storage_options, version=version
        )
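
A usage sketch under assumed import paths. The schema, URI, and column names are placeholders; calling the table returns a lazy NlkDataFrame (the source above ends with frame.lazy()), so a Polars-style collect() is assumed for materialization.

``` py
import pyarrow as pa
from datarepo.core.tables.deltalake_table import (  # assumed import path
    DeltaCacheOptions,
    DeltalakeTable,
)

spikes = DeltalakeTable(
    name="spikes",
    uri="s3://my-bucket/spikes",  # placeholder table location
    schema=pa.schema(
        [("implant_id", pa.int64()), ("ts", pa.timestamp("us")), ("rate", pa.float64())]
    ),
    unique_columns=["implant_id", "ts"],  # rows are deduplicated on these columns
)

# Select a subset of columns and reuse cached checkpoints/files between reads.
frame = spikes(
    columns=["implant_id", "rate"],
    cache_options=DeltaCacheOptions(file_cache_path="~/.cache/datarepo"),
)
df = frame.collect()  # assuming NlkDataFrame behaves like a Polars LazyFrame
```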

__call__(filters=None, columns=None, boto3_session=None, endpoint_url=None, timeout=None, cache_options=None, **kwargs)

Fetch a dataframe from the Delta Lake table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `filters` | `DeltaInputFilters \| None` | filters to apply to the table. Defaults to None. | `None` |
| `columns` | `list[str] \| None` | columns to select from the table. Defaults to None. | `None` |
| `boto3_session` | `Session \| None` | boto3 session to use for S3 access. Defaults to None. | `None` |
| `endpoint_url` | `str \| None` | endpoint URL for S3 access. Defaults to None. | `None` |
| `timeout` | `str \| None` | timeout for S3 access. Defaults to None. | `None` |
| `cache_options` | `DeltaCacheOptions \| None` | cache options for the Delta Lake table. Defaults to None. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `NlkDataFrame` | `NlkDataFrame` | a dataframe containing the data from the Delta Lake table, filtered and selected according to the provided parameters. |

Source code in src/datarepo/core/tables/deltalake_table.py
def __call__(
    self,
    filters: DeltaInputFilters | None = None,
    columns: list[str] | None = None,
    boto3_session: boto3.Session | None = None,
    endpoint_url: str | None = None,
    timeout: str | None = None,
    cache_options: DeltaCacheOptions | None = None,
    **kwargs: Any,
) -> NlkDataFrame:
    """Fetch a dataframe from the Delta Lake table.

    Args:
        filters (DeltaInputFilters | None, optional): filters to apply to the table. Defaults to None.
        columns (list[str] | None, optional): columns to select from the table. Defaults to None.
        boto3_session (boto3.Session | None, optional): boto3 session to use for S3 access. Defaults to None.
        endpoint_url (str | None, optional): endpoint URL for S3 access. Defaults to None.
        timeout (str | None, optional): timeout for S3 access. Defaults to None.
        cache_options (DeltaCacheOptions | None, optional): cache options for the Delta Lake table. Defaults to None.

    Returns:
        NlkDataFrame: a dataframe containing the data from the Delta Lake table, filtered and selected according to the provided parameters.
    """
    storage_options = {
        "timeout": timeout or DEFAULT_TIMEOUT,
        **get_storage_options(
            boto3_session=boto3_session, endpoint_url=endpoint_url
        ),
    }
    if cache_options is not None:
        storage_options = {
            **storage_options,
            **cache_options.to_storage_options(),
        }
    dt = self.delta_table(storage_options=storage_options)

    return self.construct_df(dt=dt, filters=filters, columns=columns)

__init__(name, uri, schema, description='', docs_filters=[], docs_columns=None, roapi_opts=None, unique_columns=None, table_metadata_args=None, stats_cols=None, extra_cols=None)

Initialize the DeltalakeTable.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | table name, used as the table identifier in the DeltaTable | *required* |
| `uri` | `str` | uri of the table, e.g. "s3://bucket/path/to/table" | *required* |
| `schema` | `Schema` | schema of the table, used to define the table structure | *required* |
| `description` | `str` | description of the table, used for documentation. Defaults to "". | `''` |
| `docs_filters` | `list[Filter]` | documentation filters, used to filter the table in the documentation. Defaults to []. | `[]` |
| `docs_columns` | `list[str] \| None` | documentation columns, used to define the columns in the documentation. Defaults to None. | `None` |
| `roapi_opts` | `RoapiOptions \| None` | ROAPI options, used to configure the ROAPI for the table. Defaults to None. | `None` |
| `unique_columns` | `list[str] \| None` | unique columns in the table, used to optimize the read performance. Defaults to None. | `None` |
| `table_metadata_args` | `dict[str, Any] \| None` | table metadata arguments, used to configure the table metadata. Defaults to None. | `None` |
| `stats_cols` | `list[str] \| None` | statistics columns, used to define the columns that have statistics. Defaults to None. | `None` |
| `extra_cols` | `list[tuple[Expr, str]] \| None` | extra columns to add to the table, where each tuple contains a Polars expression and its type annotation. Defaults to None. | `None` |
Source code in src/datarepo/core/tables/deltalake_table.py
def __init__(
    self,
    name: str,
    uri: str,
    schema: pa.Schema,
    description: str = "",
    docs_filters: list[Filter] = [],
    docs_columns: list[str] | None = None,
    roapi_opts: RoapiOptions | None = None,
    unique_columns: list[str] | None = None,
    table_metadata_args: dict[str, Any] | None = None,
    stats_cols: list[str] | None = None,
    extra_cols: list[tuple[pl.Expr, str]] | None = None,
):
    """Initialize the DeltalakeTable.

    Args:
        name (str): table name, used as the table identifier in the DeltaTable
        uri (str): uri of the table, e.g. "s3://bucket/path/to/table"
        schema (pa.Schema): schema of the table, used to define the table structure
        description (str, optional): description of the table, used for documentation. Defaults to "".
        docs_filters (list[Filter], optional): documentation filters, used to filter the table in the documentation. Defaults to [].
        docs_columns (list[str] | None, optional): documentation columns, used to define the columns in the documentation. Defaults to None.
        roapi_opts (RoapiOptions | None, optional): ROAPI options, used to configure the ROAPI for the table. Defaults to None.
        unique_columns (list[str] | None, optional): unique columns in the table, used to optimize the read performance. Defaults to None.
        table_metadata_args (dict[str, Any] | None, optional): table metadata arguments, used to configure the table metadata. Defaults to None.
        stats_cols (list[str] | None, optional): statistics columns, used to define the columns that have statistics. Defaults to None.
        extra_cols (list[tuple[pl.Expr, str]] | None, optional): extra columns to add to the table, where each tuple contains a Polars expression and its type annotation. Defaults to None.
    """
    self.name = name
    self.uri = uri
    self.schema = schema
    self.unique_columns = unique_columns
    self.stats_cols = stats_cols or []
    self.extra_cols = extra_cols or []

    self.table_metadata = TableMetadata(
        table_type="DELTA_LAKE",
        description=description,
        docs_args={"filters": docs_filters, "columns": docs_columns},
        roapi_opts=roapi_opts or DeltaRoapiOptions(),
        **(table_metadata_args or {}),
    )

construct_df(dt, filters=None, columns=None)

Construct a dataframe from the Delta Lake table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dt` | `DeltaTable` | The DeltaTable object representing the Delta Lake table. | *required* |
| `filters` | `DeltaInputFilters \| None` | filters to apply to the table. Defaults to None. | `None` |
| `columns` | `list[str] \| None` | columns to select from the table. Defaults to None. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `NlkDataFrame` | `NlkDataFrame` | a dataframe containing the data from the Delta Lake table, filtered and selected according to the provided parameters. |

Source code in src/datarepo/core/tables/deltalake_table.py
def construct_df(
    self,
    dt: DeltaTable,
    filters: DeltaInputFilters | None = None,
    columns: list[str] | None = None,
) -> NlkDataFrame:
    """Construct a dataframe from the Delta Lake table.

    Args:
        dt (DeltaTable): The DeltaTable object representing the Delta Lake table.
        filters (DeltaInputFilters | None, optional): filters to apply to the table. Defaults to None.
        columns (list[str] | None, optional): columns to select from the table. Defaults to None.

    Returns:
        NlkDataFrame: a dataframe containing the data from the Delta Lake table, filtered and selected according to the provided parameters.
    """
    # Use schema defined on this table, the physical schema in deltalake metadata might be different
    schema = self.schema

    predicate_str = datafusion_predicate_from_filters(schema, filters)

    # These should not be read because they don't exist in the delta table
    extra_col_exprs = [expr for expr, _ in self.extra_cols]
    extra_column_names = set(expr.meta.output_name() for expr in extra_col_exprs)

    columns_to_read = None
    unique_column_names = set(self.unique_columns or [])
    if columns:
        columns_to_read = list(
            (set(columns) | unique_column_names) - extra_column_names
        )

    # TODO(peter): consider a sql builder for more complex queries?
    select_cols = (
        ", ".join([f'"{col}"' for col in columns_to_read])
        if columns_to_read
        else "*"
    )
    condition = f"WHERE {predicate_str}" if predicate_str else ""
    query_string = f"""
        SELECT {select_cols}
        FROM "{self.name}"
        {condition}
    """
    with warnings.catch_warnings():
        # Ignore ExperimentalWarning emitted from QueryBuilder
        warnings.filterwarnings("ignore", category=ExperimentalWarning)
        batches = (
            QueryBuilder().register(self.name, dt).execute(query_string).fetchall()
        )

    # Since we might cast unique string columns to categoricals, use a string cache to
    # improve performance when combining multiple dataframes
    with pl.StringCache():
        if batches:
            frame = pl.from_arrow(batches, rechunk=False)
            frame = _normalize_df(frame, self.schema, columns=columns_to_read)
        else:
            # If dataset is empty, the returned dataframe will have no columns
            frame = _empty_normalized_df(schema)

        if self.extra_cols:
            frame = frame.with_columns(extra_col_exprs)

        if self.unique_columns:
            # Cast unique string columns to categoricals first
            # In some cases, this reduces peak memory usage up to 50%
            curr_schema = frame.schema
            cat_schema = {
                col: pl.Categorical
                for col in curr_schema
                if curr_schema[col] == pl.String
            }
            frame = (
                frame.cast(cat_schema)
                .unique(subset=self.unique_columns, maintain_order=True)
                .cast(curr_schema)
            )

    if columns:
        frame = frame.select(columns)

    return frame.lazy()

delta_table(storage_options=None, version=None)

Get the DeltaTable object for this table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `storage_options` | `dict[str, Any] \| None` | Storage options for the DeltaTable, such as S3 access credentials. Defaults to None. | `None` |
| `version` | `int \| None` | Version of the Delta table to read. If None, the latest version is used. Defaults to None. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `DeltaTable` | `DeltaTable` | The DeltaTable object representing the Delta Lake table. |

Source code in src/datarepo/core/tables/deltalake_table.py
def delta_table(
    self, storage_options: dict[str, Any] | None = None, version: int | None = None
) -> DeltaTable:
    """Get the DeltaTable object for this table.

    Args:
        storage_options (dict[str, Any] | None, optional): Storage options for the DeltaTable, such as S3 access credentials. Defaults to None.
        version (int | None, optional): Version of the Delta table to read. If None, the latest version is used. Defaults to None.

    Returns:
        DeltaTable: The DeltaTable object representing the Delta Lake table.
    """
    return DeltaTable(
        table_uri=self.uri, storage_options=storage_options, version=version
    )

get_schema()

Generate and return the schema of the table, including partitions and columns.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `TableSchema` | `TableSchema` | table schema containing partition and column information. |

Source code in src/datarepo/core/tables/deltalake_table.py
def get_schema(self) -> TableSchema:
    """Generate and return the schema of the table, including partitions and columns.

    Returns:
        TableSchema: table schema containing partition and column information.
    """
    dt = self.delta_table()
    schema = self.schema
    partition_cols = dt.metadata().partition_columns
    filters = {
        f.column: f.value
        for f in self.table_metadata.docs_args.get("filters", [])
        if isinstance(f, Filter)
    }
    partitions = [
        {
            "column_name": col,
            "type_annotation": str(schema.field(col).type),
            "value": filters.get(col),
        }
        for col in partition_cols
    ]
    columns = [
        {
            "name": name,
            "type": str(schema.field(name).type),
            "has_stats": name in partition_cols or name in self.stats_cols,
        }
        for name in schema.names
    ]
    columns += [
        {
            "name": expr.meta.output_name(),
            "type": expr_type,
            "readonly": True,
        }
        for expr, expr_type in self.extra_cols
    ]
    return TableSchema(
        partitions=partitions,
        columns=columns,
    )

Filter

Bases: NamedTuple

Filter represents a condition to be applied to a column in a table.

Source code in src/datarepo/core/tables/filters.py
class Filter(NamedTuple):
    """Filter represents a condition to be applied to a column in a table."""

    column: str
    operator: FilterOperator
    value: Any
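
Filter is a plain NamedTuple, so instances can be built positionally or by keyword and used in docs_filters or query filters. FilterOperator is not documented on this page, so the operator value below is a placeholder assumption:

``` py
from datarepo.core.tables.filters import Filter  # assumed import path

# "=" stands in for a FilterOperator value; consult filters.py for the real operators.
f = Filter(column="implant_id", operator="=", value=1234)
print(f.column, f.operator, f.value)  # implant_id = 1234
```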

ParquetTable

Bases: TableProtocol

A table that is stored in Parquet format.

Source code in src/datarepo/core/tables/parquet_table.py
class ParquetTable(TableProtocol):
    """A table that is stored in Parquet format."""

    def __init__(
        self,
        name: str,
        uri: str,
        partitioning: list[Partition],
        partitioning_scheme: PartitioningScheme = PartitioningScheme.DIRECTORY,
        description: str = "",
        docs_filters: list[Filter] = [],
        docs_columns: list[str] | None = None,
        roapi_opts: RoapiOptions | None = None,
        parquet_file_name: str = "df.parquet",
        table_metadata_args: dict[str, Any] | None = None,
    ):
        """Initialize the ParquetTable.

        Args:
            name (str): name of the table, used for documentation and metadata.
            uri (str): uri of the table, typically an S3 bucket path.
            partitioning (list[Partition]): partitioning scheme for the table.
                This is a list of Partition objects, which define the columns and types used for partitioning
            partitioning_scheme (PartitioningScheme, optional): scheme used for partitioning.
                Defaults to PartitioningScheme.DIRECTORY.
            description (str, optional): description of the table, used for documentation.
                Defaults to "".
            docs_filters (list[Filter], optional): documentation filters for the table.
                These filters are used to generate documentation and are not applied to the data.
                Defaults to [].
            docs_columns (list[str] | None, optional): documentation columns for the table.
                These columns are used to generate documentation and are not applied to the data.
                Defaults to None.
            roapi_opts (RoapiOptions | None, optional): Read-only API options for the table.
                These options are used to configure the ROAPI endpoint for the table.
                Defaults to None.
            parquet_file_name (str, optional): parquet file name to use when building file fragments.
            table_metadata_args (dict[str, Any] | None, optional): additional metadata arguments for the table.

        Raises:
            ValueError: if the partitioning_scheme is not a valid PartitioningScheme.
        """
        if not isinstance(partitioning_scheme, PartitioningScheme):
            raise ValueError(f"Invalid partitioning scheme, got {partitioning_scheme}")

        self.name = name
        self.uri = uri
        self.partitioning = partitioning
        self.partitioning_scheme = partitioning_scheme

        self.table_metadata = TableMetadata(
            table_type="PARQUET",
            description=description,
            docs_args={"filters": docs_filters, "columns": docs_columns},
            roapi_opts=roapi_opts,
            **(table_metadata_args or {}),
        )

        self.parquet_file_name = parquet_file_name

    def get_schema(self) -> TableSchema:
        """Generates the schema of the table, including partitions and columns.

        Returns:
            TableSchema: table schema containing partitions and columns.
        """
        partitions = [
            {
                "column_name": filter.column,
                "type_annotation": type(filter.value).__name__,
                "value": filter.value,
            }
            for filter in self.table_metadata.docs_args.get("filters", [])
        ]

        # Pop the selected columns so that we still pull the full schema below
        docs_args = {**self.table_metadata.docs_args}
        docs_args.pop("columns")

        columns = None
        if docs_args or not partitions:
            table: NlkDataFrame = self(**docs_args)
            columns = [
                {
                    "name": key,
                    "type": type.__str__(),
                }
                for key, type in table.schema.items()
            ]

        return TableSchema(partitions=partitions, columns=columns)

    def __call__(
        self,
        filters: InputFilters | None = None,
        columns: Optional[list[str]] = None,
        boto3_session: boto3.Session | None = None,
        endpoint_url: str | None = None,
        **kwargs: Any,
    ) -> NlkDataFrame:
        """Fetches data from the Parquet table based on the provided filters and columns.

        Args:
            filters (InputFilters | None, optional): filters to apply to the data. Defaults to None.
            columns (Optional[list[str]], optional): columns to select from the data. Defaults to None.
            boto3_session (boto3.Session | None, optional): boto3 session to use for S3 access. Defaults to None.
            endpoint_url (str | None, optional): endpoint URL for S3 access. Defaults to None.

        Returns:
            NlkDataFrame: A DataFrame containing the filtered data from the Parquet table.
        """
        normalized_filters = normalize_filters(filters)
        uri, remaining_partitions, remaining_filters, applied_filters = (
            self._build_uri_from_filters(normalized_filters)
        )

        storage_options = get_storage_options(
            boto3_session=boto3_session,
            endpoint_url=endpoint_url,
        )

        df = pl.scan_parquet(
            uri,
            hive_partitioning=len(remaining_partitions) > 0,
            hive_schema={
                partition.column: partition.col_type
                for partition in remaining_partitions
            },
            allow_missing_columns=True,
            storage_options=storage_options,
        )

        if applied_filters:
            # Add columns removed from partitions and added to uri
            df = df.with_columns(
                pl.lit(f.value)
                .cast(
                    next(
                        partition.col_type
                        for partition in self.partitioning
                        if partition.column == f.column
                    )
                )
                .alias(f.column)
                for f in applied_filters
            )

        if remaining_filters:
            filter_expr = _filters_to_expr(remaining_filters)
            if filter_expr is not None:
                df = df.filter(filter_expr)

        if columns:
            df = df.select(columns)

        return df

    def build_file_fragment(self, filters: list[Filter]) -> str:
        """
        Returns a file path from the base table URI with the given filters.
        This will raise an error if the filter does not specify all partitions.

        This is currently used to generate the file path used by ROAPI to infer schemas.
        """
        uri_with_prefix, partitions, _, _ = self._build_uri_from_filters(
            normalize_filters(filters), include_base_uri=False
        )
        if len(partitions) > 0:
            partition_names = [partition.column for partition in partitions]
            raise ValueError(
                f"Not enough partitions specified, missing: {partition_names}"
            )

        return path.join(uri_with_prefix, self.parquet_file_name)

    def _build_uri_from_filters(
        self,
        filters: NormalizedFilters,
        include_base_uri: bool = True,
    ) -> tuple[str, list[Partition], NormalizedFilters, list[Filter]]:
        """Attempts to build an S3 list prefix from the given filters.
        We do this because pyarrow runs an S3 List() query on the base URI
        before applying filters to possible files. This can be slow.

        We can force pyarrow to do more sophisticated prefix filtering
        if we pre-construct the URI before making the call. If a partition
        has exactly one filter that uses strict equality, we know that
        it will contain that filter string in the URI. We attempt to do this
        for each partition filter, in order, until we encounter a
        partition that:
            1. does not have a filter, or
            2. has more than one filter, or
            3. has filters that are not strict equality checks.

        This gives us the longest URL that can be used as a prefix filter for
        all files returned by the S3 List() call.

        In a benchmark, this brought reading ~1 million rows of binned spike files
        from 12s to 1.5s.

        In the future, we can further optimize this by building a list of
        candidate URIs and doing separate prefix filtering with each of those, in parallel.

        Long-term we should push this logic into pyarrow and make an upstream commit.

        NOTE: Add trailing slash - this is important to ensure that,
        when partitioning only by implant ID, a future 5-digit implant beginning with the same 4 digits
        as a 4-digit implant is not included in a 4-digit implant's query.
        """
        uri = self.uri if include_base_uri else ""

        if not filters or not self.partitioning:
            return uri, self.partitioning, filters, []

        partitions = list(self.partitioning)
        filters = [list(filter_set) for filter_set in filters]
        applied_filters = []

        for i, partition in enumerate(self.partitioning):
            partition_filters = [
                exactly_one_equality_filter(partition, f) for f in filters
            ]

            # either 0 or multiple filters for this partition,
            # break and deal with the s3 list() query
            if any(partition_filter is None for partition_filter in partition_filters):
                break

            # Only move forward if all remaining filter sets have the same partition filter
            if not all(
                partition_filter == partition_filters[0]
                for partition_filter in partition_filters
            ):
                break

            partition_filter = partition_filters[0]

            if self.partitioning_scheme == PartitioningScheme.DIRECTORY:
                partition_component = str(partition_filter.value)
            elif self.partitioning_scheme == PartitioningScheme.HIVE:
                partition_component = (
                    partition.column + "=" + str(partition_filter.value)
                )

            uri = path.join(uri, partition_component)

            # remove the partition and filter since it's been applied
            # technically might not need to remove the partitions,
            # but they are semantically meaningless as we already have
            # constructed the URI, so we can pop them
            partitions.remove(partition)
            for filter_set in filters:
                filter_set.remove(partition_filter)

            applied_filters.append(partition_filter)

        uri = path.join(
            uri, ""
        )  # trailing slash prevents inclusion of partitions that are subsets of other partitions

        return (uri, partitions, filters, applied_filters)
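
A usage sketch for a directory-partitioned table, under assumed import paths. Partition is referenced by the source above but not documented on this page, so its constructor call is an assumption, as is the filter shape; with an equality filter on the leading partition, _build_uri_from_filters narrows the S3 listing to a prefix such as s3://my-bucket/sessions/1234/.

``` py
import polars as pl
from datarepo.core.tables.filters import Filter  # assumed import path
from datarepo.core.tables.parquet_table import ParquetTable, Partition  # assumed import paths

sessions = ParquetTable(
    name="sessions",
    uri="s3://my-bucket/sessions",  # placeholder location
    partitioning=[
        # Partition(column=..., col_type=...) is inferred from the attributes used above;
        # the exact constructor signature is an assumption.
        Partition(column="implant_id", col_type=pl.Int64),
        Partition(column="date", col_type=pl.String),
    ],
)

# The "=" operator is a placeholder for a FilterOperator value.
frame = sessions(
    filters=[Filter(column="implant_id", operator="=", value=1234)],
    columns=["date"],
)
df = frame.collect()  # NlkDataFrame is lazy; collect to materialize
```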

__call__(filters=None, columns=None, boto3_session=None, endpoint_url=None, **kwargs)

Fetches data from the Parquet table based on the provided filters and columns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `filters` | `InputFilters \| None` | filters to apply to the data. Defaults to None. | `None` |
| `columns` | `Optional[list[str]]` | columns to select from the data. Defaults to None. | `None` |
| `boto3_session` | `Session \| None` | boto3 session to use for S3 access. Defaults to None. | `None` |
| `endpoint_url` | `str \| None` | endpoint URL for S3 access. Defaults to None. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `NlkDataFrame` | `NlkDataFrame` | A DataFrame containing the filtered data from the Parquet table. |

Source code in src/datarepo/core/tables/parquet_table.py
def __call__(
    self,
    filters: InputFilters | None = None,
    columns: Optional[list[str]] = None,
    boto3_session: boto3.Session | None = None,
    endpoint_url: str | None = None,
    **kwargs: Any,
) -> NlkDataFrame:
    """Fetches data from the Parquet table based on the provided filters and columns.

    Args:
        filters (InputFilters | None, optional): filters to apply to the data. Defaults to None.
        columns (Optional[list[str]], optional): columns to select from the data. Defaults to None.
        boto3_session (boto3.Session | None, optional): boto3 session to use for S3 access. Defaults to None.
        endpoint_url (str | None, optional): endpoint URL for S3 access. Defaults to None.

    Returns:
        NlkDataFrame: A DataFrame containing the filtered data from the Parquet table.
    """
    normalized_filters = normalize_filters(filters)
    uri, remaining_partitions, remaining_filters, applied_filters = (
        self._build_uri_from_filters(normalized_filters)
    )

    storage_options = get_storage_options(
        boto3_session=boto3_session,
        endpoint_url=endpoint_url,
    )

    df = pl.scan_parquet(
        uri,
        hive_partitioning=len(remaining_partitions) > 0,
        hive_schema={
            partition.column: partition.col_type
            for partition in remaining_partitions
        },
        allow_missing_columns=True,
        storage_options=storage_options,
    )

    if applied_filters:
        # Add columns removed from partitions and added to uri
        df = df.with_columns(
            pl.lit(f.value)
            .cast(
                next(
                    partition.col_type
                    for partition in self.partitioning
                    if partition.column == f.column
                )
            )
            .alias(f.column)
            for f in applied_filters
        )

    if remaining_filters:
        filter_expr = _filters_to_expr(remaining_filters)
        if filter_expr is not None:
            df = df.filter(filter_expr)

    if columns:
        df = df.select(columns)

    return df

__init__(name, uri, partitioning, partitioning_scheme=PartitioningScheme.DIRECTORY, description='', docs_filters=[], docs_columns=None, roapi_opts=None, parquet_file_name='df.parquet', table_metadata_args=None)

Initialize the ParquetTable.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | name of the table, used for documentation and metadata. | *required* |
| `uri` | `str` | uri of the table, typically an S3 bucket path. | *required* |
| `partitioning` | `list[Partition]` | partitioning scheme for the table. This is a list of Partition objects, which define the columns and types used for partitioning. | *required* |
| `partitioning_scheme` | `PartitioningScheme` | scheme used for partitioning. Defaults to PartitioningScheme.DIRECTORY. | `DIRECTORY` |
| `description` | `str` | description of the table, used for documentation. Defaults to "". | `''` |
| `docs_filters` | `list[Filter]` | documentation filters for the table. These filters are used to generate documentation and are not applied to the data. Defaults to []. | `[]` |
| `docs_columns` | `list[str] \| None` | documentation columns for the table. These columns are used to generate documentation and are not applied to the data. Defaults to None. | `None` |
| `roapi_opts` | `RoapiOptions \| None` | Read-only API options for the table. These options are used to configure the ROAPI endpoint for the table. Defaults to None. | `None` |
| `parquet_file_name` | `str` | parquet file name to use when building file fragments. | `'df.parquet'` |
| `table_metadata_args` | `dict[str, Any] \| None` | additional metadata arguments for the table. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | if the partitioning_scheme is not a valid PartitioningScheme. |

Source code in src/datarepo/core/tables/parquet_table.py, lines 153-208
def __init__(
    self,
    name: str,
    uri: str,
    partitioning: list[Partition],
    partitioning_scheme: PartitioningScheme = PartitioningScheme.DIRECTORY,
    description: str = "",
    docs_filters: list[Filter] = [],
    docs_columns: list[str] | None = None,
    roapi_opts: RoapiOptions | None = None,
    parquet_file_name: str = "df.parquet",
    table_metadata_args: dict[str, Any] | None = None,
):
    """Initialize the ParquetTable.

    Args:
        name (str): name of the table, used for documentation and metadata.
        uri (str): uri of the table, typically an S3 bucket path.
        partitioning (list[Partition]): partitioning scheme for the table.
            This is a list of Partition objects, which define the columns and types used for partitioning
        partitioning_scheme (PartitioningScheme, optional): scheme used for partitioning.
            Defaults to PartitioningScheme.DIRECTORY.
        description (str, optional): description of the table, used for documentation.
            Defaults to "".
        docs_filters (list[Filter], optional): documentation filters for the table.
            These filters are used to generate documentation and are not applied to the data.
            Defaults to [].
        docs_columns (list[str] | None, optional): documentation columns for the table.
            These columns are used to generate documentation and are not applied to the data.
            Defaults to None.
        roapi_opts (RoapiOptions | None, optional): Read-only API options for the table.
            These options are used to configure the ROAPI endpoint for the table.
            Defaults to None.
        parquet_file_name (str, optional): parquet file name to use when building file fragments.
        table_metadata_args (dict[str, Any] | None, optional): additional metadata arguments for the table.

    Raises:
        ValueError: if the partitioning_scheme is not a valid PartitioningScheme.
    """
    if not isinstance(partitioning_scheme, PartitioningScheme):
        raise ValueError(f"Invalid partitioning scheme, got {partitioning_scheme}")

    self.name = name
    self.uri = uri
    self.partitioning = partitioning
    self.partitioning_scheme = partitioning_scheme

    self.table_metadata = TableMetadata(
        table_type="PARQUET",
        description=description,
        docs_args={"filters": docs_filters, "columns": docs_columns},
        roapi_opts=roapi_opts,
        **(table_metadata_args or {}),
    )

    self.parquet_file_name = parquet_file_name
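
A hedged construction sketch follows. The `Partition(column=..., col_type=...)` keyword arguments are inferred from the `partition.column` / `partition.col_type` accesses in `__call__` above and should be treated as assumptions; the Polars dtypes and the bucket path are illustrative.

``` py
import polars as pl

recordings = ParquetTable(
    name="recordings",
    uri="s3://my-bucket/recordings",
    partitioning=[
        Partition(column="implant_id", col_type=pl.Int64),  # assumed constructor signature
        Partition(column="date", col_type=pl.Utf8),
    ],
    partitioning_scheme=PartitioningScheme.HIVE,  # keys laid out as implant_id=.../date=.../
    description="Raw electrode recordings, partitioned by implant and day.",
    parquet_file_name="df.parquet",
)
```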

build_file_fragment(filters)

Returns a file path from the base table URI with the given filters. This will raise an error if the filters do not specify values for all partition columns.

This is currently used to generate the file path used by ROAPI to infer schemas.

Source code in src/datarepo/core/tables/parquet_table.py, lines 307-323
def build_file_fragment(self, filters: list[Filter]) -> str:
    """
    Returns a file path from the base table URI with the given filters.
    This will raise an error if the filter does not specify all partitions.

    This is currently used to generate the file path used by ROAPI to infer schemas.
    """
    uri_with_prefix, partitions, _, _ = self._build_uri_from_filters(
        normalize_filters(filters), include_base_uri=False
    )
    if len(partitions) > 0:
        partition_names = [partition.column for partition in partitions]
        raise ValueError(
            f"Not enough partitions specified, missing: {partition_names}"
        )

    return path.join(uri_with_prefix, self.parquet_file_name)
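
As an illustration, with the hypothetical `recordings` table from the construction sketch above (HIVE layout, partitioned on implant_id and date), fully specifying both partitions would yield a path fragment like the one below. The filter literal reuses the assumed tuple shape from the earlier call sketch; only the all-partitions requirement comes from the code.

``` py
# Filters must cover every partition column, otherwise ValueError is raised.
fragment = recordings.build_file_fragment(
    [("implant_id", "=", 5956), ("date", "=", "2024-03-24")]  # assumed filter shape
)
# With a HIVE layout the result would resemble:
#   "implant_id=5956/date=2024-03-24/df.parquet"
```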

get_schema()

Generates the schema of the table, including partitions and columns.

Returns:

| Name | Type | Description |
| ---- | ---- | ----------- |
| TableSchema | TableSchema | Table schema containing partitions and columns. |

Source code in src/datarepo/core/tables/parquet_table.py, lines 210-240
def get_schema(self) -> TableSchema:
    """Generates the schema of the table, including partitions and columns.

    Returns:
        TableSchema: table schema containing partitions and columns.
    """
    partitions = [
        {
            "column_name": filter.column,
            "type_annotation": type(filter.value).__name__,
            "value": filter.value,
        }
        for filter in self.table_metadata.docs_args.get("filters", [])
    ]

    # Pop the selected columns so that we still pull the full schema below
    docs_args = {**self.table_metadata.docs_args}
    docs_args.pop("columns")

    columns = None
    if docs_args or not partitions:
        table: NlkDataFrame = self(**docs_args)
        columns = [
            {
                "name": key,
                "type": type.__str__(),
            }
            for key, type in table.schema.items()
        ]

    return TableSchema(partitions=partitions, columns=columns)
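
The return shape, read directly from the listing above, looks roughly like this (a sketch, assuming the hypothetical `recordings` table from the earlier examples):

``` py
schema = recordings.get_schema()

# Partitions are derived from the docs_filters passed at construction time:
#   schema.partitions -> [{"column_name": "implant_id", "type_annotation": "int", "value": 5956}, ...]
# Columns are discovered by scanning the table with the docs filters applied:
#   schema.columns    -> [{"name": "signal", "type": "Float64"}, ...]
```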

PartitioningScheme

Bases: Enum

Defines the partitioning scheme for the table.

DIRECTORY - e.g. s3://bucket/5956/2024-03-24
HIVE - e.g. s3://bucket/implant_id=5956/date=2024-03-24

Source code in src/datarepo/core/tables/util.py, lines 25-34
class PartitioningScheme(Enum):
    """
    Defines the partitioning scheme for the table.

    DIRECTORY - e.g. s3://bucket/5956/2024-03-24
    HIVE - e.g. s3://bucket/implant_id=5956/date=2024-03-24
    """

    DIRECTORY = 1
    HIVE = 2
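
The difference between the two schemes is only how a partition value is rendered into the object key. A small illustrative helper (not part of datarepo; the real path building lives in ParquetTable._build_uri_from_filters) makes that concrete:

``` py
def render_segment(scheme: PartitioningScheme, column: str, value: object) -> str:
    """Illustrative only: render one partition segment under a given scheme."""
    if scheme is PartitioningScheme.HIVE:
        return f"{column}={value}"   # -> "implant_id=5956"
    return str(value)                # -> "5956"
```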

TableMetadata dataclass

Information about a table used for documentation generation.

Source code in src/datarepo/core/tables/metadata.py, lines 8-21
@dataclass
class TableMetadata:
    """
    Information about a table used for documentation generation.
    """

    table_type: str
    description: str
    docs_args: Dict[str, Any]
    latency_info: str | None = None
    example_notebook: str | None = None
    data_input: str | None = None
    is_deprecated: bool = False
    roapi_opts: RoapiOptions | None = None
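
All fields are visible in the dataclass above, so a construction sketch is straightforward; the values themselves are made up, and the notebook path is hypothetical.

``` py
meta = TableMetadata(
    table_type="PARQUET",
    description="Raw electrode recordings, one partition per implant per day.",
    docs_args={"filters": [], "columns": None},
    latency_info="Updated nightly",
    example_notebook="notebooks/recordings_demo.ipynb",  # hypothetical path
    is_deprecated=False,
)
```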

TableProtocol

Bases: Protocol

Source code in src/datarepo/core/tables/metadata.py, lines 46-56
class TableProtocol(Protocol):
    # Properties used to generate the web catalog & roapi config
    table_metadata: TableMetadata

    def __call__(self, **kwargs: Dict[str, Any]) -> NlkDataFrame: ...

    def get_schema(self) -> TableSchema:
        """
        Returns the schema of the table, used to generate the web catalog.
        """
        ...
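
Because this is a structural Protocol, any object with a table_metadata attribute, a callable interface returning an NlkDataFrame, and a get_schema method satisfies it. A minimal sketch, assuming NlkDataFrame accepts a dict of columns like a Polars DataFrame (an assumption):

``` py
class InMemoryTable:
    """Tiny hand-rolled table satisfying TableProtocol."""

    table_metadata = TableMetadata(
        table_type="FUNCTION",
        description="Three hard-coded rows, for tests and examples.",
        docs_args={},
    )

    def __call__(self, **kwargs) -> NlkDataFrame:
        return NlkDataFrame({"id": [1, 2, 3]})  # assumed dict-style constructor

    def get_schema(self) -> TableSchema:
        # Mirrors the dict shape produced by ParquetTable.get_schema above.
        return TableSchema(partitions=[], columns=[{"name": "id", "type": "Int64"}])
```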

get_schema()

Returns the schema of the table, used to generate the web catalog.

Source code in src/datarepo/core/tables/metadata.py, lines 52-56
def get_schema(self) -> TableSchema:
    """
    Returns the schema of the table, used to generate the web catalog.
    """
    ...

TableSchema dataclass

TableSchema represents the schema of a table, including partitions and columns.

Source code in src/datarepo/core/tables/metadata.py, lines 38-43
@dataclass
class TableSchema:
    """TableSchema represents the schema of a table, including partitions and columns."""

    partitions: list[TablePartition]
    columns: list[TableColumn]

table(*args, **kwargs)

Decorator to define a table using a function.

Example usage:

    @table(description="This is a sample table.")
    def my_table_function(param1, param2):
        # Function logic to create a table
        return NlkDataFrame(...)

Returns:

| Type | Description |
| ---- | ----------- |
| Callable[[U], U] \| Callable[[Any], Callable[[U], U]] | A decorator that wraps a function to create a table. |

Source code in src/datarepo/core/tables/decorator.py, lines 79-111
def table(*args, **kwargs) -> Callable[[U], U] | Callable[[Any], Callable[[U], U]]:
    """Decorator to define a table using a function.

    Example usage:
        ``` py
        @table(description="This is a sample table.")
        def my_table_function(param1, param2):
            # Function logic to create a table
            return NlkDataFrame(...)
        ```

    Returns:
        Callable[[U], U] | Callable[[Any], Callable[[U], U]]: A decorator that wraps a function to create a table.
    """

    def wrapper(func):
        return FunctionTable(
            table_metadata=TableMetadata(
                table_type="FUNCTION",
                description=func.__doc__.strip() if func.__doc__ else "",
                docs_args=kwargs.get("docs_args", {}),
                latency_info=kwargs.get("latency_info"),
                example_notebook=kwargs.get("example_notebook"),
                data_input=kwargs.get("data_input"),
                is_deprecated=kwargs.get("is_deprecated", False),
            ),
            func=func,
        )

    if len(args) == 0:
        return wrapper
    else:
        return wrapper(args[0])
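
Two hedged usage sketches based on the wrapper above: the bare form and the configured form. Only keyword arguments the wrapper actually reads (docs_args, latency_info, example_notebook, data_input, is_deprecated) are shown; whether FunctionTable forwards call arguments to the wrapped function is assumed, as is the NlkDataFrame constructor.

``` py
@table
def daily_counts() -> NlkDataFrame:
    """Row counts per day."""  # becomes TableMetadata.description
    return NlkDataFrame({"day": ["2024-03-24"], "rows": [128]})  # assumed constructor

@table(latency_info="refreshed hourly", docs_args={"columns": ["hour", "rows"]})
def hourly_counts(hour: int) -> NlkDataFrame:
    """Row counts for a single hour."""
    return NlkDataFrame({"hour": [hour], "rows": [16]})

# Both names are now FunctionTable instances with table_metadata attached:
print(hourly_counts.table_metadata.latency_info)  # "refreshed hourly"
df = hourly_counts(hour=7)  # assumes FunctionTable.__call__ forwards to the function
```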