xtkt

module
v0.0.866
Published: Aug 12, 2023 License: MIT

README

 __  __  ______  __  __   ______  
/\_\_\_\/\__  _\/\ \/ /  /\__  _\ 
\/_/\_\/\/_/\ \/\ \  _"-.\/_/\ \/ 
  /\_\/\_\ \ \_\ \ \_\ \_\  \ \_\ 
  \/_/\/_/  \/_/  \/_/\/_/   \/_/ 
                                  

xtkt ("extract") is an opinionated data extraction tool that follows the Singer.io specification. Supported sources include RESTful APIs, databases and files (csv, jsonl). HTML scraping is in beta.

xtkt can be piped to any target that meets the Singer.io specification but has been designed and tested for databases such as SQLite & Postgres. Each stream is handled independently and deletion-at-source is not detected.

Both new and updated records (per unique_key at source) are sent to your target as new records (with subsequent unique key _sdc_surrogate_key).

Use a bookmark to determine which records are processed by xtkt and subsequently sent to your target. A bookmark can be either a field within the records indicating the latest record processed (e.g. updated_at) or new-record-detection (records.bookmark_path: ["*"], not advised for large data) (see examples below).
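
As an illustration of the field-based bookmark, here is a minimal Go sketch. It is not xtkt's actual implementation: the helper names (getPath, newerThan) are hypothetical, the comparison is a plain string comparison (which only orders timestamps correctly when they share one ISO 8601 format and timezone), and skipping records that lack the bookmark field is an assumption.

```go
package main

import "fmt"

// getPath walks nested maps along path and reports whether a value was found.
func getPath(rec map[string]interface{}, path []string) (interface{}, bool) {
	var cur interface{} = rec
	for _, key := range path {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// newerThan keeps records whose bookmark value is strictly greater than the
// stored bookmark and returns the largest bookmark value seen.
func newerThan(records []map[string]interface{}, path []string, bookmark string) ([]map[string]interface{}, string) {
	latest := bookmark
	var kept []map[string]interface{}
	for _, rec := range records {
		v, ok := getPath(rec, path)
		if !ok {
			continue // assumption: records without the bookmark field are skipped
		}
		s := fmt.Sprint(v)
		if s > bookmark {
			kept = append(kept, rec)
		}
		if s > latest {
			latest = s
		}
	}
	return kept, latest
}

func main() {
	records := []map[string]interface{}{
		{"id": 1, "updated_at": "2023-08-01T00:00:00Z"},
		{"id": 2, "updated_at": "2023-08-10T00:00:00Z"},
	}
	kept, latest := newerThan(records, []string{"updated_at"}, "2023-08-05T00:00:00Z")
	fmt.Println(len(kept), latest)
}
```

Only the second record is newer than the stored bookmark, so one record is kept and the bookmark advances to its updated_at value.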

In the absence of a bookmark, all records will be processed and sent to your target. This may be suitable if you want to detect hard-deletion in your data model (using _sdc_time_extracted).

Records can be filtered prior to being processed by xtkt using the records.filter_field_paths field in your JSON configuration file (see examples below).
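
The filter operations can be pictured with a short Go sketch. This is a guess at the semantics, not xtkt's code: the filter type and function names are hypothetical, numbers are compared numerically and everything else as strings, and combining several filters with a logical AND is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// filter mirrors one entry of records.filter_field_paths (hypothetical type).
type filter struct {
	FieldPath []string
	Operation string
	Value     interface{}
}

// getPath walks nested maps along path and reports whether a value was found.
func getPath(rec map[string]interface{}, path []string) (interface{}, bool) {
	var cur interface{} = rec
	for _, key := range path {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// compare orders two values: numerically when both are float64, else as strings.
func compare(a, b interface{}) int {
	af, aok := a.(float64)
	bf, bok := b.(float64)
	if aok && bok {
		switch {
		case af < bf:
			return -1
		case af > bf:
			return 1
		}
		return 0
	}
	return strings.Compare(fmt.Sprint(a), fmt.Sprint(b))
}

// matches reports whether rec satisfies f; a missing field never matches.
func matches(rec map[string]interface{}, f filter) bool {
	v, ok := getPath(rec, f.FieldPath)
	if !ok {
		return false
	}
	cmp := compare(v, f.Value)
	switch f.Operation {
	case "equal_to":
		return cmp == 0
	case "not_equal_to":
		return cmp != 0
	case "greater_than":
		return cmp > 0
	case "less_than":
		return cmp < 0
	}
	return false
}

func main() {
	rec := map[string]interface{}{"sport": "Tennis", "championships_won": 3.0}
	filters := []filter{
		{[]string{"sport"}, "not_equal_to", "Volleyball"},
		{[]string{"championships_won"}, "greater_than", 0.0},
	}
	keep := true
	for _, f := range filters {
		keep = keep && matches(rec, f) // assumption: all filters must pass
	}
	fmt.Println("keep record:", keep)
}
```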

xtkt can also listen for incoming messages (designed for webhooks) and continuously pipe them to your target. Bookmarks and History are not considered when "source_type": "listen".

Fields can be dropped from records prior to being sent to your target using the records.drop_field_paths field in your JSON configuration file (see examples below). This may be suitable for dropping redundant, large objects within a record.
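
Dropping a field by path amounts to walking the nested record and deleting the final key. A minimal Go sketch, with a hypothetical dropPath helper and the assumption that a path which does not exist is silently ignored:

```go
package main

import "fmt"

// dropPath removes the field at path from rec, doing nothing if the path is absent.
func dropPath(rec map[string]interface{}, path []string) {
	if len(path) == 0 {
		return
	}
	cur := rec
	for _, key := range path[:len(path)-1] {
		next, ok := cur[key].(map[string]interface{})
		if !ok {
			return // assumption: unknown paths are ignored
		}
		cur = next
	}
	delete(cur, path[len(path)-1])
}

func main() {
	rec := map[string]interface{}{
		"id":      1,
		"episode": []string{"S01E01"},
		"origin":  map[string]interface{}{"name": "Earth", "url": "https://example.com/1"},
	}
	for _, p := range [][]string{{"episode"}, {"origin", "url"}} {
		dropPath(rec, p)
	}
	fmt.Println(rec)
}
```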

Fields can be hashed within records prior to being sent to your target using the records.sensitive_field_paths field in your JSON configuration file (see examples below). This may be suitable for handling sensitive data.
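
A sketch of hashing a field in place, written in Go. The source does not say which hash or encoding is applied to sensitive fields, so the SHA-256 hex digest of the value's string form used here is an assumption, as is the hashPath name.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashPath replaces the value at path with a hex-encoded SHA-256 digest.
func hashPath(rec map[string]interface{}, path []string) {
	if len(path) == 0 {
		return
	}
	cur := rec
	for _, key := range path[:len(path)-1] {
		next, ok := cur[key].(map[string]interface{})
		if !ok {
			return
		}
		cur = next
	}
	last := path[len(path)-1]
	v, ok := cur[last]
	if !ok {
		return
	}
	sum := sha256.Sum256([]byte(fmt.Sprint(v))) // assumption: hash the string form
	cur[last] = hex.EncodeToString(sum[:])
}

func main() {
	rec := map[string]interface{}{
		"name":     "Rick Sanchez",
		"location": map[string]interface{}{"name": "Citadel of Ricks"},
	}
	hashPath(rec, []string{"name"})
	hashPath(rec, []string{"location", "name"})
	fmt.Println(rec)
}
```

Because the digest is deterministic, equal source values still map to equal hashed values, so joins and deduplication on a hashed field keep working.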

Intelligent data fields (REMOVED FOR NOW) could be added to your records with OpenAI LLMs using the records.intelligent_fields field in your JSON configuration file (this required the environment variable OPENAI_API_KEY).

Both integers and floats are sent as floats. All fields are considered NULLABLE.
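
The float behaviour matches what Go's encoding/json does when decoding into untyped values: every JSON number becomes a float64. Whether this is the reason for xtkt's behaviour is an assumption, but the sketch below shows the effect (decode is a hypothetical helper).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decode parses a JSON object into an untyped record.
func decode(raw string) (map[string]interface{}, error) {
	var rec map[string]interface{}
	err := json.Unmarshal([]byte(raw), &rec)
	return rec, err
}

func main() {
	rec, err := decode(`{"id": 7, "score": 7.5}`)
	if err != nil {
		panic(err)
	}
	for _, key := range []string{"id", "score"} {
		fmt.Printf("%s is %T\n", key, rec[key])
	}
	// Prints:
	// id is float64
	// score is float64
}
```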

xtkt is still in development (currently v0.0.8)

💻 Installation

Locally: git clone git@github.com:5amCurfew/xtkt.git; cd xtkt; go build

via Homebrew: brew tap 5amCurfew/5amCurfew; brew install 5amCurfew/5amCurfew/xtkt

$ xtkt --help
xtkt is a command line interface to extract data from a RESTful API, database or files to pipe to any target that meets the Singer.io specification

Usage:
  xtkt <PATH_TO_CONFIG_JSON> [flags]

Flags:
  -h, --help          help for xtkt
  -s, --save-schema   save the schema to a file after extraction
  -v, --version       version for xtkt

🔩 Using with Singer.io Targets

Install targets (Python) in _targets/ in virtual environments:

  1. python3 -m venv ./_targets/target-name
  2. source ./_targets/target-name/bin/activate
  3. python3 -m pip install target-name
  4. deactivate

xtkt config.json | ./_targets/target-name/bin/target-name

For example:

xtkt config_github.json | ./_targets/pipelinewise-target-postgres/bin/target-postgres -c config_target_postgres.json 

I have been using jq to view stdout messages in development. For example:

$ xtkt config_github.json 2>&1 | jq .
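
The messages on stdout are Singer messages, one JSON object per line. As a rough picture of what a target receives, the Go sketch below encodes a Singer RECORD message with the type, stream and record fields from the Singer.io specification. The struct and function names are hypothetical, and xtkt's real messages may carry additional fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// recordMessage holds the core fields of a Singer RECORD message.
type recordMessage struct {
	Type   string                 `json:"type"`
	Stream string                 `json:"stream"`
	Record map[string]interface{} `json:"record"`
}

// encodeRecord renders one record as a single-line Singer RECORD message.
func encodeRecord(stream string, rec map[string]interface{}) (string, error) {
	b, err := json.Marshal(recordMessage{Type: "RECORD", Stream: stream, Record: rec})
	return string(b), err
}

func main() {
	line, err := encodeRecord("xtkt_github_commits", map[string]interface{}{"sha": "abc123"})
	if err != nil {
		panic(err)
	}
	fmt.Println(line)
	// Prints: {"type":"RECORD","stream":"xtkt_github_commits","record":{"sha":"abc123"}}
}
```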

💾 Metadata

xtkt adds the following metadata to records

  • _sdc_surrogate_key: SHA256 of a record
  • _sdc_natural_key: the unique key identifier of the record at source
  • _sdc_time_extracted: a timestamp (RFC 3339) at the time of the data extraction
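
A minimal Go sketch of how these three fields could be derived. It is an illustration only: the source does not specify how a record is serialised before hashing, so hashing the JSON encoding (whose map keys Go sorts, making it deterministic) is an assumption, and getPath and addMetadata are hypothetical names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"time"
)

// getPath walks nested maps along path and reports whether a value was found.
func getPath(rec map[string]interface{}, path []string) (interface{}, bool) {
	var cur interface{} = rec
	for _, key := range path {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// addMetadata adds the _sdc_* fields to rec in place.
func addMetadata(rec map[string]interface{}, uniqueKeyPath []string) error {
	raw, err := json.Marshal(rec) // assumption: hash the JSON form of the record
	if err != nil {
		return err
	}
	sum := sha256.Sum256(raw)
	rec["_sdc_surrogate_key"] = hex.EncodeToString(sum[:])
	if key, ok := getPath(rec, uniqueKeyPath); ok {
		rec["_sdc_natural_key"] = key
	}
	rec["_sdc_time_extracted"] = time.Now().UTC().Format(time.RFC3339)
	return nil
}

func main() {
	rec := map[string]interface{}{"sha": "abc123", "message": "initial commit"}
	if err := addMetadata(rec, []string{"sha"}); err != nil {
		panic(err)
	}
	fmt.Println(rec)
}
```

Since the surrogate key depends on the whole record, an updated record with the same natural key gets a new surrogate key, which is consistent with updates arriving at the target as new records.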

🔧 Config.json

xtkt

{
    "stream_name": "<stream_name>", // required, <string>: the name of your stream
    "source_type": "<source_type>", // required, <string>: one of either db, file, html, rest or listen
    "url": "<url>", // required, <string>: address of the data source (e.g. REST-ful API address, database connection URL, relative file path etc)
    "records": { // required <object>: describes handling of records
        "unique_key_path": ["<key_path_1>", "<key_path_2>", ...], // required <array[string]>: path to unique key of records
        "bookmark_path": ["<key_path_1>", "<key_path_2>", ...], // optional <array[string]>: path to bookmark within records
        "drop_field_paths": [ // optional <array[array]>: paths to remove within records
            ["<key_path_1>", "<key_path_2>", ...], // required <array[string]>
            ...
        ],
        "filter_field_paths": [ // optional <array[object]>
            {
                "field_path": ["<key_path_1>", "<key_path_2>", ...], // required <array[string]>: path to field within records
                "operation": "<operation>", // required <string>: one of equal_to, not_equal_to, greater_than, less_than
                "value": "<value>" // required <string/number>: value to filter against
            },
            ...
        ],
        "sensitive_field_paths": [ // optional <array[array]>: array of paths of fields to hash
            ["<sensitive_path_1_1>", "<sensitive_path_1_2>", ...], // required <array[string]>
            ...
        ],
    }
    ...
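
For readers who want to load such a file programmatically, the top-level fields map onto Go structs as sketched below. The type names and the validation rule are hypothetical and only cover the fields marked required above; url is not checked because the Listen example leaves it empty. Note that the // comments in the template are documentation and must be removed for the file to be valid JSON.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Filter mirrors one entry of records.filter_field_paths.
type Filter struct {
	FieldPath []string    `json:"field_path"`
	Operation string      `json:"operation"`
	Value     interface{} `json:"value"`
}

// Records mirrors the "records" object.
type Records struct {
	UniqueKeyPath       []string   `json:"unique_key_path"`
	BookmarkPath        []string   `json:"bookmark_path,omitempty"`
	DropFieldPaths      [][]string `json:"drop_field_paths,omitempty"`
	FilterFieldPaths    []Filter   `json:"filter_field_paths,omitempty"`
	SensitiveFieldPaths [][]string `json:"sensitive_field_paths,omitempty"`
}

// Config mirrors the top-level fields shared by every source type.
type Config struct {
	StreamName string  `json:"stream_name"`
	SourceType string  `json:"source_type"`
	URL        string  `json:"url"`
	Records    Records `json:"records"`
}

// parseConfig decodes raw JSON and checks the fields documented as required.
func parseConfig(raw []byte) (Config, error) {
	var c Config
	if err := json.Unmarshal(raw, &c); err != nil {
		return c, err
	}
	if c.StreamName == "" || c.SourceType == "" || len(c.Records.UniqueKeyPath) == 0 {
		return c, errors.New("stream_name, source_type and records.unique_key_path are required")
	}
	return c, nil
}

func main() {
	raw := `{"stream_name": "xtkt_jsonl", "source_type": "file",
	         "url": "_config_json/data.jsonl",
	         "records": {"unique_key_path": ["id"]}}`
	cfg, err := parseConfig([]byte(raw))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.StreamName, cfg.SourceType, cfg.Records.UniqueKeyPath)
}
```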

db, html and listen

    ...
    "db": { // optional <object>: required when "source_type": "db"
        "table": "<table>" // required <string>: table name in database
    },
    "html": { // optional <object>: required when "source_type": "html"
        "elements_path": "<elements_path>", // required <string>: CSS selector of the elements' parent
        "elements": [ // required <array[object]>
            {
            "name": "<element_name>", // required <string>: resulting field name in record
            "path": "<element_path>" // required <string>: CSS selector of the element within the parent
            },
            ...
        ]
    },
    "listen": { // optional <object>: required when "source_type": "listen"
        "collection_interval": "<collection_interval>", // required <int>: period of collection in seconds before emitting record messages
        "port": "<port>" // required <string>: port declaration of xtkt API
    },
    ...

rest

    ...
    "rest": { // optional <object>: required when "source_type": "rest"
        "sleep": "<sleep>", // required <int>: number of seconds between pagination requests
        "auth": { // optional <object>: describe the authorisation strategy
            "required": "<required>", // required <boolean>: is authorisation required?
            "strategy": "<strategy>", // optional <string>: required if "required": true, one of either basic, token or oauth
            "basic": { // optional <object>: required if "strategy": "basic"
                "username": "<username>", // required <string>
                "password": "<password>" // required <string>
            },
            "token": { // optional <object>: required if "strategy": "token"
                "header": "<header>", // required <string>: authorisation header name
                "header_value": "<header_value>" // required <string>: authorisation header value
            },
            "oauth": { // optional <object>: required if "strategy": "oauth"
                "client_id": "<client_id>", // required <string>
                "client_secret": "<client_secret>", // required <string>
                "refresh_token": "<refresh_token>", // required <string>
                "token_url": "<token_url>" // required <string>
            }
        },
        "response": { // required <object>: describes the REST-ful API response handling
            "records_path": ["<records_path_1>", "<records_path_2>", ...], // optional <array[string]>: path to records in response (omit if immediately returned)
            "pagination": "<pagination>", // required <boolean>: is there pagination in the response?
            "pagination_strategy": "<pagination_strategy>", // optional <string>: required if "pagination": true, one of either "next" or "query"
            "pagination_next_path": ["<pagination_next_path_1>", "<pagination_next_path_2>", ...], // optional <array[string]>: required if "pagination_strategy": "next", path to "next" URL in response
            "pagination_query": { // optional <object>: required if "pagination_strategy": "query", describes pagination query strategy
                "query_parameter": "<query_parameter>", // required <string>: parameter name for URL pagination
                "query_value": "<query_value>", // required <int>: initial value after base URL is called
                "query_increment": "<query_increment>" // required <int>: query parameter increment
            }
        }
    }
    ...
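
To make the basic and token strategies concrete, here is a Go sketch that applies them to an outgoing request using the standard net/http package. The auth type and newRequest function are hypothetical, using GET is an assumption, and oauth is left out because it needs a token exchange against token_url first.

```go
package main

import (
	"fmt"
	"net/http"
)

// auth flattens the "auth" object for the basic and token strategies.
type auth struct {
	Strategy    string // "basic" or "token"
	Username    string
	Password    string
	Header      string
	HeaderValue string
}

// newRequest builds a GET request and applies the configured strategy.
func newRequest(url string, a auth) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	switch a.Strategy {
	case "basic":
		req.SetBasicAuth(a.Username, a.Password)
	case "token":
		req.Header.Set(a.Header, a.HeaderValue)
	}
	return req, nil
}

func main() {
	req, err := newRequest("https://api.github.com/repos/5amCurfew/xtkt/commits", auth{
		Strategy:    "token",
		Header:      "Authorization",
		HeaderValue: "Bearer YOUR_GITHUB_API_TOKEN",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL, req.Header.Get("Authorization"))
}
```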

🚀 Examples

Rick & Morty API

No authentication required, records found in the response "results" array, paginated using "next", new-record-detection used for bookmark

config.json

{
    "stream_name": "rick_and_morty_characters",
    "source_type": "rest",
    "url": "https://rickandmortyapi.com/api/character",
    "records": {
        "unique_key_path": ["id"],
        "bookmark_path": ["*"],
        "drop_field_paths": [
            ["episode"],
            ["origin", "url"]
        ],
        "filter_field_paths": [
            {
                "field_path": ["gender"],
                "operation": "equal_to",
                "value": "Female"
            }
        ],
        "sensitive_field_paths": [
            ["name"],
            ["location", "name"]
        ]
    },
    "rest": {
        "sleep": 0,
        "auth": {
            "required": false
        },
        "response": {
            "records_path": ["results"],
            "pagination": true,
            "pagination_strategy": "next",
            "pagination_next_path": ["info", "next"]
        }
    }
}
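
The "next" pagination strategy in this config can be sketched in Go as follows: read the value at pagination_next_path from each response and keep requesting until it is missing, null or empty. The helper names are hypothetical, the stop condition is an assumption, and canned responses stand in for real HTTP calls.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// getPath walks nested maps along path and reports whether a value was found.
func getPath(rec map[string]interface{}, path []string) (interface{}, bool) {
	var cur interface{} = rec
	for _, key := range path {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// nextURL extracts the next page URL from a response body, if there is one.
func nextURL(body []byte, path []string) (string, bool) {
	var resp map[string]interface{}
	if err := json.Unmarshal(body, &resp); err != nil {
		return "", false
	}
	v, ok := getPath(resp, path)
	if !ok {
		return "", false
	}
	s, isString := v.(string) // a JSON null fails this assertion
	return s, isString && s != ""
}

func main() {
	// Canned responses keyed by URL stand in for real HTTP calls.
	pages := map[string]string{
		"page1": `{"info": {"next": "page2"}, "results": [{"id": 1}]}`,
		"page2": `{"info": {"next": null}, "results": [{"id": 2}]}`,
	}
	url, more := "page1", true
	for more {
		fmt.Println("fetching", url)
		url, more = nextURL([]byte(pages[url]), []string{"info", "next"})
	}
}
```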

GitHub API

Token authentication required, records returned immediately as an array, pagination using query parameter, bookmarked using "commit.author.date" in record

config.json

{
    "stream_name": "xtkt_github_commits",
    "source_type": "rest",
    "url": "https://api.github.com/repos/5amCurfew/xtkt/commits",
    "records": {
        "unique_key_path": ["sha"],
        "bookmark_path": ["commit", "author", "date"],
        "drop_field_paths": [
            ["author"],
            ["committer", "avatar_url"],
            ["committer", "events_url"]
        ],
        "sensitive_field_paths": [
            ["commit", "author", "email"],
            ["commit", "committer", "email"]
        ]
    },
    "rest": {
        "auth": {
            "required": true,
            "strategy": "token",
            "token": {
                "header": "Authorization",
                "header_value": "Bearer YOUR_GITHUB_API_TOKEN"
            }
        },
        "response": {
            "pagination": true,
            "pagination_strategy": "query",
            "pagination_query": {
                "query_parameter": "page",
                "query_value": 2,
                "query_increment": 1
            }
        }
    }
}
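
With the "query" strategy, the base URL is requested first and query_value is the value used for the request after that, growing by query_increment each time. A Go sketch of the resulting request sequence, using a hypothetical pageURL helper; when xtkt stops paginating is not stated in the source, so the sketch simply prints the first three requests.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// pageURL returns base with the pagination parameter set to value.
func pageURL(base, param string, value int) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	q.Set(param, strconv.Itoa(value))
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	base := "https://api.github.com/repos/5amCurfew/xtkt/commits"
	fmt.Println(base) // the first request uses the base URL as-is

	value, increment := 2, 1 // query_value and query_increment
	for i := 0; i < 2; i++ {
		u, err := pageURL(base, "page", value)
		if err != nil {
			panic(err)
		}
		fmt.Println(u)
		value += increment
	}
}
```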

Strava API

OAuth authentication required, records returned immediately in an array, paginated using query parameter, bookmarked using "start_date" in record

config.json

{
    "stream_name": "my_strava_activities",
    "source_type": "rest",
    "url": "https://www.strava.com/api/v3/athlete/activities",
    "records": {
        "unique_key_path": ["id"],
        "bookmark_path": ["start_date"]
    },
    "rest": {
        "auth": {
            "required": true,
            "strategy": "oauth",
            "oauth": {
                "client_id": "YOUR_CLIENT_ID",
                "client_secret": "YOUR_CLIENT_SECRET",
                "refresh_token": "YOUR_REFRESH_TOKEN",
                "token_url": "https://www.strava.com/oauth/token"
            }
        },
        "sleep": 1,
        "response": {
            "pagination": true,
            "pagination_strategy": "query",
            "pagination_query": {
                "query_parameter": "page",
                "query_value": 2,
                "query_increment": 1
            }
        }
    }
}

Postgres

config.json

{
    "stream_name": "rick_and_morty_characters_from_postgres",
    "source_type": "db",
    "url": "postgres://admin:admin@localhost:5432/postgres?sslmode=disable",
    "records": {
        "unique_key_path": ["id"]
    },
    "db": {
        "table": "rick_and_morty_characters"
    }
}

SQLite

config.json

{
    "stream_name": "sqlite_customers",
    "source_type": "db",
    "url": "sqlite:///example.db",
    "records": {
        "unique_key_path": ["id"],
        "bookmark_path": ["updated_at"]
    },
    "db": {
        "table": "customers"
    }
}

File

config.json

{
    "stream_name": "xtkt_jsonl",
    "source_type": "file",
    "url": "_config_json/data.jsonl",
    "records": {
        "unique_key_path": ["id"],
        "filter_field_paths": [
            {
                "field_path": ["sport"],
                "operation": "not_equal_to",
                "value": "Volleyball"
            },
            {
                "field_path": ["championships_won"],
                "operation": "greater_than",
                "value": 0
            }
        ],
        "sensitive_field_paths": [
            ["location", "address"]
        ]
    }
}

Listen

config.json (send a test message with e.g. curl -X POST -H "Content-Type: application/json" -d '{"key1":"value1","key2":"value2"}' http://localhost:8080/messages)

{
    "stream_name": "listen_testing",
    "source_type": "listen",
    "url": "",
    "records": {
        "unique_key_path": ["key1"],
        "sensitive_field_paths": [
            ["key2"]
        ]
    },
    "listen":{
        "collection_interval": 10,
        "port": "8080"
    }
}

www.fifaindex.com/teams

Scrape team "overall" rating found within HTML table (beta)

config.json

{
    "stream_name": "fifa_team_ratings",
    "source_type": "html",
    "url": "https://www.fifaindex.com/teams/",
    "records": {
        "unique_key_path": ["name"]
    },
    "html": {
        "elements_path": "table.table-teams > tbody > tr",
        "elements": [
            {"name": "name", "path": "td[data-title='Name'] > a.link-team"},
            {"name": "league", "path": "td[data-title='League'] > a.link-league"},
            {"name": "overall", "path": "td[data-title='OVR'] > span.rating:nth-child(1)"}
        ]
    }
}
