Redshift
Important Capabilities
Capability | Status | Notes |
---|---|---|
Column-level Lineage | ✅ | Optionally enabled via configuration (mixed or sql_based lineage needs to be enabled) |
Data Profiling | ✅ | Optionally enabled via configuration |
Dataset Usage | ✅ | Enabled by default, can be disabled via configuration include_usage_statistics |
Descriptions | ✅ | Enabled by default |
Detect Deleted Entities | ✅ | Enabled via stateful ingestion |
Domains | ✅ | Supported via the domain config field |
Platform Instance | ✅ | Enabled by default |
Table-Level Lineage | ✅ | Optionally enabled via configuration |
This plugin extracts the following:
- Metadata for databases, schemas, views and tables
- Column types associated with each table
- Table, row, and column statistics via optional SQL profiling
- Table lineage
- Usage statistics
Prerequisites
This source needs to access system tables that require extra permissions. To grant these permissions, please alter your datahub Redshift user the following way:
ALTER USER datahub_user WITH SYSLOG ACCESS UNRESTRICTED;
GRANT SELECT ON pg_catalog.svv_table_info to datahub_user;
GRANT SELECT ON pg_catalog.svl_user_info to datahub_user;
Giving a user unrestricted access to system tables gives the user visibility to data generated by other users. For example, STL_QUERY and STL_QUERYTEXT contain the full text of INSERT, UPDATE, and DELETE statements.
Lineage
There are multiple lineage collector implementations as Redshift does not support table lineage out of the box.
stl_scan_based
The stl_scan based collector uses Redshift's stl_insert and stl_scan system tables to discover lineage between tables. Pros:
- Fast
- Reliable
Cons:
- Does not work with Spectrum/external tables because those scans do not show up in stl_scan table.
- If a table is depending on a view then the view won't be listed as dependency. Instead the table will be connected with the view's dependencies.
sql_based
The sql_based based collector uses Redshift's stl_insert to discover all the insert queries and uses sql parsing to discover the dependencies.
Pros:
- Works with Spectrum tables
- Views are connected properly if a table depends on it
Cons:
- Slow.
- Less reliable as the query parser can fail on certain queries
mixed
Using both collector above and first applying the sql based and then the stl_scan based one.
Pros:
- Works with Spectrum tables
- Views are connected properly if a table depends on it
- A bit more reliable than the sql_based one only
Cons:
- Slow
- May be incorrect at times as the query parser can fail on certain queries
The redshift stl redshift tables which are used for getting data lineage retain at most seven days of log history, and sometimes closer to 2-5 days. This means you cannot extract lineage from queries issued outside that window.
Profiling
Profiling runs sql queries on the redshift cluster to get statistics about the tables. To be able to do that, the user needs to have read access to the tables that should be profiled.
If you don't want to grant read access to the tables you can enable table level profiling which will get table statistics without reading the data.
profiling:
profile_table_level_only: true
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[redshift]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
type: redshift
config:
# Coordinates
host_port: example.something.us-west-2.redshift.amazonaws.com:5439
database: DemoDatabase
# Credentials
username: user
password: pass
# Options
options:
# driver_option: some-option
include_table_lineage: true
include_usage_statistics: true
# The following options are only used when include_usage_statistics is true
# it appends the domain after the resdhift username which is extracted from the Redshift audit history
# in the format username@email_domain
email_domain: mydomain.com
profiling:
enabled: true
# Only collect table level profiling information
profile_table_level_only: true
sink:
# sink configs
#------------------------------------------------------------------------------
# Extra options when running Redshift behind a proxy</summary>
# This requires you to have already installed the Microsoft ODBC Driver for SQL Server.
# See https://docs.microsoft.com/en-us/sql/connect/python/pyodbc/step-1-configure-development-environment-for-pyodbc-python-development?view=sql-server-ver15
#------------------------------------------------------------------------------
source:
type: redshift
config:
host_port: my-proxy-hostname:5439
options:
connect_args:
# check all available options here: https://pypi.org/project/redshift-connector/
ssl_insecure: "false" # Specifies if IDP hosts server certificate will be verified
sink:
# sink configs
Config Details
- Options
- Schema
Note that a .
is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
host_port ✅ string | host URL |
bucket_duration Enum | Size of the time window to aggregate usage stats. Default: DAY |
convert_urns_to_lowercase boolean | Whether to convert dataset urns to lowercase. Default: False |
database string | database Default: dev |
default_schema string | The default schema to use if the sql parser fails to parse the schema with sql_based lineage collector Default: public |
email_domain string | Email domain of your organisation so users can be displayed on UI appropriately. |
enable_stateful_lineage_ingestion boolean | Enable stateful lineage ingestion. This will store lineage window timestamps after successful lineage ingestion. and will not run lineage ingestion for same timestamps in subsequent run. Default: True |
enable_stateful_profiling boolean | Enable stateful profiling. This will store profiling timestamps per dataset after successful profiling. and will not run profiling again in subsequent run if table has not been updated. Default: True |
enable_stateful_usage_ingestion boolean | Enable stateful lineage ingestion. This will store usage window timestamps after successful usage ingestion. and will not run usage ingestion for same timestamps in subsequent run. Default: True |
end_time string(date-time) | Latest date of lineage/usage to consider. Default: Current time in UTC |
extra_client_options object | Default: {} |
extract_column_level_lineage boolean | Whether to extract column level lineage. This config works with rest-sink only. Default: True |
format_sql_queries boolean | Whether to format sql queries Default: False |
include_copy_lineage boolean | Whether lineage should be collected from copy commands Default: True |
include_operational_stats boolean | Whether to display operational stats. Default: True |
include_read_operational_stats boolean | Whether to report read operational stats. Experimental. Default: False |
include_table_lineage boolean | Whether table lineage should be ingested. Default: True |
include_table_location_lineage boolean | If the source supports it, include table lineage to the underlying storage location. Default: True |
include_table_rename_lineage boolean | Whether we should follow alter table ... rename to statements when computing lineage. Default: True |
include_tables boolean | Whether tables should be ingested. Default: True |
include_top_n_queries boolean | Whether to ingest the top_n_queries. Default: True |
include_unload_lineage boolean | Whether lineage should be collected from unload commands Default: True |
include_usage_statistics boolean | Generate usage statistic. email_domain config parameter needs to be set if enabled Default: False |
include_view_column_lineage boolean | Populates column-level lineage for view->view and table->view lineage using DataHub's sql parser. Requires include_view_lineage to be enabled. Default: True |
include_view_lineage boolean | Populates view->view and table->view lineage using DataHub's sql parser. Default: True |
include_views boolean | Whether views should be ingested. Default: True |
incremental_lineage boolean | When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. This config works with rest-sink only. Default: False |
is_serverless boolean | Whether target Redshift instance is serverless (alternative is provisioned cluster) Default: False |
lineage_v2_generate_queries boolean | Whether to generate queries entities for the new SQL-based lineage collector. Default: True |
match_fully_qualified_names boolean | Whether schema_pattern is matched against fully qualified schema name <database>.<schema> . Default: False |
options object | Any options specified here will be passed to SQLAlchemy.create_engine as kwargs. To set connection arguments in the URL, specify them under connect_args . |
password string(password) | password |
patch_custom_properties boolean | Whether to patch custom properties on existing datasets rather than replace. Default: True |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to |
platform_instance_map map(str,string) | |
resolve_temp_table_in_lineage boolean | Whether to resolve temp table appear in lineage to upstream permanent tables. Default: True |
sqlalchemy_uri string | URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters. |
start_time string(date-time) | Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on bucket_duration ). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'. |
table_lineage_mode Enum | Which table lineage collector mode to use. Available modes are: [stl_scan_based, sql_based, mixed] Default: mixed |
top_n_queries integer | Number of top queries to save to each table. Default: 10 |
use_file_backed_cache boolean | Whether to use a file backed cache for the view definitions. Default: True |
use_lineage_v2 boolean | Whether to use the new SQL-based lineage collector. Default: False |
username string | username |
env string | The environment that all assets produced by this connector belong to Default: PROD |
domain map(str,AllowDenyPattern) | A class to store allow deny regexes |
domain. key .allowarray | List of regex patterns to include in ingestion Default: ['.*'] |
domain. key .allow.stringstring | |
domain. key .ignoreCaseboolean | Whether to ignore case sensitivity during pattern matching. Default: True |
domain. key .denyarray | List of regex patterns to exclude from ingestion. Default: [] |
domain. key .deny.stringstring | |
profile_pattern AllowDenyPattern | Regex patterns to filter tables (or specific columns) for profiling during ingestion. Note that only tables allowed by the table_pattern will be considered. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
profile_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
profile_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
profile_pattern.allow.string string | |
profile_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
profile_pattern.deny.string string | |
s3_lineage_config S3LineageProviderConfig | Common config for S3 lineage generation Default: {'path_specs': [], 'strip_urls': True} |
s3_lineage_config.strip_urls boolean | Strip filename from s3 url. It only applies if path_specs are not specified. Default: True |
s3_lineage_config.path_specs array | List of PathSpec. See below the details about PathSpec Default: [] |
s3_lineage_config.path_specs.PathSpec PathSpec | |
s3_lineage_config.path_specs.PathSpec.include ❓ string | Path to table. Name variable {table} is used to mark the folder with dataset. In absence of {table} , file level dataset will be created. Check below examples for more details. |
s3_lineage_config.path_specs.PathSpec.allow_double_stars boolean | Allow double stars in the include path. This can affect performance significantly if enabled Default: False |
s3_lineage_config.path_specs.PathSpec.default_extension string | For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped. |
s3_lineage_config.path_specs.PathSpec.enable_compression boolean | Enable or disable processing compressed files. Currently .gz and .bz files are supported. Default: True |
s3_lineage_config.path_specs.PathSpec.sample_files boolean | Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled Default: True |
s3_lineage_config.path_specs.PathSpec.table_name string | Display name of the dataset.Combination of named variables from include path and strings |
s3_lineage_config.path_specs.PathSpec.exclude array | list of paths in glob pattern which will be excluded while scanning for the datasets |
s3_lineage_config.path_specs.PathSpec.exclude.string string | |
s3_lineage_config.path_specs.PathSpec.file_types array | Files with extenstions specified here (subset of default value) only will be scanned to create dataset. Other files will be omitted. Default: ['csv', 'tsv', 'json', 'parquet', 'avro'] |
s3_lineage_config.path_specs.PathSpec.file_types.string string | |
schema_pattern AllowDenyPattern | Regex patterns for schemas to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics' Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
schema_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
schema_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
schema_pattern.allow.string string | |
schema_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
schema_pattern.deny.string string | |
table_pattern AllowDenyPattern | Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*' Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
table_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
table_pattern.allow.string string | |
table_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
table_pattern.deny.string string | |
user_email_pattern AllowDenyPattern | regex patterns for user emails to filter in usage. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
user_email_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
user_email_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
user_email_pattern.allow.string string | |
user_email_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
user_email_pattern.deny.string string | |
view_pattern AllowDenyPattern | Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*' Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
view_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
view_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
view_pattern.allow.string string | |
view_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
view_pattern.deny.string string | |
profiling GEProfilingConfig | Default: {'enabled': False, 'operation_config': {'lower_fre... |
profiling.catch_exceptions boolean | Default: True |
profiling.enabled boolean | Whether profiling should be done. Default: False |
profiling.field_sample_values_limit integer | Upper limit for number of sample values to collect for all columns. Default: 20 |
profiling.include_field_distinct_count boolean | Whether to profile for the number of distinct values for each column. Default: True |
profiling.include_field_distinct_value_frequencies boolean | Whether to profile for distinct value frequencies. Default: False |
profiling.include_field_histogram boolean | Whether to profile for the histogram for numeric fields. Default: False |
profiling.include_field_max_value boolean | Whether to profile for the max value of numeric columns. Default: True |
profiling.include_field_mean_value boolean | Whether to profile for the mean value of numeric columns. Default: True |
profiling.include_field_median_value boolean | Whether to profile for the median value of numeric columns. Default: True |
profiling.include_field_min_value boolean | Whether to profile for the min value of numeric columns. Default: True |
profiling.include_field_null_count boolean | Whether to profile for the number of nulls for each column. Default: True |
profiling.include_field_quantiles boolean | Whether to profile for the quantiles of numeric columns. Default: False |
profiling.include_field_sample_values boolean | Whether to profile for the sample values for all columns. Default: True |
profiling.include_field_stddev_value boolean | Whether to profile for the standard deviation of numeric columns. Default: True |
profiling.limit integer | Max number of documents to profile. By default, profiles all documents. |
profiling.max_number_of_fields_to_profile integer | A positive integer that specifies the maximum number of columns to profile for any table. None implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up. |
profiling.max_workers integer | Number of worker threads to use for profiling. Set to 1 to disable. Default: 20 |
profiling.offset integer | Offset in documents to profile. By default, uses no offset. |
profiling.partition_datetime string(date-time) | If specified, profile only the partition which matches this datetime. If not specified, profile the latest partition. Only Bigquery supports this. |
profiling.partition_profiling_enabled boolean | Whether to profile partitioned tables. Only BigQuery supports this. If enabled, latest partition data is used for profiling. Default: True |
profiling.profile_external_tables boolean | Whether to profile external tables. Only Snowflake and Redshift supports this. Default: False |
profiling.profile_if_updated_since_days number | Profile table only if it has been updated since these many number of days. If set to null , no constraint of last modified time for tables to profile. Supported only in snowflake and BigQuery . |
profiling.profile_table_level_only boolean | Whether to perform profiling at table-level only, or include column-level profiling as well. Default: False |
profiling.profile_table_row_count_estimate_only boolean | Use an approximate query for row count. This will be much faster but slightly less accurate. Only supported for Postgres and MySQL. Default: False |
profiling.profile_table_row_limit integer | Profile tables only if their row count is less then specified count. If set to null , no limit on the row count of tables to profile. Supported only in snowflake and BigQuery Default: 5000000 |
profiling.profile_table_size_limit integer | Profile tables only if their size is less then specified GBs. If set to null , no limit on the size of tables to profile. Supported only in snowflake and BigQuery Default: 5 |
profiling.query_combiner_enabled boolean | This feature is still experimental and can be disabled if it causes issues. Reduces the total number of queries issued and speeds up profiling by dynamically combining SQL queries where possible. Default: True |
profiling.report_dropped_profiles boolean | Whether to report datasets or dataset columns which were not profiled. Set to True for debugging purposes. Default: False |
profiling.sample_size integer | Number of rows to be sampled from table for column level profiling.Applicable only if use_sampling is set to True. Default: 10000 |
profiling.turn_off_expensive_profiling_metrics boolean | Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10. Default: False |
profiling.use_sampling boolean | Whether to profile column level stats on sample of table. Only BigQuery and Snowflake support this. If enabled, profiling is done on rows sampled from table. Sampling is not done for smaller tables. Default: True |
profiling.operation_config OperationConfig | Experimental feature. To specify operation configs. |
profiling.operation_config.lower_freq_profile_enabled boolean | Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling. Default: False |
profiling.operation_config.profile_date_of_month integer | Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect. |
profiling.operation_config.profile_day_of_week integer | Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect. |
stateful_ingestion StatefulStaleMetadataRemovalConfig | Base specialized config for Stateful Ingestion with stale metadata removal capability. |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Default: True if datahub-rest sink is used or if a datahub_api is specified, otherwise False Default: False |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
The JSONSchema for this configuration is inlined below.
{
"title": "RedshiftConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"enable_stateful_profiling": {
"title": "Enable Stateful Profiling",
"description": "Enable stateful profiling. This will store profiling timestamps per dataset after successful profiling. and will not run profiling again in subsequent run if table has not been updated. ",
"default": true,
"type": "boolean"
},
"enable_stateful_lineage_ingestion": {
"title": "Enable Stateful Lineage Ingestion",
"description": "Enable stateful lineage ingestion. This will store lineage window timestamps after successful lineage ingestion. and will not run lineage ingestion for same timestamps in subsequent run. ",
"default": true,
"type": "boolean"
},
"bucket_duration": {
"description": "Size of the time window to aggregate usage stats.",
"default": "DAY",
"allOf": [
{
"$ref": "#/definitions/BucketDuration"
}
]
},
"end_time": {
"title": "End Time",
"description": "Latest date of lineage/usage to consider. Default: Current time in UTC",
"type": "string",
"format": "date-time"
},
"start_time": {
"title": "Start Time",
"description": "Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'.",
"type": "string",
"format": "date-time"
},
"enable_stateful_usage_ingestion": {
"title": "Enable Stateful Usage Ingestion",
"description": "Enable stateful lineage ingestion. This will store usage window timestamps after successful usage ingestion. and will not run usage ingestion for same timestamps in subsequent run. ",
"default": true,
"type": "boolean"
},
"top_n_queries": {
"title": "Top N Queries",
"description": "Number of top queries to save to each table.",
"default": 10,
"exclusiveMinimum": 0,
"type": "integer"
},
"user_email_pattern": {
"title": "User Email Pattern",
"description": "regex patterns for user emails to filter in usage.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"include_operational_stats": {
"title": "Include Operational Stats",
"description": "Whether to display operational stats.",
"default": true,
"type": "boolean"
},
"include_read_operational_stats": {
"title": "Include Read Operational Stats",
"description": "Whether to report read operational stats. Experimental.",
"default": false,
"type": "boolean"
},
"format_sql_queries": {
"title": "Format Sql Queries",
"description": "Whether to format sql queries",
"default": false,
"type": "boolean"
},
"include_top_n_queries": {
"title": "Include Top N Queries",
"description": "Whether to ingest the top_n_queries.",
"default": true,
"type": "boolean"
},
"email_domain": {
"title": "Email Domain",
"description": "Email domain of your organisation so users can be displayed on UI appropriately.",
"type": "string"
},
"s3_lineage_config": {
"title": "S3 Lineage Config",
"description": "Common config for S3 lineage generation",
"default": {
"path_specs": [],
"strip_urls": true
},
"allOf": [
{
"$ref": "#/definitions/S3LineageProviderConfig"
}
]
},
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance_map": {
"title": "Platform Instance Map",
"description": "A holder for platform -> platform_instance mappings to generate correct dataset urns",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"incremental_lineage": {
"title": "Incremental Lineage",
"description": "When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. This config works with rest-sink only.",
"default": false,
"type": "boolean"
},
"convert_urns_to_lowercase": {
"title": "Convert Urns To Lowercase",
"description": "Whether to convert dataset urns to lowercase.",
"default": false,
"type": "boolean"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"stateful_ingestion": {
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
},
"options": {
"title": "Options",
"description": "Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs. To set connection arguments in the URL, specify them under `connect_args`.",
"type": "object"
},
"schema_pattern": {
"title": "Schema Pattern",
"description": "Regex patterns for schemas to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"table_pattern": {
"title": "Table Pattern",
"description": "Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"view_pattern": {
"title": "View Pattern",
"description": "Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"profile_pattern": {
"title": "Profile Pattern",
"description": "Regex patterns to filter tables (or specific columns) for profiling during ingestion. Note that only tables allowed by the `table_pattern` will be considered.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"domain": {
"title": "Domain",
"description": "Attach domains to databases, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like \"Marketing\".) If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.",
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/AllowDenyPattern"
}
},
"include_views": {
"title": "Include Views",
"description": "Whether views should be ingested.",
"default": true,
"type": "boolean"
},
"include_tables": {
"title": "Include Tables",
"description": "Whether tables should be ingested.",
"default": true,
"type": "boolean"
},
"include_table_location_lineage": {
"title": "Include Table Location Lineage",
"description": "If the source supports it, include table lineage to the underlying storage location.",
"default": true,
"type": "boolean"
},
"include_view_lineage": {
"title": "Include View Lineage",
"description": "Populates view->view and table->view lineage using DataHub's sql parser.",
"default": true,
"type": "boolean"
},
"include_view_column_lineage": {
"title": "Include View Column Lineage",
"description": "Populates column-level lineage for view->view and table->view lineage using DataHub's sql parser. Requires `include_view_lineage` to be enabled.",
"default": true,
"type": "boolean"
},
"use_file_backed_cache": {
"title": "Use File Backed Cache",
"description": "Whether to use a file backed cache for the view definitions.",
"default": true,
"type": "boolean"
},
"profiling": {
"title": "Profiling",
"default": {
"enabled": false,
"operation_config": {
"lower_freq_profile_enabled": false,
"profile_day_of_week": null,
"profile_date_of_month": null
},
"limit": null,
"offset": null,
"report_dropped_profiles": false,
"turn_off_expensive_profiling_metrics": false,
"profile_table_level_only": false,
"include_field_null_count": true,
"include_field_distinct_count": true,
"include_field_min_value": true,
"include_field_max_value": true,
"include_field_mean_value": true,
"include_field_median_value": true,
"include_field_stddev_value": true,
"include_field_quantiles": false,
"include_field_distinct_value_frequencies": false,
"include_field_histogram": false,
"include_field_sample_values": true,
"field_sample_values_limit": 20,
"max_number_of_fields_to_profile": null,
"profile_if_updated_since_days": null,
"profile_table_size_limit": 5,
"profile_table_row_limit": 5000000,
"profile_table_row_count_estimate_only": false,
"max_workers": 20,
"query_combiner_enabled": true,
"catch_exceptions": true,
"partition_profiling_enabled": true,
"partition_datetime": null,
"use_sampling": true,
"sample_size": 10000,
"profile_external_tables": false
},
"allOf": [
{
"$ref": "#/definitions/GEProfilingConfig"
}
]
},
"username": {
"title": "Username",
"description": "username",
"type": "string"
},
"password": {
"title": "Password",
"description": "password",
"type": "string",
"writeOnly": true,
"format": "password"
},
"host_port": {
"title": "Host Port",
"description": "host URL",
"type": "string"
},
"database": {
"title": "Database",
"description": "database",
"default": "dev",
"type": "string"
},
"sqlalchemy_uri": {
"title": "Sqlalchemy Uri",
"description": "URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters.",
"type": "string"
},
"default_schema": {
"title": "Default Schema",
"description": "The default schema to use if the sql parser fails to parse the schema with `sql_based` lineage collector",
"default": "public",
"type": "string"
},
"is_serverless": {
"title": "Is Serverless",
"description": "Whether target Redshift instance is serverless (alternative is provisioned cluster)",
"default": false,
"type": "boolean"
},
"use_lineage_v2": {
"title": "Use Lineage V2",
"description": "Whether to use the new SQL-based lineage collector.",
"default": false,
"type": "boolean"
},
"lineage_v2_generate_queries": {
"title": "Lineage V2 Generate Queries",
"description": "Whether to generate queries entities for the new SQL-based lineage collector.",
"default": true,
"type": "boolean"
},
"include_table_lineage": {
"title": "Include Table Lineage",
"description": "Whether table lineage should be ingested.",
"default": true,
"type": "boolean"
},
"include_copy_lineage": {
"title": "Include Copy Lineage",
"description": "Whether lineage should be collected from copy commands",
"default": true,
"type": "boolean"
},
"include_usage_statistics": {
"title": "Include Usage Statistics",
"description": "Generate usage statistic. email_domain config parameter needs to be set if enabled",
"default": false,
"type": "boolean"
},
"include_unload_lineage": {
"title": "Include Unload Lineage",
"description": "Whether lineage should be collected from unload commands",
"default": true,
"type": "boolean"
},
"include_table_rename_lineage": {
"title": "Include Table Rename Lineage",
"description": "Whether we should follow `alter table ... rename to` statements when computing lineage. ",
"default": true,
"type": "boolean"
},
"table_lineage_mode": {
"description": "Which table lineage collector mode to use. Available modes are: [stl_scan_based, sql_based, mixed]",
"default": "mixed",
"allOf": [
{
"$ref": "#/definitions/LineageMode"
}
]
},
"extra_client_options": {
"title": "Extra Client Options",
"default": {},
"type": "object"
},
"match_fully_qualified_names": {
"title": "Match Fully Qualified Names",
"description": "Whether `schema_pattern` is matched against fully qualified schema name `<database>.<schema>`.",
"default": false,
"type": "boolean"
},
"extract_column_level_lineage": {
"title": "Extract Column Level Lineage",
"description": "Whether to extract column level lineage. This config works with rest-sink only.",
"default": true,
"type": "boolean"
},
"patch_custom_properties": {
"title": "Patch Custom Properties",
"description": "Whether to patch custom properties on existing datasets rather than replace.",
"default": true,
"type": "boolean"
},
"resolve_temp_table_in_lineage": {
"title": "Resolve Temp Table In Lineage",
"description": "Whether to resolve temp table appear in lineage to upstream permanent tables.",
"default": true,
"type": "boolean"
}
},
"required": [
"host_port"
],
"additionalProperties": false,
"definitions": {
"BucketDuration": {
"title": "BucketDuration",
"description": "An enumeration.",
"enum": [
"DAY",
"HOUR"
],
"type": "string"
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"PathSpec": {
"title": "PathSpec",
"type": "object",
"properties": {
"include": {
"title": "Include",
"description": "Path to table. Name variable `{table}` is used to mark the folder with dataset. In absence of `{table}`, file level dataset will be created. Check below examples for more details.",
"type": "string"
},
"exclude": {
"title": "Exclude",
"description": "list of paths in glob pattern which will be excluded while scanning for the datasets",
"type": "array",
"items": {
"type": "string"
}
},
"file_types": {
"title": "File Types",
"description": "Files with extenstions specified here (subset of default value) only will be scanned to create dataset. Other files will be omitted.",
"default": [
"csv",
"tsv",
"json",
"parquet",
"avro"
],
"type": "array",
"items": {
"type": "string"
}
},
"default_extension": {
"title": "Default Extension",
"description": "For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped.",
"type": "string"
},
"table_name": {
"title": "Table Name",
"description": "Display name of the dataset.Combination of named variables from include path and strings",
"type": "string"
},
"enable_compression": {
"title": "Enable Compression",
"description": "Enable or disable processing compressed files. Currently .gz and .bz files are supported.",
"default": true,
"type": "boolean"
},
"sample_files": {
"title": "Sample Files",
"description": "Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled",
"default": true,
"type": "boolean"
},
"allow_double_stars": {
"title": "Allow Double Stars",
"description": "Allow double stars in the include path. This can affect performance significantly if enabled",
"default": false,
"type": "boolean"
}
},
"required": [
"include"
],
"additionalProperties": false
},
"S3LineageProviderConfig": {
"title": "S3LineageProviderConfig",
"description": "Any source that produces s3 lineage from/to Datasets should inherit this class.",
"type": "object",
"properties": {
"path_specs": {
"title": "Path Specs",
"description": "List of PathSpec. See below the details about PathSpec",
"default": [],
"type": "array",
"items": {
"$ref": "#/definitions/PathSpec"
}
},
"strip_urls": {
"title": "Strip Urls",
"description": "Strip filename from s3 url. It only applies if path_specs are not specified.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
"default": {},
"type": "object"
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether or not to enable stateful ingest. Default: True if datahub-rest sink is used or if a `datahub_api` is specified, otherwise False",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"OperationConfig": {
"title": "OperationConfig",
"type": "object",
"properties": {
"lower_freq_profile_enabled": {
"title": "Lower Freq Profile Enabled",
"description": "Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling.",
"default": false,
"type": "boolean"
},
"profile_day_of_week": {
"title": "Profile Day Of Week",
"description": "Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect.",
"type": "integer"
},
"profile_date_of_month": {
"title": "Profile Date Of Month",
"description": "Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect.",
"type": "integer"
}
},
"additionalProperties": false
},
"GEProfilingConfig": {
"title": "GEProfilingConfig",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether profiling should be done.",
"default": false,
"type": "boolean"
},
"operation_config": {
"title": "Operation Config",
"description": "Experimental feature. To specify operation configs.",
"allOf": [
{
"$ref": "#/definitions/OperationConfig"
}
]
},
"limit": {
"title": "Limit",
"description": "Max number of documents to profile. By default, profiles all documents.",
"type": "integer"
},
"offset": {
"title": "Offset",
"description": "Offset in documents to profile. By default, uses no offset.",
"type": "integer"
},
"report_dropped_profiles": {
"title": "Report Dropped Profiles",
"description": "Whether to report datasets or dataset columns which were not profiled. Set to `True` for debugging purposes.",
"default": false,
"type": "boolean"
},
"turn_off_expensive_profiling_metrics": {
"title": "Turn Off Expensive Profiling Metrics",
"description": "Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10.",
"default": false,
"type": "boolean"
},
"profile_table_level_only": {
"title": "Profile Table Level Only",
"description": "Whether to perform profiling at table-level only, or include column-level profiling as well.",
"default": false,
"type": "boolean"
},
"include_field_null_count": {
"title": "Include Field Null Count",
"description": "Whether to profile for the number of nulls for each column.",
"default": true,
"type": "boolean"
},
"include_field_distinct_count": {
"title": "Include Field Distinct Count",
"description": "Whether to profile for the number of distinct values for each column.",
"default": true,
"type": "boolean"
},
"include_field_min_value": {
"title": "Include Field Min Value",
"description": "Whether to profile for the min value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_max_value": {
"title": "Include Field Max Value",
"description": "Whether to profile for the max value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_mean_value": {
"title": "Include Field Mean Value",
"description": "Whether to profile for the mean value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_median_value": {
"title": "Include Field Median Value",
"description": "Whether to profile for the median value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_stddev_value": {
"title": "Include Field Stddev Value",
"description": "Whether to profile for the standard deviation of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_quantiles": {
"title": "Include Field Quantiles",
"description": "Whether to profile for the quantiles of numeric columns.",
"default": false,
"type": "boolean"
},
"include_field_distinct_value_frequencies": {
"title": "Include Field Distinct Value Frequencies",
"description": "Whether to profile for distinct value frequencies.",
"default": false,
"type": "boolean"
},
"include_field_histogram": {
"title": "Include Field Histogram",
"description": "Whether to profile for the histogram for numeric fields.",
"default": false,
"type": "boolean"
},
"include_field_sample_values": {
"title": "Include Field Sample Values",
"description": "Whether to profile for the sample values for all columns.",
"default": true,
"type": "boolean"
},
"field_sample_values_limit": {
"title": "Field Sample Values Limit",
"description": "Upper limit for number of sample values to collect for all columns.",
"default": 20,
"type": "integer"
},
"max_number_of_fields_to_profile": {
"title": "Max Number Of Fields To Profile",
"description": "A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
"exclusiveMinimum": 0,
"type": "integer"
},
"profile_if_updated_since_days": {
"title": "Profile If Updated Since Days",
"description": "Profile table only if it has been updated since these many number of days. If set to `null`, no constraint of last modified time for tables to profile. Supported only in `snowflake` and `BigQuery`.",
"exclusiveMinimum": 0,
"type": "number"
},
"profile_table_size_limit": {
"title": "Profile Table Size Limit",
"description": "Profile tables only if their size is less then specified GBs. If set to `null`, no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`",
"default": 5,
"type": "integer"
},
"profile_table_row_limit": {
"title": "Profile Table Row Limit",
"description": "Profile tables only if their row count is less then specified count. If set to `null`, no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`",
"default": 5000000,
"type": "integer"
},
"profile_table_row_count_estimate_only": {
"title": "Profile Table Row Count Estimate Only",
"description": "Use an approximate query for row count. This will be much faster but slightly less accurate. Only supported for Postgres and MySQL. ",
"default": false,
"type": "boolean"
},
"max_workers": {
"title": "Max Workers",
"description": "Number of worker threads to use for profiling. Set to 1 to disable.",
"default": 20,
"type": "integer"
},
"query_combiner_enabled": {
"title": "Query Combiner Enabled",
"description": "*This feature is still experimental and can be disabled if it causes issues.* Reduces the total number of queries issued and speeds up profiling by dynamically combining SQL queries where possible.",
"default": true,
"type": "boolean"
},
"catch_exceptions": {
"title": "Catch Exceptions",
"default": true,
"type": "boolean"
},
"partition_profiling_enabled": {
"title": "Partition Profiling Enabled",
"description": "Whether to profile partitioned tables. Only BigQuery supports this. If enabled, latest partition data is used for profiling.",
"default": true,
"type": "boolean"
},
"partition_datetime": {
"title": "Partition Datetime",
"description": "If specified, profile only the partition which matches this datetime. If not specified, profile the latest partition. Only Bigquery supports this.",
"type": "string",
"format": "date-time"
},
"use_sampling": {
"title": "Use Sampling",
"description": "Whether to profile column level stats on sample of table. Only BigQuery and Snowflake support this. If enabled, profiling is done on rows sampled from table. Sampling is not done for smaller tables. ",
"default": true,
"type": "boolean"
},
"sample_size": {
"title": "Sample Size",
"description": "Number of rows to be sampled from table for column level profiling.Applicable only if `use_sampling` is set to True.",
"default": 10000,
"type": "integer"
},
"profile_external_tables": {
"title": "Profile External Tables",
"description": "Whether to profile external tables. Only Snowflake and Redshift supports this.",
"default": false,
"type": "boolean"
}
},
"additionalProperties": false
},
"LineageMode": {
"title": "LineageMode",
"description": "An enumeration.",
"enum": [
"sql_based",
"stl_scan_based",
"mixed"
]
}
}
}
Code Coordinates
- Class Name:
datahub.ingestion.source.redshift.redshift.RedshiftSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Redshift, feel free to ping us on our Slack.