{question}
How do I ingest data from MySQL into SingleStore using the aggregator pipeline syntax?
{question}
{answer}
This article describes how to ingest data from MySQL into SingleStore using the aggregator pipeline syntax (CREATE AGGREGATOR PIPELINE).
Steps to follow:
- Set the global variable java_pipelines_java11_path to the path of the Java 11 binary used by the pipeline:
set global java_pipelines_java11_path="/usr/bin/java";
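To confirm that the setting took effect, you can read the variable back. This check is not part of the original steps. /usr/bin/java is only an example path; as the variable name suggests, it should point to a Java 11 installation, which is worth verifying on your own hosts.

```sql
-- Read the engine variable back; the value should match the path set above.
SHOW GLOBAL VARIABLES LIKE 'java_pipelines_java11_path';
```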
- Create a table in SingleStore to hold the data ingested from MySQL:
CREATE TABLE `table_data_from_mysql` (
`id` int(11) NOT NULL,
`name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`id`),
SHARD KEY `__SHARDKEY` (`id`),
SORT KEY `__UNORDERED` ()
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL AUTOSTATS_HISTOGRAM_MODE=CREATE AUTOSTATS_SAMPLING=ON SQL_MODE='STRICT_ALL_TABLES,NO_AUTO_CREATE_USER';
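The article does not show the MySQL table that the pipeline reads ('Database.mysqltable'). Purely as an assumption for illustration, a source table compatible with the target above could look like the following. It runs on the MySQL server, not on SingleStore, and the point is only that its column names and types line up with the SingleStore table and with the payload mapping used later in the pipeline. Database is a reserved word in MySQL, so it is backtick-quoted.

```sql
-- Run on the MySQL source, not on SingleStore. Hypothetical definition for illustration.
CREATE TABLE `Database`.`mysqltable` (
  `id`   INT NOT NULL,                            -- read by the pipeline as `payload`::`id`
  `name` VARCHAR(20) CHARACTER SET utf8mb4 NULL,  -- read by the pipeline as `payload`::`name`
  PRIMARY KEY (`id`)                              -- the key the procedure matches on for deletes
);
```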
- Create a link to the MySQL endpoint:
CREATE LINK pLink AS MYSQL
CONFIG '{"database.hostname": "3.83.31.19", "database.exclude.list": "mysql,performance_schema,information_schema,sys", "database.port": 3306}'
CREDENTIALS '{"database.password": "****", "database.user": "CDC_USER"}';
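Before building the procedure and pipeline on top of the link, you can confirm that it exists in the current database. This is an extra check, not one of the original steps, and it only lists the link definition; it is not a connectivity test against the MySQL host.

```sql
-- List the links defined in the current database; pLink should appear in the output.
SHOW LINKS;
```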
- Create a stored procedure that maps the MySQL table to the SingleStore table and applies any required transformations:
delimiter $$
CREATE OR REPLACE PROCEDURE `Database.table_data_from_mysql`(__cdc_changes QUERY(
    `__operation` int(11) NOT NULL,
    `id` int(11) NOT NULL,
    `name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL))
RETURNS void AS
DECLARE rowsDeleted int;
BEGIN
  -- Insert or update every change in the batch that is not a delete (__operation != 1)
  REPLACE INTO `table_data_from_mysql`
    SELECT `id`, `name` FROM __cdc_changes WHERE __cdc_changes.__operation != 1;
  -- Apply deletes (__operation = 1) by matching on the primary key `id`
  SELECT COUNT(*) INTO rowsDeleted FROM __cdc_changes WHERE __cdc_changes.__operation = 1;
  IF rowsDeleted > 0 THEN
    DELETE dest FROM `table_data_from_mysql` AS dest
      INNER JOIN __cdc_changes USING (`id`) WHERE __cdc_changes.__operation = 1;
  END IF;
END $$
delimiter ;
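The procedure's logic can be tried outside the pipeline on scratch tables, which makes the role of __operation easier to see. This is a sketch: table_data_demo and cdc_changes_demo are hypothetical names, and the value 0 for non-delete rows is an arbitrary choice, since the procedure only distinguishes 1 (delete) from every other value. The two data-changing statements are the ones the procedure runs, in the same order.

```sql
-- Scratch copy of the target table (hypothetical name) so the real table is untouched.
CREATE TABLE table_data_demo LIKE table_data_from_mysql;
-- Scratch stand-in for one batch of __cdc_changes rows (hypothetical name).
CREATE TABLE cdc_changes_demo (`__operation` INT NOT NULL, `id` INT NOT NULL, `name` VARCHAR(20) NULL);
INSERT INTO cdc_changes_demo VALUES
  (0, 1, 'alice'),  -- not 1: upserted
  (0, 2, 'bob'),    -- not 1: upserted
  (1, 2, 'bob');    -- 1: deleted
-- Same statements as the procedure, pointed at the scratch tables.
REPLACE INTO table_data_demo
  SELECT `id`, `name` FROM cdc_changes_demo WHERE `__operation` != 1;
DELETE dest FROM table_data_demo AS dest
  INNER JOIN cdc_changes_demo USING (`id`) WHERE cdc_changes_demo.`__operation` = 1;
-- Only id 1 should remain: id 2 is upserted first and then deleted.
SELECT * FROM table_data_demo ORDER BY `id`;
DROP TABLE cdc_changes_demo;
DROP TABLE table_data_demo;
```

Because the upsert runs before the delete, a key that appears as both an insert and a delete in the same batch ends up deleted, as id 2 does here.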
- Create the aggregator pipeline:
CREATE AGGREGATOR PIPELINE `Database.table_data_from_mysql`
AS LOAD DATA LINK pLink 'Database.mysqltable'
BATCH_INTERVAL 2500
MAX_PARTITIONS_PER_BATCH 1
DISABLE OFFSETS METADATA GC
REPLACE
KEY(`id`)
INTO PROCEDURE `Database.table_data_from_mysql`
FORMAT AVRO
WITH_TEMPORAL_CONVERSION
(
`__operation` <- `__operation`,
`id` <- `payload`::`id`,
`name` <- `payload`::`name`
);
- Start the pipeline. Run the following commands in SingleStore Studio:
-- Check the status of the pipeline created above
SHOW PIPELINES;
-- If the pipeline is listed as Stopped, start it by name
-- (backticks are required because the pipeline name contains a dot)
START PIPELINE `Database.table_data_from_mysql`;
-- Or start all pipelines in the current database
START ALL PIPELINES;
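Once the pipeline is running, a quick round trip can confirm that inserts and deletes flow through the procedure. This is a sketch rather than part of the original steps: the test row (id 9001) is hypothetical, the first and last statements run on the MySQL source, and the information_schema.PIPELINES_ERRORS columns used here should be checked against the documentation for your SingleStore version.

```sql
-- 1. On MySQL: insert a hypothetical test row into the source table.
INSERT INTO `Database`.`mysqltable` (`id`, `name`) VALUES (9001, 'cdc-test');

-- 2. On SingleStore, after at least one batch interval (BATCH_INTERVAL 2500 ms = 2.5 s):
SELECT `id`, `name` FROM `table_data_from_mysql` WHERE `id` = 9001;

-- 3. If the row does not appear, look for recent errors from this pipeline.
SELECT ERROR_UNIX_TIMESTAMP, ERROR_MESSAGE
FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'Database.table_data_from_mysql'
ORDER BY ERROR_UNIX_TIMESTAMP DESC
LIMIT 10;

-- 4. On MySQL: delete the test row; the procedure should then remove it from SingleStore.
DELETE FROM `Database`.`mysqltable` WHERE `id` = 9001;
```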
Related links:
Load Data from MySQL to SingleStore Self-Managed
{answer}