New Member of the Cluster API Family: The Native Node.js Connector

MySQL Cluster 7.3 went GA yesterday and with it came a new member of the MySQL Cluster API family: mysql-js – a native Node.js connector. mysql-js uses the NDB API to connect directly to the data nodes which improves performance compared to executing queries through the MySQL nodes.

For an introduction to mysql-js and installation instructions, I recommend the official API documentation and Andrew Morgan's blog; the latter also has an overview of the new features in MySQL Cluster 7.3 in general.

To get a feel for how the new API works, I created a small test program that takes one or more files with delimited data (e.g. similar to what you get with SELECT ... INTO OUTFILE) and inserts the data into a table. I have tried to keep things simple: no external modules other than mysql-js are used, there is very little error handling, the reading and parsing of the data files could be done much better, and performance has not been considered. I would rather focus on the usage of mysql-js.

The complete example can be found in the file nodejs_tabinsert.js. The following will go through the important bits.

Preparation

The first part of the script is not really specific to mysql-js, so I will go lightly over that. A few of the arguments deserve a couple of extra words:

  • --log-level: when set to debug or detail, mysql-js logs information about what happens inside the module. This can be useful for learning more about the module or for debugging.
  • --basedir: this is the same as the basedir option for mysqld; it sets where MySQL has been installed. It is used for loading the mysql-js module. The default is /usr/local/mysql.
  • --database and --table: which table to insert the data into. The default database is test, but the table name must always be specified.
  • --connect-string: the script connects directly to the cluster nodes, so it needs the NDB connect-string, just like other NDB programs. The default is localhost:1186.
  • --delimiter: the delimiter used in the data files. The default is a tab (\t).
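The argument parsing itself is not shown in this post. As a minimal sketch of how the options and defaults listed above could be handled without external modules (the function name parseArguments and the property names are my own for illustration, not necessarily what nodejs_tabinsert.js uses):

```javascript
// Hypothetical helper: parses "--name=value" options and treats everything
// else as a data file. Defaults follow the option list above.
function parseArguments(argv) {
   var options = {
      logLevel:      'default',
      basedir:       '/usr/local/mysql',
      database:      'test',
      table:         null,               // has no default; must be given
      connectString: 'localhost:1186',
      delimiter:     '\t',
      files:         []
   };
   var names = {
      '--log-level':      'logLevel',
      '--basedir':        'basedir',
      '--database':       'database',
      '--table':          'table',
      '--connect-string': 'connectString',
      '--delimiter':      'delimiter'
   };

   argv.forEach(function(arg) {
      var pos  = arg.indexOf('=');
      var name = (pos === -1) ? arg : arg.slice(0, pos);
      if (pos !== -1 && names.hasOwnProperty(name)) {
         options[names[name]] = arg.slice(pos + 1);
      } else if (arg.slice(0, 2) === '--') {
         throw new Error('Unknown or incomplete option: ' + arg);
      } else {
         options.files.push(arg);
      }
   });

   if (options.table === null) {
      throw new Error('The --table option must be specified.');
   }
   return options;
}

var options = parseArguments(['--table=t1', 't1a.txt', 't1b.txt']);
console.log(options.database + '.' + options.table);   // prints "test.t1"
```

In a real script the input would be process.argv.slice(2) rather than a hard-coded array.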

Setting Up mysql-js

With all the arguments parsed, it is now possible to load the mysql-js module:

// Create the mysql-js instance - look for it in $basedir/share/nodejs
var nosqlPath = path.join(basedir, 'share', 'nodejs');
var nosql = require(nosqlPath);

// unified_debug becomes available once nosql has been loaded. So set up
// the log level now.
if (logLevel != 'default') {
   unified_debug.on();
   switch (logLevel) {
      case 'debug':
         unified_debug.level_debug();
         break;

      case 'detail':
         unified_debug.level_detail();
         break;
   }
}

// Configure the connections - use all defaults except for the connect string and the database
var dbProperties = nosql.ConnectionProperties('ndb');
dbProperties.ndb_connectstring = ndbConnectstring;
dbProperties.database          = databaseName;

The unified_debug class is part of mysql-js and makes it possible to log debug information from inside mysql-js to the console.

The nosql.ConnectionProperties() method returns an object with the default settings for the chosen adapter, in this case ndb. After that we can change the settings where we do not want the defaults. It is also possible to pass an object with the settings as the argument instead of 'ndb'; that requires setting the name of the adapter using the "implementation" property. Currently the two supported adapters are 'ndb' (as in this example) and 'mysql', which connects to mysqld instead. The 'mysql' adapter requires node-mysql version 2.0 and also supports InnoDB.

As the 'ndb' adapter connects directly to the cluster nodes, no authentication is used. This is the same as for the NDB API.

Callbacks and Table Mapping Constructor

var trxCommit = function(err, session) {
   if (err) {
      failOnError(err, 'Failed to commit after inserting ' + session.insertedRows + ' rows from ' + session.file + '.');
   }
   session.close(function(err) {
      if (err) {
         failOnError(err, 'Failed to close session for ' + session.file + '.');
      }
   });
}

We will load each file inside a transaction. The trxCommit() callback verifies that the transaction was committed without error and then closes the session.

var onInsert = function(err, session) {
   session.insertedRows++;
   if (err && err.ndb_error !== null) {
      failOnError(err, 'Error onInsert after ' + session.insertedRows + ' rows.');
   }

   // Check whether this is the last row.
   if (session.insertedRows === session.totalRows) {
      session.currentTransaction().commit(trxCommit, session);
   }
};

The onInsert callback checks whether each insert worked correctly. When all rows for the session (file) have been inserted, it commits the transaction.
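The "commit when the last callback has arrived" idea can be tried out without a cluster. The following is a minimal sketch of the pattern only; countCompletions is a hypothetical helper of my own and not part of mysql-js:

```javascript
// Hypothetical helper (not part of mysql-js): returns a callback that counts
// completed operations and calls onAllDone once, after the last one.
function countCompletions(totalRows, onAllDone) {
   var completed = 0;
   var errors    = [];
   return function(err) {
      completed++;
      if (err) {
         errors.push(err);
      }
      // The callbacks may arrive in any order; only the count matters.
      if (completed === totalRows) {
         onAllDone(errors, completed);
      }
   };
}

var onRowDone = countCompletions(3, function(errors, completed) {
   console.log('All ' + completed + ' rows done, ' + errors.length + ' error(s).');
});
onRowDone(null);
onRowDone(null);
onRowDone(null);   // prints "All 3 rows done, 0 error(s)."
```

In the script the counter lives on the session object (session.insertedRows and session.totalRows) instead of in a closure, and the action taken at the end is the commit.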

var tableRow = function(tableMeta, line) {
   // Skip empty lines and comments
   if (line.length > 0 && line.substr(0, 1) != '#') {
      var dataArray = line.split(delimiter);
      for (var j = 0; j < tableMeta.columns.length; j++) {
         this[tableMeta.columns[j].name] = dataArray[tableMeta.columns[j].columnNumber];
      }
   }
}

tableRow is the constructor used later for the table mapping. It sets up the object with the data to be inserted for one row. tableMeta is a TableMetadata object with information about the table we are inserting into.
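To see what the constructor produces, it can be run on its own with a hand-written stand-in for the metadata. The fakeMeta object below is an assumption for illustration; it contains only the two column properties the constructor reads (name and columnNumber), whereas the real object comes from session.getTableMetadata():

```javascript
var delimiter = '\t';

// The same constructor as above.
var tableRow = function(tableMeta, line) {
   // Skip empty lines and comments
   if (line.length > 0 && line.substr(0, 1) != '#') {
      var dataArray = line.split(delimiter);
      for (var j = 0; j < tableMeta.columns.length; j++) {
         this[tableMeta.columns[j].name] = dataArray[tableMeta.columns[j].columnNumber];
      }
   }
};

// Stand-in for the metadata of the t1 table (illustration only).
var fakeMeta = {
   columns: [
      { name: 'id',  columnNumber: 0 },
      { name: 'val', columnNumber: 1 }
   ]
};

var row     = new tableRow(fakeMeta, '1\ta');
var comment = new tableRow(fakeMeta, '# id\tval');

console.log(row.id + ' ' + row.val);        // prints "1 a"
console.log(Object.keys(comment).length);   // prints 0
```

Note that the values are the strings read from the file, and that comments and empty lines give an object without any properties. The latter is what the script uses to decide which lines to skip.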

The Session

This is where the bulk of the work is done. Each file will have its own session.

var onSession = function(err, session, file) {
   if (err) {
      failOnError(err, 'Error onSession.');
   }

   // Get the metadata for the table we are going to insert into.
   // This is needed to map the lines read from the data files into row objects
   // (the mapping happens in tableRow() ).
   session.getTableMetadata(databaseName, tableName, function(err, tableMeta) {
      if (err) {
         failOnError(err, 'Error getTableMetadata.');
      }

      var trx = session.currentTransaction();
      trx.begin(function(err) {
         if (err) {
            failOnError(err, 'Failed to start transaction for "' + file + '".');
         }
      });

      session.insertedRows = 0;
      session.file         = file;
      console.log('Reading: ' + file);
      fs.readFile(file, { encoding: 'utf8', flag: 'r' }, function(err, data) {
         if (err) {
            failOnError(err, 'Error reading file "' + file + '"');
         }

         // First find the rows to be inserted
         console.log('Analysing: ' + file);
         var rows  = [];
         session.totalRows = 0;
         data.split('\n').forEach(function(line) {
            var row = new tableRow(tableMeta, line);
            if (Object.keys(row).length > 0) {
               rows[session.totalRows++] = row;
            }
         });

         // Insert the rows
         console.log('Inserting: ' + file);
         rows.forEach(function(row) {
            session.persist(row, onInsert, session);
         });
      });
   });
};

The onSession function is a callback that is used when creating (opening) the sessions.

The first step is to get the metadata for the table. As all data is inserted into the same table, in principle we could reuse the same metadata object for all sessions, but getTableMetadata() is a method of the session, so the metadata cannot be fetched until a session exists.

Next a transaction is started. We get the transaction with the session.currentTransaction() method. This returns an idle transaction which can then be started using the begin() method. Strictly speaking there is no need to store the transaction in a variable; as can be seen in the trxCommit() and onInsert() callbacks above, it is also possible to call session.currentTransaction() repeatedly, and it will keep returning the same transaction object.

The rest of the onSession function processes the actual data. The insert itself is performed with the session.persist() method.

Edit: using a session this way to insert the rows one by one is obviously not very efficient, as it requires a round trip to the data nodes for each row. For bulk inserts the Batch class is a better choice; however, I chose Session to demonstrate using multiple updates inside a transaction.
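For comparison, a batched version could look roughly like the sketch below. It assumes that session.createBatch(), batch.persist(row, callback) and batch.execute(callback) behave as described in the mysql-js API documentation; I have not run it as part of the test program, and persistAsBatch is a hypothetical helper name:

```javascript
// Sketch only: queue all rows on a Batch and send them with one execute().
// Assumes the Batch API as documented for mysql-js; persistAsBatch itself
// is a hypothetical helper and not part of the module.
function persistAsBatch(session, rows, onExecuted) {
   var batch     = session.createBatch();
   var rowErrors = [];

   rows.forEach(function(row) {
      // Nothing is sent yet; the operation is only added to the batch.
      batch.persist(row, function(err) {
         if (err) {
            rowErrors.push(err);
         }
      });
   });

   // Executes all queued operations together.
   batch.execute(function(err) {
      onExecuted(err, rowErrors);
   });
}
```

In onSession this would replace the rows.forEach() loop that calls session.persist(), with the commit moved into the onExecuted callback.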

Creating the Sessions

var annotations = new nosql.TableMapping(tableName).applyToClass(tableRow);
files.forEach(function(file) {
   nosql.openSession(dbProperties, tableRow, onSession, file);
});

First the table mapping is defined. Then a session is opened for each file. Opening a session means connecting to the cluster, so it can be a relatively expensive step.

Running the Script

To test the script, the table t1 in the test database will be used:

CREATE TABLE `t1` (
  `id` int(10) unsigned NOT NULL PRIMARY KEY,
  `val` varchar(10) NOT NULL
) ENGINE=ndbcluster DEFAULT CHARSET=utf8;

For the data files, I have been using:

t1a.txt:

# id    val
1       a
2       b
3       c
4       d
5       e

t1b.txt:

# id    val
6       f
7       g
8       h
9       i
10      j

Running the script:

shell$ export LD_LIBRARY_PATH=/usr/local/mysql/lib
shell$ node nodejs_tabinsert.js --table=t1 t1a.txt t1b.txt
Connected to cluster as node id: 53
Reading: t1b.txt
Reading: t1a.txt
Analysing: t1b.txt
Inserting: t1b.txt
Analysing: t1a.txt
Inserting: t1a.txt

One important observation is that even though the session for t1a.txt is created before the one for t1b.txt, the t1b.txt file ends up being inserted first. In fact, if the inserts used auto-increment values, it would be possible to see that the assignment of values in general alternates between rows from t1b.txt and t1a.txt. The lesson: in Node.js do not count on knowing the exact order of operations.

I hope this example will spark your interest in mysql-js. Feedback is most welcome: both bug reports and feature requests can be reported at bugs.mysql.com.

I have worked with MySQL databases since 2006 both as an SQL developer, a database administrator, and for more than eight years as part of the Oracle MySQL Support team. I have spoken at MySQL Connect and Oracle OpenWorld on several occasions. I have contributed to the sys schema and four Oracle Certified Professional (OCP) exams for MySQL 5.6 to 8.0. I have written four books, all published at Apress.
