Set up automatic entity identification in Snowflake

PHOTO EMBED

Fri Aug 11 2023 07:23:19 GMT+0000 (Coordinated Universal Time)

Saved by @Tilores #snowflake #identityresolution

-- All objects below live in the DEMO database.
USE DATABASE DEMO;

-- Turn on change tracking for customers so that the tilores_sync task can
-- read newly inserted rows through the CHANGES clause.
ALTER TABLE customers
    SET CHANGE_TRACKING = TRUE;

-- Mapping from a customer row ("customer_id", taken from customers."id") to
-- the Tilores entity it belongs to. The tilores_sync task upserts it with MERGE.
-- The column names are quoted lowercase identifiers, so queries must quote
-- them as well (e.g. "entity_id").
CREATE OR REPLACE TABLE customers_entities (
    "entity_id"   STRING,
    "customer_id" STRING
);

-- Key/value state for the tilores_sync task:
--   last_sync          timestamp string; the task reads customer changes
--                      starting at this point in time
--   fetch_on_next_run  'true' / 'false'; when 'true', the task refreshes
--                      customers_entities from Tilores
CREATE OR REPLACE TABLE TILORES_CONFIG (
    name varchar(255) NOT NULL,
    value varchar(255) default NULL
);

-- Seed the state. Change tracking starts from the current time, and
-- fetch_on_next_run = 'true' makes the first task run build the
-- entity mapping.
INSERT INTO TILORES_CONFIG (name, value) VALUES
    ('last_sync', CURRENT_TIMESTAMP()::string),
    ('fetch_on_next_run', 'true');

-- Inspect the seeded state.
SELECT
    name,
    value
FROM TILORES_CONFIG
ORDER BY name;
      
-- Task that runs every minute and keeps Tilores in sync with customers.
-- Each run does three things:
--   1. Sends every customer row inserted between last_sync and now to
--      Tilores via tilores_ingest.
--   2. If the previous run ingested something (fetch_on_next_run = 'true'),
--      upserts the customer -> entity mapping in customers_entities using
--      tilores_entity_by_record_id for every customer.
--      NOTE(review): the one-run delay presumably gives Tilores time to
--      process the ingested records; confirm with the Tilores docs.
--   3. Stores now as the next run's last_sync, and records whether this
--      run's window contained any inserts in fetch_on_next_run.
CREATE OR REPLACE TASK tilores_sync
    SCHEDULE = '1 MINUTE'
    AS
    DECLARE
        -- Upper bound of this run's change window. It is stored as last_sync
        -- for the next run.
        now STRING;
        -- Lower bound of this run's change window, written by the previous
        -- run or by the initial seed.
        last_sync STRING;
        -- 'true' when the previous run's window contained inserted customers.
        fetch_on_next_run STRING;
    BEGIN
        now := (SELECT CURRENT_TIMESTAMP());
        last_sync := (SELECT value FROM TILORES_CONFIG WHERE name='last_sync');
        fetch_on_next_run := (SELECT value FROM TILORES_CONFIG WHERE name='fetch_on_next_run');

        -- Ingest customers inserted between :last_sync and :now.
        -- END pins the window's upper bound to :now, the same point where the
        -- next run's window starts. Rows inserted while this run is executing
        -- are left for the next run instead of being ingested twice.
        SELECT tilores_ingest(OBJECT_CONSTRUCT(*)) AS ingested
        FROM (
            SELECT * EXCLUDE(METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID)
            FROM customers
            CHANGES(INFORMATION => DEFAULT)
                AT(TIMESTAMP => to_timestamp_tz(:last_sync))
                END(TIMESTAMP => to_timestamp_tz(:now))
            WHERE METADATA$ACTION='INSERT');

        -- Refresh the entity id of every customer (update existing mappings,
        -- insert new ones).
        IF (fetch_on_next_run = 'true') THEN
            MERGE INTO customers_entities TARGET USING (SELECT tilores_entity_by_record_id("id"):id AS "entity_id", "id" FROM customers) SOURCE
            ON TARGET."customer_id" = SOURCE."id"
            WHEN MATCHED THEN
              UPDATE SET
                  TARGET."entity_id" = SOURCE."entity_id"
            WHEN NOT MATCHED THEN
              INSERT ("entity_id", "customer_id")
                  VALUES (SOURCE."entity_id", SOURCE."id");
        END IF;

        -- The next run's window starts where this run's window ended.
        UPDATE TILORES_CONFIG
            SET value = :now
            WHERE name = 'last_sync';

        -- Request an entity refresh on the next run if this run's window
        -- (same bounds as the ingest above) contained any inserted customers.
        -- :last_sync is still the old value captured at the top of this run.
        UPDATE TILORES_CONFIG
            SET value = (SELECT
                            CASE
                                WHEN count(*) > 0 THEN 'true'
                                ELSE 'false'
                            END AS fetch_on_next_run
                        FROM customers
                        CHANGES(INFORMATION => DEFAULT)
                            AT(TIMESTAMP => to_timestamp_tz(:last_sync))
                            END(TIMESTAMP => to_timestamp_tz(:now))
                        WHERE METADATA$ACTION='INSERT')
            WHERE name = 'fetch_on_next_run';
    END;

-- Snowflake creates tasks in a suspended state. Resume this one to start
-- its 1-minute schedule.
ALTER TASK tilores_sync RESUME;

-- Check recent runs of tilores_sync only (not every task visible to the
-- role), oldest first, with their state and any error message.
SELECT
    name,
    state,
    scheduled_time,
    completed_time,
    error_message
FROM TABLE(information_schema.task_history(TASK_NAME => 'TILORES_SYNC'))
ORDER BY scheduled_time;
content_copyCOPY

https://tilores.io/content/Automating-Data-Deduplication-in-Snowflake