{question}
How to Load Data from MongoDB Atlas to SingleStore Self-Managed using Change Data Capture (CDC-IN)?
{question}
{answer}
In this article, we discuss how to load data from MongoDB Atlas into an on-premises (Self-Managed) SingleStore cluster using the CDC-IN feature.
- Deploy a MongoDB Atlas cluster on MongoDB Cloud.
- Under Network Access, either allow access from anywhere or add your instance's IP address to the allowed list.
- In the connection string, list each MongoDB replica set node's hostname and port in the order "PrimaryNodeHost:PrimaryNodePort, SecondaryNodeHost:SecondaryNodePort, SecondaryNodeHost:SecondaryNodePort". This is the recommended way for the pipeline to recognize the complete replica set in MongoDB Atlas.
- MongoDB has only two types of clusters: a Replica Set or a Sharded Cluster.
- To get the hostname and port of the primary and secondary nodes, click your cluster and hover over the nodes under the Regions section.
- Alternatively, connect to the MongoDB cluster and run the rs.status() command in the terminal. The response resembles the one shown below; the stateStr field of each entry under members shows whether that node is PRIMARY or SECONDARY.
{
set: 'atlas-ah1wa9-shard-0',
date: 2024-04-23T10:31:02.250Z,
myState: 1,
term: Long('529'),
syncSourceHost: '',
syncSourceId: -1,
heartbeatIntervalMillis: Long('2000'),
majorityVoteCount: 2,
writeMajorityCount: 2,
votingMembersCount: 3,
writableVotingMembersCount: 3,
optimes: {
lastCommittedOpTime: { ts: Timestamp({ t: 1713868262, i: 1007 }), t: Long('529') },
lastCommittedWallTime: 2024-04-23T10:31:02.227Z,
readConcernMajorityOpTime: { ts: Timestamp({ t: 1713868262, i: 1007 }), t: Long('529') },
appliedOpTime: { ts: Timestamp({ t: 1713868262, i: 1007 }), t: Long('529') },
durableOpTime: { ts: Timestamp({ t: 1713868262, i: 1007 }), t: Long('529') },
lastAppliedWallTime: 2024-04-23T10:31:02.227Z,
lastDurableWallTime: 2024-04-23T10:31:02.227Z
},
lastStableRecoveryTimestamp: Timestamp({ t: 1713868247, i: 1114 }),
electionCandidateMetrics: {
lastElectionReason: 'priorityTakeover',
lastElectionDate: 2024-04-22T02:05:52.000Z,
electionTerm: Long('529'),
lastCommittedOpTimeAtElection: { ts: Timestamp({ t: 1713751551, i: 12 }), t: Long('528') },
lastSeenOpTimeAtElection: { ts: Timestamp({ t: 1713751551, i: 12 }), t: Long('528') },
numVotesNeeded: 2,
priorityAtElection: 7.5,
electionTimeoutMillis: Long('5000'),
priorPrimaryMemberId: 2,
numCatchUpOps: Long('0'),
newTermStartDate: 2024-04-22T02:05:52.137Z,
wMajorityWriteAvailabilityDate: 2024-04-22T02:05:52.246Z
},
members: [
{
_id: 0,
name: 'ac-v3am6ob-shard-00-00.crpzkmv.mongodb.net:27017',
health: 1,
state: 2,
stateStr: 'SECONDARY',
uptime: 116435,
optime: [Object],
optimeDurable: [Object],
optimeDate: 2024-04-23T10:31:02.000Z,
optimeDurableDate: 2024-04-23T10:31:02.000Z,
lastAppliedWallTime: 2024-04-23T10:31:02.227Z,
lastDurableWallTime: 2024-04-23T10:31:02.227Z,
lastHeartbeat: 2024-04-23T10:31:02.028Z,
lastHeartbeatRecv: 2024-04-23T10:31:01.920Z,
pingMs: Long('0'),
lastHeartbeatMessage: '',
syncSourceHost: 'ac-v3am6ob-shard-00-01.crpzkmv.mongodb.net:27017',
syncSourceId: 1,
infoMessage: '',
configVersion: 47,
configTerm: 529
},
{
_id: 1,
name: 'ac-v3am6ob-shard-00-01.crpzkmv.mongodb.net:27017',
health: 1,
state: 1,
stateStr: 'PRIMARY',
uptime: 116751,
optime: [Object],
optimeDate: 2024-04-23T10:31:02.000Z,
lastAppliedWallTime: 2024-04-23T10:31:02.227Z,
lastDurableWallTime: 2024-04-23T10:31:02.227Z,
syncSourceHost: '',
syncSourceId: -1,
infoMessage: '',
electionTime: Timestamp({ t: 1713751552, i: 1 }),
electionDate: 2024-04-22T02:05:52.000Z,
configVersion: 47,
configTerm: 529,
self: true,
lastHeartbeatMessage: ''
},
{
_id: 2,
name: 'ac-v3am6ob-shard-00-02.crpzkmv.mongodb.net:27017',
health: 1,
state: 2,
stateStr: 'SECONDARY',
uptime: 116055,
optime: [Object],
optimeDurable: [Object],
optimeDate: 2024-04-23T10:31:02.000Z,
optimeDurableDate: 2024-04-23T10:31:01.000Z,
lastAppliedWallTime: 2024-04-23T10:31:02.227Z,
lastDurableWallTime: 2024-04-23T10:31:02.227Z,
lastHeartbeat: 2024-04-23T10:31:02.028Z,
lastHeartbeatRecv: 2024-04-23T10:31:01.994Z,
pingMs: Long('1'),
lastHeartbeatMessage: '',
syncSourceHost: 'ac-v3am6ob-shard-00-01.crpzkmv.mongodb.net:27017',
syncSourceId: 1,
infoMessage: '',
configVersion: 47,
configTerm: 529
}
],
ok: 1,
'$clusterTime': {
clusterTime: Timestamp({ t: 1713868262, i: 1007 }),
signature: {
hash: Binary.createFromBase64('NtelnxdIf2hl2sC2Hwe9laGhopc=', 0),
keyId: Long('7325186338167193606')
}
},
operationTime: Timestamp({ t: 1713868262, i: 1007 })
}
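As a rough illustration of the ordering rule described earlier, the sketch below takes the name and stateStr values from the rs.status() output above and arranges them primary first. The function name ordered_hosts and the trimmed-down members list are hypothetical helpers for this example only; they are not part of MongoDB or SingleStore.

```python
def ordered_hosts(members):
    """Return 'host:port' entries joined by commas, PRIMARY first, then SECONDARY nodes."""
    primaries = [m["name"] for m in members if m["stateStr"] == "PRIMARY"]
    secondaries = [m["name"] for m in members if m["stateStr"] == "SECONDARY"]
    return ",".join(primaries + secondaries)


# Only the two fields needed here, copied from the rs.status() output above.
members = [
    {"name": "ac-v3am6ob-shard-00-00.crpzkmv.mongodb.net:27017", "stateStr": "SECONDARY"},
    {"name": "ac-v3am6ob-shard-00-01.crpzkmv.mongodb.net:27017", "stateStr": "PRIMARY"},
    {"name": "ac-v3am6ob-shard-00-02.crpzkmv.mongodb.net:27017", "stateStr": "SECONDARY"},
]

host_list = ordered_hosts(members)
print(host_list)
```

The resulting list starts with the shard-00-01 node, which is the PRIMARY in the sample output, followed by the two SECONDARY nodes.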
- Log in to your SingleStore cluster and check whether Java is installed on the master node. (Not applicable to Helios.)
- If Java is not installed, install it using the commands below. (Not applicable to Helios.)
sudo apt update # For Ubuntu/Debian
sudo yum update # For CentOS/RHEL
sudo apt install default-jdk # For Ubuntu/Debian
sudo yum install java-11-openjdk # For CentOS/RHEL (Java 11, to match the java_pipelines_java11_path variable)
java -version # Verify the installation
- Log in to SingleStore Studio and set the Java path using the commands below. (Not applicable to Helios.)
which java # Prints the path where Java is installed
OR
whereis java # Lists the locations where Java is installed
## SQL command to be run in SingleStore Studio
SET GLOBAL java_pipelines_java11_path = 'javafilepath';
OR
sdb-admin update-config --all --set-global --key "java_pipelines_java11_path" --value "/usr/bin/java"
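The two steps above (locating the Java binary, then setting java_pipelines_java11_path) can be sketched together as follows. This is only an illustration: the helper name java_path_statement is hypothetical, and the fallback path /usr/bin/java is simply the example value from the sdb-admin command above, not a guaranteed location on your host.

```python
import shutil


def java_path_statement(java_path):
    """Build the SET GLOBAL statement for a given Java binary path."""
    return f"SET GLOBAL java_pipelines_java11_path = '{java_path}';"


# shutil.which performs the same PATH lookup as the `which java` command.
# If Java is not on the PATH, fall back to the example path used above.
java_path = shutil.which("java") or "/usr/bin/java"

statement = java_path_statement(java_path)
print(statement)
```

The printed statement can then be pasted into SingleStore Studio. Run the script on the node where Java is installed so that the detected path is valid there.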
- To get the connection string from MongoDB Atlas:
## Suppose your MongoDB Atlas shell connection string is the following:
Shell - mongosh "mongodb+srv://cluster0.rrb2vgy.mongodb.net/" --apiVersion 1 --username <db_username>
You can use the "mongodb+srv://cluster0.rrb2vgy.mongodb.net/" part in the CONFIG clause of your SingleStore command to identify your Atlas cluster.
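If you prefer not to copy the value by hand, a minimal sketch like the one below pulls the connection string out of the mongosh command shown above. The function name extract_connection_string is a hypothetical helper for this example; it relies only on Python's standard shlex module.

```python
import shlex


def extract_connection_string(shell_command):
    """Return the first mongodb:// or mongodb+srv:// argument of a mongosh command."""
    for token in shlex.split(shell_command):
        if token.startswith(("mongodb://", "mongodb+srv://")):
            return token
    raise ValueError("no MongoDB connection string found in the command")


# The shell command as shown by MongoDB Atlas (copied from the step above).
command = (
    'mongosh "mongodb+srv://cluster0.rrb2vgy.mongodb.net/" '
    "--apiVersion 1 --username <db_username>"
)

uri = extract_connection_string(command)
print(uri)
```

The extracted value is the string to place in "mongodb.connection.string" in the CONFIG clause.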
- Next, log in to Helios or SingleStore Self-Managed, open the SQL editor, and create a link to the MongoDB hosts using "mongodb.connection.string".
- Use one of the following two methods to create the inferred pipelines:
First, using the link
## Creating the link
CREATE LINK mongoatlas AS MONGODB
CONFIG '{"mongodb.connection.string": "mongodb+srv://cluster0.rrb2vgy.mongodb.net/",
"collection.include.list": "source_db.source_collection",
"mongodb.ssl.enabled":"true",
"mongodb.authsource":"admin",
"mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user":"xxxx",
"mongodb.password":"xxxx"}';
## Creating the pipelines and procedure automatically using the INFER statement (IF NOT EXISTS avoids an error if the table already exists)
CREATE TABLE IF NOT EXISTS subscribers
AS INFER PIPELINE AS LOAD DATA
LINK mongoatlas "source_db.source_collection"
FORMAT AVRO;
OR
Second, without a link, using the MONGODB clause
## Creating the pipelines and procedure automatically using the INFER statement, with the MONGODB clause and the config details inline
CREATE TABLE IF NOT EXISTS subscribers
AS INFER PIPELINE AS LOAD DATA
MONGODB "source_db.source_collection"
CONFIG '{"mongodb.connection.string": "mongodb+srv://cluster0.rrb2vgy.mongodb.net/",
"collection.include.list": "source_db.source_collection",
"mongodb.ssl.enabled":"true",
"mongodb.authsource":"admin",
"mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user":"xxxx",
"mongodb.password":"xxxx"}'
FORMAT AVRO;
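Both methods embed the same CONFIG and CREDENTIALS JSON, and hand-edited JSON is easy to break with a missing quote or comma. The sketch below generates the CREATE LINK statement from Python dictionaries so that the JSON is always well formed. The function names sql_json and build_create_link are hypothetical helpers, and the keys are only those that appear in the statements above.

```python
import json


def sql_json(value):
    """Serialize to JSON for use inside a single-quoted SQL string literal."""
    text = json.dumps(value)
    # This sketch does not attempt SQL escaping; it refuses risky values instead.
    if "'" in text or "\\" in text:
        raise ValueError("value contains a quote or backslash; escape it manually")
    return text


def build_create_link(link_name, connection_string, collection, user, password):
    """Build a CREATE LINK ... AS MONGODB statement like the one shown above."""
    config = {
        "mongodb.connection.string": connection_string,
        "collection.include.list": collection,
        "mongodb.ssl.enabled": "true",
        "mongodb.authsource": "admin",
        "mongodb.members.auto.discover": "false",
    }
    credentials = {"mongodb.user": user, "mongodb.password": password}
    return (
        f"CREATE LINK {link_name} AS MONGODB\n"
        f"CONFIG '{sql_json(config)}'\n"
        f"CREDENTIALS '{sql_json(credentials)}';"
    )


link_sql = build_create_link(
    "mongoatlas",
    "mongodb+srv://cluster0.rrb2vgy.mongodb.net/",
    "source_db.source_collection",
    "xxxx",  # placeholder user, as in the article
    "xxxx",  # placeholder password, as in the article
)
print(link_sql)
```

The same config and credentials dictionaries could be reused for the second method, since its CONFIG and CREDENTIALS clauses carry identical keys.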
- Check that the pipelines were created, then start them, since newly created pipelines are stopped by default.
-- Checking if the pipelines are created using infer pipeline command
show pipelines;
-- Starting the stopped pipelines
start all pipelines;
- To verify that the batches are being inserted properly, use the command below.
select * from information_schema.pipelines_batches_summary;
- Next, check that the tables were created using the following command.
-- Checking if the tables are created
show tables;
- To view the inserted data in a readable format, use the following command.
-- Verifying if the data is properly present
SELECT _id :> JSON , _more :> JSON FROM <table_name>;
- Starting with SingleStore 8.7, sharded clusters are supported as well.
Related Links:
Install MongoDB on a Linux machine
What is Replication & Replica Set
What is Sharding & Sharded Cluster
Load Data from MongoDB to SingleStore Self-Managed
{answer}