{question}
How to skip a specific file in S3 Pipeline?
{question}
{answer}
SingleStore Pipelines is a feature that continuously loads data as it arrives from external sources. As a built-in component of the database, Pipelines can extract, shape (modify), and load external data without the need for third-party tools or middleware. Pipelines are robust, scalable, highly performant, and support fully distributed workloads.
A key feature of SingleStore Pipelines that makes it a powerful alternative to third-party ETL middleware in many scenarios is easy continuous loading: a pipeline monitors its source S3 folder and automatically loads new files as they arrive. This simplifies the job of the application developer.
Furthermore, Pipelines natively offer exactly-once semantics. In other words, the architecture of Pipelines ensures that transactions are processed exactly once, even in the event of a failover. For this reason, if there is an error in one particular file, the pipeline errors out and stops loading files. This is the default behavior unless the PIPELINES_STOP_ON_ERROR engine variable is set to the non-default value OFF.
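If you would rather have pipelines skip bad records and keep loading instead of stopping, you can change that variable; a minimal sketch, assuming you are connected to the master aggregator with permission to set global variables:
SET GLOBAL pipelines_stop_on_error = OFF;
Note that this is a global setting, so it affects all pipelines on the cluster, not just one.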
However, there may be cases where you want to skip a specific file in an S3 bucket. For example, one file may contain erroneous records, and removing it from the S3 bucket may be time-consuming or even impossible in large enterprises (where different teams are responsible for data generation).
In those scenarios, you can skip a single file with the following SQL command, which sets the pipeline's stored offset for that file so that the file is treated as already loaded:
ALTER PIPELINE <PIPELINE_NAME> SET OFFSETS '{"<path_to_the_file>":1}';
Example:
For this demonstration, we will use the standard TPC-H dataset and load data into a database from files available in a public S3 bucket. The commands below create a database, a table, and a pipeline, and list the files in the S3 bucket.
DROP DATABASE IF EXISTS tpch;
CREATE DATABASE tpch;
USE tpch;
CREATE TABLE `lineitem` (
`l_orderkey` bigint(11) NOT NULL,
`l_partkey` int(11) NOT NULL,
`l_suppkey` int(11) NOT NULL,
`l_linenumber` int(11) NOT NULL,
`l_quantity` decimal(15,2) NOT NULL,
`l_extendedprice` decimal(15,2) NOT NULL,
`l_discount` decimal(15,2) NOT NULL,
`l_tax` decimal(15,2) NOT NULL,
`l_returnflag` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`l_linestatus` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`l_shipdate` date NOT NULL,
`l_commitdate` date NOT NULL,
`l_receiptdate` date NOT NULL,
`l_shipinstruct` char(25) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`l_shipmode` char(10) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`l_comment` varchar(44) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
UNIQUE KEY pk (`l_orderkey`, `l_linenumber`) UNENFORCED RELY,
SHARD KEY (`l_orderkey`) USING CLUSTERED COLUMNSTORE
);
CREATE OR REPLACE PIPELINE order_mgmt_lineitem
AS LOAD DATA S3 'memsql-tpch-dataset/sf_100/lineitem/'
CONFIG '{"region":"us-east-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE lineitem
FIELDS TERMINATED BY '|'
LINES TERMINATED BY '|\n';
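Before starting the pipeline, you can optionally sanity-check it with TEST PIPELINE, which pulls a small sample of rows through the extract and transform steps without writing them to the table; a minimal sketch:
TEST PIPELINE order_mgmt_lineitem LIMIT 1;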
The PIPELINES_FILES view queried below stores information about files that have been extracted from a file-system-like data source, such as Amazon S3. Each row represents a single file. See the SingleStore documentation on the PIPELINES_FILES information schema view to learn more.
SELECT * FROM INFORMATION_SCHEMA.PIPELINES_FILES ORDER BY FILE_NAME;
The above commands create the database, table, and pipeline, and list all the files available in S3. If you start this pipeline, SingleStore will load all 300 files into the table.
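If you only want to see which files have not been loaded yet, you can filter the same view on the file state; a minimal sketch, assuming the FILE_NAME and FILE_STATE columns documented for PIPELINES_FILES:
SELECT FILE_NAME, FILE_STATE
FROM INFORMATION_SCHEMA.PIPELINES_FILES
WHERE PIPELINE_NAME = 'order_mgmt_lineitem' AND FILE_STATE = 'Unloaded'
ORDER BY FILE_NAME;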
If there is a need to skip one file (for instance, sf_100/lineitem/lineitem.tbl.9319.gz), we need to run the following command so the pipeline treats that file as already loaded and does not load it. The ALTER PIPELINE command to mark a file as already loaded is:
ALTER PIPELINE order_mgmt_lineitem SET OFFSETS '{"sf_100/lineitem/lineitem.tbl.9319.gz":1}';
After running the above, the file will be marked as Loaded, and the pipeline will not load data from it.
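To verify, you can check the file's state in PIPELINES_FILES and then start (or resume) the pipeline as usual; a minimal sketch using the columns referenced above:
SELECT FILE_NAME, FILE_STATE
FROM INFORMATION_SCHEMA.PIPELINES_FILES
WHERE FILE_NAME LIKE '%lineitem.tbl.9319.gz';
START PIPELINE order_mgmt_lineitem;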
See the SingleStore documentation on ALTER PIPELINE ... SET OFFSETS to learn about its various use cases.
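For instance, per the ALTER PIPELINE documentation, SET OFFSETS also accepts EARLIEST and LATEST, which reposition the whole pipeline rather than a single file:
ALTER PIPELINE order_mgmt_lineitem SET OFFSETS EARLIEST; -- start from the earliest available offsets in the source
ALTER PIPELINE order_mgmt_lineitem SET OFFSETS LATEST;   -- start from the latest offsets, skipping data already present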
Additional References:
View and Handle Pipeline Errors
{answer}