{question}
How do I use pipelines?
{question}
{answer}
Pipelines load data from external data sources into a table.
Each type of data source needs its own connection details when the pipeline is created.
Every pipeline also requires a database and a destination table to store the loaded data.
The following example creates a pipeline that loads Twitter tweets from a Kafka topic.
Example:
Start by creating a table with an id column and a tweet column of the JSON data type.
CREATE TABLE tweets (
    -- These fields are present in each record we get from Kafka.
    -- `tweet` is an arbitrary JSON blob.
    id BIGINT,
    tweet JSON
);
The pipeline can then be created by:
-- A MemSQL Pipeline. Everything inside "CREATE PIPELINE AS"
-- is a normal "LOAD DATA" statement.
CREATE PIPELINE twitter_pipeline AS
-- The "source" of this pipeline is a Kafka broker and topic.
LOAD DATA KAFKA "public-kafka.memcompute.com:9092/tweets-json"
-- The "sink" of this pipeline is a MemSQL table. The tweets table
-- defined above has no unique key, so each record is inserted as a
-- new row. If the table had a unique key, a REPLACE clause before
-- INTO TABLE would overwrite rows whose key already exists.
INTO TABLE tweets
-- Our example Kafka topic contains tab-separated data: a tweet ID
-- and a JSON blob representing a tweet.
FIELDS TERMINATED BY "\t"
-- Our tab-separated data from Kafka will be written to these
-- two columns in the destination table.
(id, tweet);
After the pipeline is created, it must be started manually. When you first start your pipeline, you may not see any output, because the pipeline runs as a background process.
To test your pipeline, you can add the FOREGROUND clause to the START PIPELINE command, which runs the pipeline in the current session instead of in the background. The command below starts the pipeline in the background:
memsql> START PIPELINE twitter_pipeline;
Query OK, 0 rows affected (0.06 sec)
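For a quick test run, the FOREGROUND variant might look like the sketch below. The LIMIT ... BATCHES clause and the batch count of 1 are assumptions added for illustration and are not part of the original example; check the syntax against the documentation for your version.

```sql
-- Assumption: twitter_pipeline is currently stopped; a pipeline that is
-- already running in the background has to be stopped first.
-- Run a single batch in the current session so that any errors are
-- reported directly to this client.
START PIPELINE twitter_pipeline FOREGROUND LIMIT 1 BATCHES;
```

Limiting the run to one batch keeps the test short, since a foreground run otherwise blocks the session while the pipeline is working.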
To verify that the pipeline is running, you can issue the SHOW PIPELINES command below:
memsql> SHOW PIPELINES;
+----------------------+---------+-----------+
| Pipelines_in_example | State   | Scheduled |
+----------------------+---------+-----------+
| twitter_pipeline     | Running | False     |
+----------------------+---------+-----------+
1 rows in set (0.00 sec)
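A Running state does not by itself show that rows are arriving. The queries below are a minimal sketch of how that could be checked; they assume the tweets table and twitter_pipeline from this example, and that the information_schema.PIPELINES_ERRORS view is available in your version.

```sql
-- Count the rows loaded so far; running this twice a few seconds
-- apart should show the number growing while the pipeline runs.
SELECT COUNT(*) FROM tweets;

-- Look at a few of the loaded records.
SELECT id, tweet FROM tweets LIMIT 5;

-- List any errors recorded for this pipeline.
SELECT * FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'twitter_pipeline';
```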
To stop the pipeline from ingesting data, you can issue the STOP PIPELINE command below:
memsql> STOP PIPELINE twitter_pipeline;
Query OK, 0 rows affected (0.04 sec)
Learn more in the Pipelines documentation at SingleStore.
{answer}