How to Load Data from a Hadoop Cluster into a MemSQL Database

MemSQL is a distributed in-memory SQL database, suitable for processing transactions and real-time analytics. The Hadoop Distributed File System (or HDFS) is a popular option for storing very large data sets.

If you are using both Hadoop and MemSQL, you might need to load data from your Hadoop cluster into a MemSQL database. In this blog post, we explore two approaches.

Setting Up a MemSQL Cluster

This post assumes that you already have a Hadoop cluster up and running, so I will just briefly describe the installation process for MemSQL:

I provisioned three machines on AWS (just enough for a test cluster). I used CentOS 7.1 x86_64 with cloud-init (HVM) (ami-91e416fa), m4.xlarge instances (4 vCPUs and 16 GiB of memory), and the default 8 GB of General Purpose SSD (GP2) storage.

The following inbound rules have been created:

Type | Protocol | Port Range | Source
All traffic | All | All | <IP range of the subnetwork>
SSH | TCP | 22 | 0.0.0.0/0
Custom TCP Rule | TCP | 9000 | 0.0.0.0/0 (MemSQL Ops Web UI)
Custom TCP Rule | TCP | 10010 | 0.0.0.0/0 (Spark Web UI)

To deploy MemSQL, I followed these instructions closely. They are very straightforward; just follow each step and carefully read the messages in the MemSQL Ops UI.

In addition, the following document may be used as a reference: http://docs.memsql.com/docs/quick-start-with-amazon-webservices#section-setting-up-on-aws-manually.

If you have three machines, by default the first one acts as the Master Aggregator node and the other two act as Leaf nodes. To use my resources more effectively, I added an additional Leaf node on the first machine (a few clicks in the MemSQL Ops UI). The default port 3306 was already occupied by the Master Aggregator, so I chose 3307 for the new Leaf node.

Next, I created a database for my test data (you may use any MySQL client to connect to MemSQL):

CREATE DATABASE hadoop_poc;
USE hadoop_poc;
CREATE TABLE `test` (
`user_id` int(11),
...
);


For each new test, I had to drop the whole database and recreate it (truncating the tables leaves all the physical files in place, so I ran out of disk space very quickly):

DROP DATABASE hadoop_poc;


This brings us to our tutorial.

Approach 1: Load data from CSV files on HDFS

  • Go to the MemSQL Ops UI, click Data Sources, and then click Import.
  • Click HDFS CSV.
  • Specify HDFS Server (either hostname or IP address), HDFS Port (8020), and File or Folder to Import (for example, /apps/hive/warehouse/poc.db/test/*), then click Continue. Please note that a folder name MUST be followed by a wildcard.
  • Configure CSV data formatting (default values should be fine, unless your files have headers) and click Test & Continue.
  • Verify the data and click Continue.
  • Specify Database Name and Table Name and click Continue.
  • Verify field mapping and click Start Import.
  • Done!

Performance:

This approach is reasonably fast. Using the configuration described above, it took 3:47 minutes on average to import 5,696 MB of original HDFS data (45,768,754 records). Number of tests: 3; standard deviation: 15 seconds.
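To put that timing in perspective, the arithmetic below converts it into throughput. It is only a small sketch based on the figures reported in this post; the function name throughput is illustrative, and the results are derived from the rounded average, not from new measurements.

```python
def throughput(size_mb, records, minutes, seconds):
    """Return (MB per second, records per second) for one load."""
    elapsed = minutes * 60 + seconds  # total wall-clock time in seconds
    return size_mb / elapsed, records / elapsed


# Figures reported above: 5,696 MB and 45,768,754 records in 3:47 on average.
mb_per_s, rec_per_s = throughput(5696, 45768754, 3, 47)
print("%.1f MB/s, %.0f records/s" % (mb_per_s, rec_per_s))
```

That works out to roughly 25 MB/s, or about 200,000 records per second, on this three-node test cluster.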

Limitations:

This approach assumes you’re working with CSV files on HDFS. It’s not possible to use other input formats. It may be good for a one-time load, but it’s not possible to run incremental updates.

This is the easiest solution, since everything can be done from the MemSQL Ops UI. The next approach is faster, but it is more involved because it requires connecting to the hosts.

Approach 2: Load data using MemSQL Loader

  • Log in to any host (it may be outside of the MemSQL cluster). Install wget, if needed:
    sudo yum -y install wget
  • Download and unpack memsql-loader:
    wget https://github.com/memsql/memsql-loader/releases/download/2.0.4/memsql-loader.tar.gz
    tar -xvf memsql-loader.tar.gz
    cd memsql-loader
    ls -als
  • Open another terminal window on the host and open the memsql-loader log:
    cd memsql-loader
    ./memsql-loader log
    
  • Run load in the first window:
    ./memsql-loader load -D <database name> -t <table name> -u root -h <IP or hostname of MemSQL Master Aggregator> -P 3306 --hdfs-host <IP or hostname of HDFS name node> --hdfs-user hdfs --debug --fields-terminated "," --fields-enclosed '"' --fields-escaped '\\' --columns <comma-separated list of column names> <HDFS path>
    

    For example:
    ./memsql-loader load -D hadoop_poc -t test -u root -h 172.31.44.1 -P 3306 --hdfs-host 172.31.34.1 --hdfs-user hdfs --debug --fields-terminated "," --fields-enclosed '"' --fields-escaped '\\' --columns user_id,movie_id,rating,tms hdfs:///apps/hive/warehouse/poc.db/test/*
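If you launch the load from a script rather than by hand, it can help to assemble the command as an argument list. The following is a minimal sketch, assuming Python 3; build_load_command is a hypothetical helper (not part of memsql-loader), and it uses only the flags and example values shown above.

```python
import shlex


def build_load_command(database, table, aggregator_host, hdfs_host,
                       columns, hdfs_path, port=3306, user="root",
                       hdfs_user="hdfs", loader="./memsql-loader"):
    """Assemble the memsql-loader argument list used in this post."""
    return [
        loader, "load",
        "-D", database,
        "-t", table,
        "-u", user,
        "-h", aggregator_host,
        "-P", str(port),
        "--hdfs-host", hdfs_host,
        "--hdfs-user", hdfs_user,
        "--debug",
        "--fields-terminated", ",",
        "--fields-enclosed", '"',
        # The shell example passes '\\' in single quotes, i.e. two backslashes.
        "--fields-escaped", "\\\\",
        "--columns", ",".join(columns),
        hdfs_path,
    ]


command = build_load_command(
    database="hadoop_poc",
    table="test",
    aggregator_host="172.31.44.1",
    hdfs_host="172.31.34.1",
    columns=["user_id", "movie_id", "rating", "tms"],
    hdfs_path="hdfs:///apps/hive/warehouse/poc.db/test/*",
)
# Print a shell-quoted form for inspection before running anything.
print(" ".join(shlex.quote(arg) for arg in command))
```

Passing the list to subprocess.run(command) from the memsql-loader directory would start the load. Because no shell is involved, the trailing wildcard in the HDFS path reaches memsql-loader unexpanded.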
    
    

Instead of specifying a set of CLI parameters, you can pass a single JSON file with all the settings using --spec. To see the expected JSON format, append the --print-spec flag.

Please note that if you need to run the test several times, you should also add the --force flag to tell memsql-loader to reprocess files that were already processed earlier.

The memsql-loader is smart enough to process only new files by default, so you can easily run incremental updates. The memsql-loader can also track changes in the existing files, but to do that your table must have a file_id column (append --file-id-column).

What if our input files are not in CSV format?

The memsql-loader allows you to easily inject a script that transforms your data into CSV format and applies any additional filtering or transformation (if needed). For example, if your files on HDFS are compressed, you can just append --script 'lzma -d'. The data is passed to your script through a pipe.
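To illustrate the pipe contract (raw bytes arrive on standard input, CSV bytes leave on standard output), here is a rough Python 3 counterpart of the lzma -d filter. It is a sketch under the assumption that the files are .xz or .lzma streams; decompress_stream and main are illustrative names, not part of memsql-loader.

```python
import io
import lzma
import sys


def decompress_stream(src, dst, chunk_size=64 * 1024):
    """Copy src to dst, decompressing an .xz/.lzma stream chunk by chunk."""
    decompressor = lzma.LZMADecompressor()  # auto-detects the container format
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            break
        dst.write(decompressor.decompress(chunk))


def main():
    # Binary streams: compressed bytes in on stdin, plain bytes out on stdout.
    decompress_stream(sys.stdin.buffer, sys.stdout.buffer)


# Quick self-check with in-memory streams instead of a real pipe.
payload = b"1,2,0.5,1287412650\n"
source, sink = io.BytesIO(lzma.compress(payload)), io.BytesIO()
decompress_stream(source, sink)
```

Calling main() under the usual if __name__ == '__main__': guard would turn the sketch into a filter that could be passed to --script. Processing in chunks keeps memory use flat regardless of file size.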

Let's assume that, instead of CSV, your files contain one JSON object per record, like this:

{"user_id": 1, "movie_id": 2, "rating": 0.5, "tms": "1287412650"}
{"user_id": 2, "movie_id": 3, "rating": 0.6, "tms": "1287412651"}

To insert that data, you may create a simple Python script like the following:

$ cat /tmp/transform.py
import json
import sys

def main():
    # Read one JSON record per line from stdin and emit it as a CSV row.
    for input_line in sys.stdin:
        value = json.loads(input_line)
        output_line = '%s,%s,%s,%s\n' % (value['user_id'],
                                         value['movie_id'],
                                         value['rating'],
                                         value['tms'])
        sys.stdout.write(output_line)

if __name__ == '__main__':
    main()


Then append the following to your load command: --script 'python /tmp/transform.py'.
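The script above builds each row with string formatting, which is fine for the sample records but would produce a malformed row if a value ever contained a comma. A slightly more defensive variant, sketched below for Python 3, lets the csv module handle quoting. The writer settings are chosen to mirror the --fields-enclosed and --fields-escaped options of the load command; whether memsql-loader interprets every edge case the same way is an assumption, and the names COLUMNS and transform are illustrative.

```python
import csv
import io
import json
import sys

COLUMNS = ("user_id", "movie_id", "rating", "tms")


def transform(lines, out):
    """Write one CSV row to out for each non-blank JSON line in lines."""
    writer = csv.writer(out, delimiter=",", quotechar='"', escapechar="\\",
                        doublequote=False, lineterminator="\n")
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines, e.g. a trailing newline
        record = json.loads(line)
        writer.writerow([record[name] for name in COLUMNS])


def main():
    transform(sys.stdin, sys.stdout)


# Self-check on the two sample records from this post.
sample = io.StringIO(
    '{"user_id": 1, "movie_id": 2, "rating": 0.5, "tms": "1287412650"}\n'
    '{"user_id": 2, "movie_id": 3, "rating": 0.6, "tms": "1287412651"}\n'
)
result = io.StringIO()
transform(sample, result)
```

For the sample records the output is identical to that of the simpler script. As before, calling main() under an if __name__ == '__main__': guard makes it usable with --script.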

Performance:

This approach is faster than the previous one. Importing 5,696 MB of original HDFS data (45,768,754 records) took 2:14 minutes on average (number of tests: 3; standard deviation: 1.4 seconds) on the MemSQL Master Aggregator host, and 2:04 minutes on average (number of tests: 3; standard deviation: 1.2 seconds) on a host outside of the MemSQL cluster.

When we pass the data through a custom script, performance is slightly worse, but not terrible.

Let's inject the following script (it just redirects each line from input to output):

$ cat /tmp/transform.py
import sys

def main():
    # Copy every input line to the output unchanged.
    for input_line in sys.stdin:
        sys.stdout.write(input_line)

if __name__ == '__main__':
    main()


On the MemSQL Master Aggregator host, it took 2:22 minutes on average (number of tests: 3; standard deviation: 0.5 seconds). That is just 6% slower.
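The 6% figure follows directly from the two averages measured on the Master Aggregator host (2:14 without a script, 2:22 with the pass-through script). A quick check, with to_seconds and overhead_percent as illustrative helper names:

```python
def to_seconds(minutes, seconds):
    """Convert a minutes:seconds timing into seconds."""
    return minutes * 60 + seconds


def overhead_percent(baseline_s, with_script_s):
    """Relative slowdown of the scripted run versus the baseline, in percent."""
    return (with_script_s - baseline_s) / baseline_s * 100.0


baseline = to_seconds(2, 14)      # load without --script
with_script = to_seconds(2, 22)   # load with the pass-through script
slowdown = overhead_percent(baseline, with_script)
print("%.1f%% slower" % slowdown)
```

The eight extra seconds on a 134-second baseline come to just under 6%.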

Limitations:

This approach is faster, flexible, and may be used for both a one-time load and incremental updates. However, this solution is much more complicated than the previous one and requires connecting to the hosts.