README
pushx - cloud-agnostic data push
pushx is a cloud-agnostic abstraction binary for pushing data to various persistence layers.
pushx is a single compiled binary that can be packaged in your existing job code container, configured with environment variables or command line flags, and included in any existing data pipelines and workflows.
Used in conjunction with procx, pushx can be used to build entirely cloud- and provider-agnostic data pipelines.
Execution
pushx is configured with either environment variables or a set of command line flags, and accepts data input either from stdin, a file, or command line arguments.
# cli args
echo -n hello world | pushx -driver redis-list ...
# or, env vars
export PUSHX_DRIVER=redis-list
...
echo -n hello world | pushx
Payload
By default, pushx reads input data from stdin. If -in-file is provided, pushx reads input data from the specified file, and if -in is provided, pushx reads input data from the specified command line argument (-in takes precedence over -in-file).
Relational Driver JSON Parsing
For unstructured drivers (e.g. fs, aws-s3, redis-list), pushx sends the input data to the driver as-is. However, for drivers which enforce a relational schema, such as the SQL-based drivers, you must provide an insert query to be executed with the input data. You can provide a {{pushx_payload}} placeholder in your query / parameters, which will be replaced with the entire input data. For example:
echo 'the data' | pushx -driver postgres \
...
-psql-query "INSERT INTO table (data) VALUES ($1)" \
-psql-params "{{pushx_payload}}"
However, if your input data is a JSON object, you may want to convert it to a relational format when inserting into your column-oriented database. You can use {{mustache}} syntax to extract specific fields from the input data and insert them into your query. For example:
echo '{"id": 1, "name": "John"}' | pushx -driver postgres \
...
-psql-query "INSERT INTO table (id, name) VALUES ($1, $2)" \
-psql-params "{{id}},{{name}}"
This also supports deeply nested fields, using gjson path syntax:
echo '{"id": 1, "name": "John", "address": {"street": "123 Main St", "city": "Anytown"}}' | pushx -driver postgres \
...
-psql-query "INSERT INTO table (id, name, street, city) VALUES ($1, $2, $3, $4)" \
-psql-params "{{id}},{{name}},{{address.street}},{{address.city}}"
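To see what such a substitution resolves to, here is a rough shell stand-in for the templating step. This is an illustration only: pushx resolves paths like address.city internally with gjson, not with sed, and this naive extraction only works for this particular payload.

```shell
payload='{"id": 1, "name": "John", "address": {"street": "123 Main St", "city": "Anytown"}}'
params='{{id}},{{name}},{{address.city}}'

# Naive field extraction (illustration only; pushx uses gjson).
id=$(printf '%s' "$payload" | sed -n 's/.*"id": \([0-9]*\).*/\1/p')
name=$(printf '%s' "$payload" | sed -n 's/.*"name": "\([^"]*\)".*/\1/p')
city=$(printf '%s' "$payload" | sed -n 's/.*"city": "\([^"]*\)".*/\1/p')

# Substitute the extracted values into the parameter template.
resolved=$(printf '%s' "$params" \
  | sed "s/{{id}}/$id/; s/{{name}}/$name/; s/{{address.city}}/$city/")
echo "$resolved"   # 1,John,Anytown
```

The resolved parameters are then bound to the $1, $2, ... placeholders in the query.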
Drivers
Currently, the following drivers are supported:
- ActiveMQ (activemq)
- AWS DynamoDB (aws-dynamo)
- AWS S3 (aws-s3)
- AWS SQS (aws-sqs)
- Cassandra (cassandra)
- Centauri (centauri)
- Elasticsearch (elasticsearch)
- FS (fs)
- GCP BigQuery (gcp-bq)
- GCP Cloud Storage (gcp-gcs)
- GCP Firestore (gcp-firestore)
- GCP Pub/Sub (gcp-pubsub)
- HTTP (http)
- Kafka (kafka)
- PostgreSQL (postgres)
- Pulsar (pulsar)
- MongoDB (mongodb)
- MySQL (mysql)
- NATS (nats)
- NFS (nfs)
- NSQ (nsq)
- RabbitMQ (rabbitmq)
- Redis List (redis-list)
- Redis Pub/Sub (redis-pubsub)
- Redis Stream (redis-stream)
- Local (local)
There are plans to add more drivers in the future, and PRs are welcome.
See Driver Examples for more information.
Install
curl -SsL https://raw.githubusercontent.com/robertlestak/pushx/main/scripts/install.sh | bash -e
A note on permissions
Depending on the path of INSTALL_DIR and the permissions of the user running the installation script, you may get a Permission Denied error if you are trying to move the binary into a location which your current user does not have access to. This is most often the case when running the script as a non-root user yet trying to install into /usr/local/bin. To fix this, you can either:
- Create a $HOME/bin directory in your current user's home directory. This is the default installation directory; be sure to add it to your $PATH environment variable.
- Use sudo to run the installation script to install into /usr/local/bin:
curl -SsL https://raw.githubusercontent.com/robertlestak/pushx/main/scripts/install.sh | sudo bash -e
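For the first option, a minimal sketch (the ~/.bashrc profile path is an assumption; adjust for your shell):

```shell
# Create the per-user bin directory and put it on PATH for this shell.
mkdir -p "$HOME/bin"
export PATH="$HOME/bin:$PATH"

# Persist for future shells; appends once, guarded against duplicates.
grep -qs 'HOME/bin' "$HOME/.bashrc" \
  || echo 'export PATH="$HOME/bin:$PATH"' >> "$HOME/.bashrc"
```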
Build From Source
mkdir -p bin
go build -o bin/pushx cmd/pushx/*.go
Building for a Specific Driver
By default, the pushx binary is compiled with support for all drivers. This enables a truly build-once-run-anywhere experience. However, some users may want a smaller binary for embedded workloads. To support this, run make listdrivers to get the full list of available drivers, then run make slim drivers="driver1 driver2 driver3 ..." - listing each driver separated by a space - to build a slim binary with only the specified driver(s).
While building for a specific driver may seem contrary to the ethos of pushx, the decoupling between the job queue and the worker still enables a write-once-run-anywhere experience; if you shift upstream data sources, DevOps simply rebuilds the image with the new drivers.
Usage
Usage: pushx [options]
-activemq-address string
ActiveMQ STOMP address
-activemq-enable-tls
Enable TLS
-activemq-name string
ActiveMQ name
-activemq-tls-ca-file string
TLS CA
-activemq-tls-cert-file string
TLS cert
-activemq-tls-insecure
Enable TLS insecure
-activemq-tls-key-file string
TLS key
-aws-dynamo-clear-query string
AWS DynamoDB clear query
-aws-dynamo-data-path string
AWS DynamoDB data JSON path
-aws-dynamo-fail-query string
AWS DynamoDB fail query
-aws-dynamo-key-path string
AWS DynamoDB query key JSON path
-aws-dynamo-retrieve-query string
AWS DynamoDB retrieve query
-aws-dynamo-table string
AWS DynamoDB table name
-aws-load-config
load AWS config from ~/.aws/config
-aws-region string
AWS region
-aws-role-arn string
AWS role ARN
-aws-s3-acl string
AWS S3 ACL
-aws-s3-bucket string
AWS S3 bucket
-aws-s3-key string
AWS S3 key
-aws-s3-tags string
AWS S3 tags. Comma separated list of key=value pairs
-aws-sqs-queue-url string
AWS SQS queue URL
-cassandra-consistency string
Cassandra consistency (default "QUORUM")
-cassandra-hosts string
Cassandra hosts
-cassandra-keyspace string
Cassandra keyspace
-cassandra-password string
Cassandra password
-cassandra-query string
Cassandra query
-cassandra-params string
Cassandra query params
-cassandra-user string
Cassandra user
-centauri-channel string
Centauri channel (default "default")
-centauri-filename string
Centauri filename
-centauri-message-type string
Centauri message type. One of: bytes, file (default "bytes")
-centauri-peer-url string
Centauri peer URL
-centauri-public-key string
Centauri public key
-centauri-public-key-base64 string
Centauri public key base64
-driver string
driver to use. (activemq, aws-dynamo, aws-s3, aws-sqs, cassandra, centauri, elasticsearch, fs, gcp-bq, gcp-firestore, gcp-gcs, gcp-pubsub, http, kafka, local, mongodb, mysql, nats, nfs, nsq, postgres, pulsar, rabbitmq, redis-list, redis-pubsub, redis-stream)
-elasticsearch-address string
Elasticsearch address
-elasticsearch-doc-id string
Elasticsearch doc id
-elasticsearch-enable-tls
Elasticsearch enable TLS
-elasticsearch-index string
Elasticsearch index
-elasticsearch-password string
Elasticsearch password
-elasticsearch-tls-ca-file string
Elasticsearch TLS CA file
-elasticsearch-tls-cert-file string
Elasticsearch TLS cert file
-elasticsearch-tls-key-file string
Elasticsearch TLS key file
-elasticsearch-tls-skip-verify
Elasticsearch TLS skip verify
-elasticsearch-username string
Elasticsearch username
-fs-folder string
FS folder
-fs-key string
FS key
-gcp-bq-query string
GCP BigQuery query
-gcp-firestore-collection string
GCP Firestore collection
-gcp-firestore-id string
GCP Firestore document ID. If empty, a new document ID will be created
-gcp-gcs-bucket string
GCP GCS bucket
-gcp-gcs-key string
GCP GCS key
-gcp-project-id string
GCP project ID
-gcp-pubsub-topic string
GCP Pub/Sub topic name
-http-content-type string
HTTP content type
-http-enable-tls
HTTP enable tls
-http-headers string
HTTP headers
-http-method string
HTTP method (default "POST")
-http-successful-status-codes string
HTTP successful status codes
-http-tls-ca-file string
HTTP tls ca file
-http-tls-cert-file string
HTTP tls cert file
-http-tls-insecure
HTTP tls insecure
-http-tls-key-file string
HTTP tls key file
-http-url string
HTTP url
-in string
input string to use. Will take precedence over -in-file
-in-file string
input file to use. (default: stdin) (default "-")
-kafka-brokers string
Kafka brokers, comma separated
-kafka-enable-sasl
Enable SASL
-kafka-enable-tls
Enable TLS
-kafka-sasl-password string
Kafka SASL password
-kafka-sasl-type string
Kafka SASL type. Can be either 'scram' or 'plain'
-kafka-sasl-username string
Kafka SASL user
-kafka-tls-ca-file string
Kafka TLS CA file
-kafka-tls-cert-file string
Kafka TLS cert file
-kafka-tls-insecure
Enable TLS insecure
-kafka-tls-key-file string
Kafka TLS key file
-kafka-topic string
Kafka topic
-mongo-auth-source string
MongoDB auth source
-mongo-collection string
MongoDB collection
-mongo-database string
MongoDB database
-mongo-enable-tls
Enable TLS
-mongo-host string
MongoDB host
-mongo-password string
MongoDB password
-mongo-port string
MongoDB port (default "27017")
-mongo-tls-ca-file string
Mongo TLS CA file
-mongo-tls-cert-file string
Mongo TLS cert file
-mongo-tls-insecure
Enable TLS insecure
-mongo-tls-key-file string
Mongo TLS key file
-mongo-user string
MongoDB user
-mysql-database string
MySQL database
-mysql-host string
MySQL host
-mysql-password string
MySQL password
-mysql-port string
MySQL port (default "3306")
-mysql-query string
MySQL query
-mysql-params string
MySQL query params
-mysql-user string
MySQL user
-nats-creds-file string
NATS creds file
-nats-enable-tls
NATS enable TLS
-nats-jwt-file string
NATS JWT file
-nats-nkey-file string
NATS NKey file
-nats-password string
NATS password
-nats-subject string
NATS subject
-nats-tls-ca-file string
NATS TLS CA file
-nats-tls-cert-file string
NATS TLS cert file
-nats-tls-insecure
NATS TLS insecure
-nats-tls-key-file string
NATS TLS key file
-nats-token string
NATS token
-nats-url string
NATS URL
-nats-username string
NATS username
-nfs-folder string
NFS folder
-nfs-host string
NFS host
-nfs-key string
NFS key
-nfs-target string
NFS target
-nsq-enable-tls
Enable TLS
-nsq-nsqd-address string
NSQ nsqd address
-nsq-nsqlookupd-address string
NSQ nsqlookupd address
-nsq-tls-ca-file string
NSQ TLS CA file
-nsq-tls-cert-file string
NSQ TLS cert file
-nsq-tls-key-file string
NSQ TLS key file
-nsq-tls-skip-verify
NSQ TLS skip verify
-nsq-topic string
NSQ topic
-psql-database string
PostgreSQL database
-psql-host string
PostgreSQL host
-psql-password string
PostgreSQL password
-psql-port string
PostgreSQL port (default "5432")
-psql-query string
PostgreSQL query
-psql-params string
PostgreSQL query params
-psql-ssl-mode string
PostgreSQL SSL mode (default "disable")
-psql-user string
PostgreSQL user
-pulsar-address string
Pulsar address
-pulsar-auth-cert-file string
Pulsar auth cert file
-pulsar-auth-key-file string
Pulsar auth key file
-pulsar-auth-oauth-params string
Pulsar auth oauth params
-pulsar-auth-token string
Pulsar auth token
-pulsar-auth-token-file string
Pulsar auth token file
-pulsar-producer-name string
Pulsar producer name
-pulsar-tls-allow-insecure-connection
Pulsar TLS allow insecure connection
-pulsar-tls-trust-certs-file string
Pulsar TLS trust certs file path
-pulsar-tls-validate-hostname
Pulsar TLS validate hostname
-pulsar-topic string
Pulsar topic
-rabbitmq-exchange string
RabbitMQ exchange
-rabbitmq-queue string
RabbitMQ queue
-rabbitmq-url string
RabbitMQ URL
-redis-enable-tls
Enable TLS
-redis-host string
Redis host
-redis-key string
Redis key
-redis-message-id string
Redis stream message id (default "*")
-redis-password string
Redis password
-redis-port string
Redis port (default "6379")
-redis-tls-ca-file string
Redis TLS CA file
-redis-tls-cert-file string
Redis TLS cert file
-redis-tls-key-file string
Redis TLS key file
-redis-tls-skip-verify
Redis TLS skip verify
Environment Variables
AWS_REGION, AWS_SDK_LOAD_CONFIG, LOG_LEVEL, NSQ_LOG_LEVEL,
PUSHX_ACTIVEMQ_ADDRESS, PUSHX_ACTIVEMQ_ENABLE_TLS, PUSHX_ACTIVEMQ_NAME, PUSHX_ACTIVEMQ_TLS_CA_FILE, PUSHX_ACTIVEMQ_TLS_CERT_FILE, PUSHX_ACTIVEMQ_TLS_INSECURE, PUSHX_ACTIVEMQ_TLS_KEY_FILE,
PUSHX_AWS_DYNAMO_TABLE, PUSHX_AWS_LOAD_CONFIG, PUSHX_AWS_REGION, PUSHX_AWS_ROLE_ARN, PUSHX_AWS_S3_ACL, PUSHX_AWS_S3_BUCKET, PUSHX_AWS_S3_KEY, PUSHX_AWS_S3_TAGS, PUSHX_AWS_SQS_QUEUE_URL, PUSHX_AWS_SQS_ROLE_ARN,
PUSHX_CASSANDRA_CONSISTENCY, PUSHX_CASSANDRA_HOSTS, PUSHX_CASSANDRA_KEYSPACE, PUSHX_CASSANDRA_PARAMS, PUSHX_CASSANDRA_PASSWORD, PUSHX_CASSANDRA_QUERY, PUSHX_CASSANDRA_USER,
PUSHX_CENTAURI_CHANNEL, PUSHX_CENTAURI_FILENAME, PUSHX_CENTAURI_MESSAGE_TYPE, PUSHX_CENTAURI_PEER_URL, PUSHX_CENTAURI_PUBLIC_KEY, PUSHX_CENTAURI_PUBLIC_KEY_BASE64,
PUSHX_DRIVER,
PUSHX_ELASTICSEARCH_ADDRESS, PUSHX_ELASTICSEARCH_DOC_ID, PUSHX_ELASTICSEARCH_ENABLE_TLS, PUSHX_ELASTICSEARCH_INDEX, PUSHX_ELASTICSEARCH_PASSWORD, PUSHX_ELASTICSEARCH_TLS_CA_FILE, PUSHX_ELASTICSEARCH_TLS_CERT_FILE, PUSHX_ELASTICSEARCH_TLS_KEY_FILE, PUSHX_ELASTICSEARCH_TLS_SKIP_VERIFY, PUSHX_ELASTICSEARCH_USERNAME,
PUSHX_FS_FOLDER, PUSHX_FS_KEY,
PUSHX_GCP_BQ_QUERY, PUSHX_GCP_FIRESTORE_COLLECTION, PUSHX_GCP_FIRESTORE_ID, PUSHX_GCP_GCS_BUCKET, PUSHX_GCP_GCS_KEY, PUSHX_GCP_PROJECT_ID, PUSHX_GCP_TOPIC,
PUSHX_HTTP_ENABLE_TLS, PUSHX_HTTP_REQUEST_CONTENT_TYPE, PUSHX_HTTP_REQUEST_HEADERS, PUSHX_HTTP_REQUEST_METHOD, PUSHX_HTTP_REQUEST_SUCCESSFUL_STATUS_CODES, PUSHX_HTTP_REQUEST_URL, PUSHX_HTTP_TLS_CA_FILE, PUSHX_HTTP_TLS_CERT_FILE, PUSHX_HTTP_TLS_KEY_FILE,
PUSHX_INPUT_FILE, PUSHX_INPUT_STR,
PUSHX_KAFKA_BROKERS, PUSHX_KAFKA_ENABLE_SASL, PUSHX_KAFKA_ENABLE_TLS, PUSHX_KAFKA_SASL_PASSWORD, PUSHX_KAFKA_SASL_TYPE, PUSHX_KAFKA_SASL_USERNAME, PUSHX_KAFKA_TLS_CA_FILE, PUSHX_KAFKA_TLS_CERT_FILE, PUSHX_KAFKA_TLS_INSECURE, PUSHX_KAFKA_TLS_KEY_FILE, PUSHX_KAFKA_TOPIC,
PUSHX_MONGO_AUTH_SOURCE, PUSHX_MONGO_COLLECTION, PUSHX_MONGO_DATABASE, PUSHX_MONGO_ENABLE_TLS, PUSHX_MONGO_HOST, PUSHX_MONGO_PASSWORD, PUSHX_MONGO_PORT, PUSHX_MONGO_TLS_CA_FILE, PUSHX_MONGO_TLS_CERT_FILE, PUSHX_MONGO_TLS_INSECURE, PUSHX_MONGO_TLS_KEY_FILE, PUSHX_MONGO_USER,
PUSHX_MYSQL_DATABASE, PUSHX_MYSQL_HOST, PUSHX_MYSQL_PASSWORD, PUSHX_MYSQL_PORT, PUSHX_MYSQL_QUERY, PUSHX_MYSQL_QUERY_PARAMS, PUSHX_MYSQL_USER,
PUSHX_NATS_CREDS_FILE, PUSHX_NATS_ENABLE_TLS, PUSHX_NATS_JWT_FILE, PUSHX_NATS_NKEY_FILE, PUSHX_NATS_PASSWORD, PUSHX_NATS_SUBJECT, PUSHX_NATS_TLS_CA_FILE, PUSHX_NATS_TLS_CERT_FILE, PUSHX_NATS_TLS_INSECURE, PUSHX_NATS_TLS_KEY_FILE, PUSHX_NATS_TOKEN, PUSHX_NATS_URL, PUSHX_NATS_USERNAME,
PUSHX_NFS_FOLDER, PUSHX_NFS_HOST, PUSHX_NFS_KEY, PUSHX_NFS_TARGET,
PUSHX_NSQ_ENABLE_TLS, PUSHX_NSQ_NSQD_ADDRESS, PUSHX_NSQ_NSQLOOKUPD_ADDRESS, PUSHX_NSQ_TLS_CA_FILE, PUSHX_NSQ_TLS_CERT_FILE, PUSHX_NSQ_TLS_INSECURE, PUSHX_NSQ_TLS_KEY_FILE, PUSHX_NSQ_TOPIC,
PUSHX_PSQL_DATABASE, PUSHX_PSQL_HOST, PUSHX_PSQL_PASSWORD, PUSHX_PSQL_PORT, PUSHX_PSQL_QUERY, PUSHX_PSQL_QUERY_PARAMS, PUSHX_PSQL_SSL_MODE, PUSHX_PSQL_USER,
PUSHX_PULSAR_ADDRESS, PUSHX_PULSAR_AUTH_CERT_FILE, PUSHX_PULSAR_AUTH_KEY_FILE, PUSHX_PULSAR_AUTH_OAUTH_PARAMS, PUSHX_PULSAR_AUTH_TOKEN, PUSHX_PULSAR_AUTH_TOKEN_FILE, PUSHX_PULSAR_PRODUCER_NAME, PUSHX_PULSAR_TLS_ALLOW_INSECURE_CONNECTION, PUSHX_PULSAR_TLS_TRUST_CERTS_FILE, PUSHX_PULSAR_TLS_VALIDATE_HOSTNAME, PUSHX_PULSAR_TOPIC,
PUSHX_RABBITMQ_QUEUE, PUSHX_RABBITMQ_URL,
PUSHX_REDIS_ENABLE_TLS, PUSHX_REDIS_HOST, PUSHX_REDIS_KEY, PUSHX_REDIS_MESSAGE_ID, PUSHX_REDIS_PASSWORD, PUSHX_REDIS_PORT, PUSHX_REDIS_TLS_CA_FILE, PUSHX_REDIS_TLS_CERT_FILE, PUSHX_REDIS_TLS_INSECURE, PUSHX_REDIS_TLS_KEY_FILE
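Each flag maps to a PUSHX_-prefixed variable, so a flag-based invocation can be translated directly. For example, the redis-list invocation from the Execution section expressed entirely with environment variables (host and key values here are placeholders):

```shell
# Equivalent to: pushx -driver redis-list -redis-host localhost \
#   -redis-port 6379 -redis-key my-list
export PUSHX_DRIVER=redis-list
export PUSHX_REDIS_HOST=localhost
export PUSHX_REDIS_PORT=6379
export PUSHX_REDIS_KEY=my-list

# With these set, the invocation reduces to:
#   echo -n 'hello world' | pushx
```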
Driver Examples
ActiveMQ
The ActiveMQ driver will connect to the specified STOMP address and send the data to the specified queue / topic. TLS is optional (shown below); if not used, the TLS flags are not required.
echo hello | pushx \
-driver activemq \
-activemq-address localhost:61613 \
-activemq-name my-queue \
-activemq-enable-tls \
-activemq-tls-ca-file /path/to/ca.pem \
-activemq-tls-cert-file /path/to/cert.pem \
-activemq-tls-key-file /path/to/key.pem
AWS DynamoDB
The AWS DynamoDB driver will insert the specified JSON document into DynamoDB.
echo '{"hello": "world"}' | pushx \
-driver aws-dynamo \
-aws-dynamo-table my-table \
-aws-region us-east-1 \
-aws-role-arn arn:aws:iam::123456789012:role/my-role
AWS S3
The S3 driver will upload the specified data to the specified S3 bucket.
pushx \
-driver aws-s3 \
-in-file /path/to/file.txt \
-aws-s3-bucket my-bucket \
-aws-s3-key 'example-object' \
-aws-s3-acl public-read \
-aws-s3-tags 'Hello=World,Foo=Bar'
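The -aws-s3-tags value is a comma-separated list of key=value pairs. A quick shell illustration of how such a string decomposes into individual tags (this parsing is not part of pushx; it just shows the expected format):

```shell
tags='Hello=World,Foo=Bar'

# One pair per line, then split each pair on the first '='.
summary=$(printf '%s\n' "$tags" | tr ',' '\n' | while IFS='=' read -r key value; do
  printf '%s -> %s\n' "$key" "$value"
done)
echo "$summary"
```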
AWS SQS
The SQS driver will send the specified data to the specified SQS queue.
For cross-account access, you must provide the ARN of the role that has access to the queue, and the identity running pushx must be able to assume the target identity.
If running on a developer workstation, you will most likely want to pass your ~/.aws/config identity. To do so, pass the -aws-load-config flag.
echo hello | pushx \
-aws-sqs-queue-url https://sqs.us-east-1.amazonaws.com/123456789012/my-queue \
-aws-role-arn arn:aws:iam::123456789012:role/my-role \
-aws-region us-east-1 \
-driver aws-sqs
Cassandra
The Cassandra driver will submit the data to the specified keyspace table. If the -cassandra-query contains a {{pushx_payload}} placeholder, the entire input data will be substituted for the placeholder. However if the input data is a JSON document, value keys can be substituted using mustache-style syntax.
echo '{"hello": "world", "another": {"nested": "value"}}' | pushx \
-cassandra-keyspace mykeyspace \
-cassandra-consistency QUORUM \
-cassandra-hosts "localhost:9042,another:9042" \
-cassandra-query 'INSERT INTO mykeyspace.mytable (hello, another) VALUES (?, ?)' \
-cassandra-params '{{hello}}, {{another.nested}}' \
-driver cassandra
Centauri
The centauri driver integrates with a Centauri network to send the input data to the specified public key.
echo hello | pushx \
-centauri-channel my-channel \
-centauri-public-key "$(</path/to/public.pem)" \
-centauri-peer-url https://api.test-peer1.centauri.sh \
-driver centauri
Elasticsearch
The Elasticsearch driver will insert the specified document into Elasticsearch.
echo '{"hello": "world"}' | pushx \
-elasticsearch-address https://localhost:9200 \
-elasticsearch-username elastic \
-elasticsearch-password elastic \
-elasticsearch-tls-skip-verify \
-elasticsearch-index my-index \
-driver elasticsearch
FS
The fs driver will write the input data to the specified file.
echo hello | pushx \
-fs-folder /path/to/folder \
-fs-key "my-file.txt" \
-driver fs
GCP BQ
The gcp-bq driver will insert the specified document into BigQuery. If the input data is a JSON document, value keys can be substituted using mustache-style syntax.
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/credentials.json"
echo '{"id": 1, "name": "hello", "another": "value"}' | pushx \
-gcp-project-id my-project \
-gcp-bq-dataset my-dataset \
-gcp-bq-table my-table \
-gcp-bq-query "INSERT INTO mydatatest.mytable (id, name, another) VALUES ({{id}}, '{{name}}', '{{another}}')" \
-driver gcp-bq
GCP GCS
The GCS driver will upload the specified data to the specified GCS bucket.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
pushx \
-driver gcp-gcs \
-gcp-project-id my-project \
-in-file my-payload.json \
-gcp-gcs-bucket my-bucket \
-gcp-gcs-key 'example.json'
GCP Firestore
The GCP Firestore driver will insert the specified document into Firestore.
echo '{"id": 1, "name": "hello", "another": "value"}' | pushx \
-driver gcp-firestore \
-gcp-project-id my-project \
-gcp-firestore-collection my-collection
GCP Pub/Sub
The GCP Pub/Sub driver will publish the specified data to the specified Pub/Sub topic.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
echo hello | pushx \
-gcp-project-id my-project \
-gcp-pubsub-topic my-topic \
-driver gcp-pubsub
HTTP
The HTTP driver will connect to any HTTP(S) endpoint and submit the input data as an HTTP request. By default, the POST method is used. If using internal PKI, mTLS, or disabling TLS validation, pass the -http-enable-tls flag and the corresponding TLS flags.
echo hello | pushx \
-http-url https://example.com/jobs \
-http-headers 'ExampleToken:foobar,ExampleHeader:barfoo' \
-driver http
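The -http-headers value follows the same comma-separated shape, with each header given as Name:Value. A shell illustration of how the example string splits (again, not pushx's actual parser):

```shell
headers='ExampleToken:foobar,ExampleHeader:barfoo'

# Split on commas, then take the name before the first ':' and the
# value after it.
out=$(printf '%s\n' "$headers" | tr ',' '\n' | while IFS= read -r h; do
  name=${h%%:*}
  value=${h#*:}
  printf '%s = %s\n' "$name" "$value"
done)
echo "$out"
```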
Kafka
The Kafka driver will publish the input data to the specified Kafka topic. TLS and SASL authentication are optional.
echo hello | pushx \
-kafka-brokers localhost:9092 \
-kafka-topic my-topic \
-kafka-enable-tls \
-kafka-tls-ca-file /path/to/ca.pem \
-kafka-tls-cert-file /path/to/cert.pem \
-kafka-tls-key-file /path/to/key.pem \
-kafka-enable-sasl \
-kafka-sasl-type plain \
-kafka-sasl-username my-username \
-kafka-sasl-password my-password \
-driver kafka
MongoDB
The MongoDB driver will insert the specified document into MongoDB.
echo '{"id": 1, "name": "hello", "another": "value"}' | pushx \
-mongo-collection my-collection \
-mongo-database my-database \
-mongo-host localhost \
-mongo-port 27017 \
-mongo-user my-user \
-mongo-password my-password \
-driver mongodb
MySQL
The MySQL driver will insert the specified document into MySQL. If the input data is a JSON document, value keys can be substituted using mustache-style syntax.
echo '{"id": 1, "name": "hello", "another": "value"}' | pushx \
-mysql-host localhost \
-mysql-port 3306 \
-mysql-database mydb \
-mysql-user myuser \
-mysql-password mypassword \
-mysql-query "INSERT INTO example (id, name, another) VALUES (?, ?, ?)" \
-mysql-params "{{id}},{{name}},{{another}}" \
-driver mysql
NATS
The NATS driver will publish the specified data to the specified NATS topic.
echo hello | pushx \
-nats-subject my-subject \
-nats-url localhost:4222 \
-nats-username my-user \
-nats-password my-password \
-nats-enable-tls \
-nats-tls-ca-file /path/to/ca.pem \
-nats-tls-cert-file /path/to/cert.pem \
-nats-tls-key-file /path/to/key.pem \
-driver nats
NFS
The nfs driver will mount the specified NFS directory, and write the input data to the specified file.
echo hello | pushx \
-nfs-host nfs.example.com \
-nfs-target /path/to/nfs \
-nfs-folder /path/to/folder/in/nfs \
-nfs-key "my-object" \
-driver nfs
NSQ
The NSQ driver will connect to the specified nsqlookupd or nsqd endpoint and submit the input data as a PUB message.
echo hello | pushx \
-nsq-nsqlookupd-address localhost:4161 \
-nsq-topic my-topic \
-driver nsq
PostgreSQL
The PostgreSQL driver will insert the specified document into PostgreSQL. If the input data is a JSON document, value keys can be substituted using mustache-style syntax.
echo 'full payload' | pushx \
-psql-host localhost \
-psql-port 5432 \
-psql-database mydb \
-psql-user myuser \
-psql-password mypassword \
-psql-query "INSERT INTO example (payload) VALUES ($1)" \
-psql-params "{{pushx_payload}}" \
-driver postgres
Pulsar
The Pulsar driver will connect to the specified comma-separated Pulsar address(es) and publish the input data to the specified topic.
echo hello | pushx \
-pulsar-address localhost:6650,localhost:6651 \
-pulsar-topic my-topic \
-pulsar-producer-name my-producer \
-pulsar-auth-cert-file /path/to/cert.pem \
-pulsar-auth-key-file /path/to/key.pem \
-pulsar-tls-trust-certs-file /path/to/trusted.pem \
-driver pulsar
RabbitMQ
The RabbitMQ driver will connect to the specified AMQP endpoint and publish the input data to the specified queue.
echo hello | pushx \
-rabbitmq-url amqp://guest:guest@localhost:5672 \
-rabbitmq-queue my-queue \
-driver rabbitmq
Redis List
The Redis List driver will connect to the specified Redis server and push the input data to the specified list.
echo hello | pushx \
-redis-host localhost \
-redis-port 6379 \
-redis-key my-list \
-driver redis-list
Redis Pub/Sub
The Redis Pub/Sub driver will connect to the specified Redis server and publish the input data to the specified topic.
echo hello | pushx \
-redis-host localhost \
-redis-port 6379 \
-redis-key my-subscription \
-driver redis-pubsub
Redis Stream
The Redis Stream driver will connect to the specified Redis server and push the input data to the specified stream.
echo '{"id": 1, "name": "hello", "another": "value"}' | pushx \
-redis-host localhost \
-redis-port 6379 \
-redis-key my-stream \
-driver redis-stream
Local
The local driver is intended primarily for local testing. It does not communicate with any queue; it simply writes the input data to the specified output (stdout by default).
echo hello | pushx \
-driver local