Wednesday, February 26, 2014

Bulk Integration Into and Out of Cassandra CQL3 Data Models


Purpose

The purpose of this blog is to provide an aggregated view of batch/bulk techniques available for integrating with Apache Cassandra and CQL3-based data models, including Cassandra 1.2 and 2.0 (DSE 3.2 and 4.0).  This blog post will be augmented over time as techniques evolve. 

Background

As long as people store data digitally, there will be a need to move large "chunks" of data between systems.  This could be for reasons of:
  • aggregation and combination (i.e. analytical systems) where multiple sources of data are combined and queried
  • archive purposes (removal of old and unneeded data)
  • data migration projects
  • large scale data model changes
  • database transition initiatives
  • other
This blog will provide a comprehensive view of bulk integration techniques when moving data into and out of Cassandra: 
  • with links to code samples
  • tips and tricks
  • recommended use cases
Information will be presented using a matrix view that lists different techniques as rows.  The matrix will contain 2 columns, one for loading data into Cassandra from a source (Into Cassandra) and the other for loading data from Cassandra into a source (Out of Cassandra).   "A source" generally means an RDBMS, flat file, Hadoop, mainframe, etc.  The cells of this matrix will contain a summary for the row topic as well as links to additional posts that contain details supporting the summary.

We will work to provide a low-latency matrix in a subsequent post to help with near-real-time integration/data-pipelining techniques.

Batch Integration Matrix

 

Batch Technique
Into Cassandra
Out of Cassandra
Cassandra Bulk Loader (SSTableLoader)*
·  Loads SSTables into a Cassandra ring from a node or set of nodes that are not actively part of the ring.
·  Good option for migration, cluster generation, etc
·  Enables parallel loading of data
·  Should be executed on a set of “non cluster” nodes for optimal performance.
·  Requires creation of SSTables (see below)
·  CQL3 Support depends on the creation of SSTables
·  Counters may not be fully supported
·  Leverages Cassandra internal streaming
·  After completion, run a repair
n/a
Bulk Loading via JMX*
·  JMX based utility to stream sstable data into Cassandra.
·  Leverages the same code functionality as SSTableLoader
·  Enables loading of data from a Cassandra node into the same node without requiring the configuration of a separate network interface, which is required for SSTableLoader on the same node
·  Same requirements and limitations of SSTableLoader.
·  Source and Documentation (look for bulkLoad(String))
n/a
Copy SSTables
Into Data Directory 
·  Loads SSTables into a Cassandra ring
·  Good option for migration, cluster generation, etc
·  Requires creation of SSTables (see below)
·  Could leverage Snapshot SSTables for migration purposes
·  CQL3 Support depends on the structure of SSTables.
·  Working example and blog to be provided soon
Out of Data Directory
·  Get access to SSTable data with minimal production system impact
·  Can be used as a source to populate another cluster
ColumnFamily<>Format
ColumnFamilyOutputFormat
·  Thrift based driver to load data into Cassandra Thrift based column families
·  Not compatible with CQL3 tables
·  Example
ColumnFamilyInputFormat
·  Thrift based driver to extract data out of Cassandra Thrift based column families
·  Not compatible with CQL3 tables
·  Example
BulkOutputFormat
·  Thrift based tool/no CQL3 support (CQL3 support has to be creatively programmed using composites, similar to the SSTable loader techniques for the Simple and SimpleUnsorted SSTable Writers)
·  Used to stream data from MR to Cassandra
·  Similar to Bulk Loading above but no need for a “fat client”, i.e. a Cassandra node to execute the process.
·  Can be used with Pig
n/a
CQL<>Format
CQLOutputFormat
·  CQL3 based driver to load data into Cassandra CQL3 tables
·  This is not necessarily a “bulk loader”
·  Example
CQLPagingInputFormat
·  CQL3 based driver to extract data out of Cassandra CQL3 tables
·  This is not necessarily a “bulk loader”
·  Example
CQL3 Statements via M/R
·  We have heard that several users simply create map-only jobs and insert data into Cassandra leveraging a Java driver, like the DataStax Java driver.
·  Example
n/a
CQL3 Batch Statements
·  Batch statements group individual statements into single operations and can be used for bulk-like integration processes.
·  Use UNLOGGED batches if performance is a concern, though you lose atomicity
n/a
ETL Tools - Pentaho
·  Visual, ETL approach for the bulk integration of Cassandra data with other sources.
·  Pentaho Data Integration 5 supports CQL3 (JIRA)
·  We have not had the chance to test this approach but will do so soon.
·  Working example and blog to be provided soon
·  Visual, ETL approach for the bulk integration of Cassandra data with other sources.
·  Pentaho Data Integration 5 supports CQL3 (JIRA)
·  We have not had the chance to test this but will do so soon.
·  Working example and blog to be provided soon
ETL Tools - JasperSoft
·  Visual, ETL approach for the bulk integration of Cassandra data with other sources.
·  Jaspersoft Studio with the Cassandra Connector v 1.0 supports CQL3 (Release)
·  We have not had the chance to test this approach but will do so soon.
·  Working example and blog to be provided soon
·  Visual, ETL approach for the bulk integration of Cassandra data with other sources.
·  Jaspersoft Studio with the Cassandra Connector v 1.0 supports CQL3 (Release)
·  We have not had the chance to test this approach but will do so soon.
·  Working example and blog to be provided soon
ETL Tools - Talend
·  Visual, ETL approach for the bulk integration of Cassandra data with other sources.
·  We are still investigating what Talend supports with regard to Cassandra.  We did notice that it appears Talend would like to support the SSTableLoader mentioned above. (JIRA)
·  We have not had the chance to test this approach but will do so soon.
·  Working example and blog to be provided soon
·  Visual, ETL approach for the bulk integration of Cassandra data with other sources.
·  We are still investigating what Talend supports with regard to Cassandra.  We did notice that it appears Talend would like to support the SSTableLoader mentioned above. (JIRA)
·  We have not had the chance to test this approach but will do so soon.
·  Working example and blog to be provided soon
Sqoop (DSE version)
·  Offers a good tool for bulk/batch integration between many different databases, Hadoop, and Cassandra.
·  The DSE 3.X version does not yet support CQL3, but this should be coming in DSE 4.X.
·  This is a very promising feature and we will update once DataStax releases the CQL3 support.
·  For CQL2, here is a write up with a working example
·  Offers a good tool for bulk/batch integration between many different databases, Hadoop, and Cassandra.
·  The DSE 3.X version does not yet support CQL3, but this should be coming in DSE 4.X.
·  This is a very promising feature and we will update once DataStax releases the CQL3 support.
·  For CQL2, here is a write up with a working example
CQLSH Copy
·  Good tool for bulk loading a small amount of data; fewer than 1 million records is the recommendation.
·  Uses .csv files as import sources only
·  Good tool for writing out a small amount of data, fewer than 1 million records is the recommendation, to a .csv file.
·  Uses .csv files as export targets only
Flume Integration
·  Found a project on GitHub that offers Flume integration into Cassandra.  
·  Currently this is Hector based.  

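Before moving on to SSTable creation, here is a minimal sketch of the CQLSH COPY technique listed in the matrix above; the keyspace, table, column names, and file paths are purely hypothetical:

-- import: load rows from a .csv file into an existing CQL3 table
COPY demo.transactions (txn_id, store_nbr, txn_ts, amount)
  FROM '/tmp/transactions.csv' WITH HEADER = true;

-- export: write the table's rows out to a .csv file
COPY demo.transactions (txn_id, store_nbr, txn_ts, amount)
  TO '/tmp/transactions_export.csv' WITH HEADER = true;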

*Requires SSTable Creation

*SSTable Writers

In order to load data into Cassandra using the bulk loading techniques denoted by *, SSTables need to be generated.  The following table provides information on the available techniques for creating SSTables.

Batch Technique
Overview
Limitations
CQL3 Support
CQLSSTableWriter
·  Fully supports CQL3 compatible SSTable creation
·  Contained in C* 2.X and higher
Full
SSTableSimpleUnsortedWriter
·  Generates SSTables simply and easily using partitioned order
·  Does not inherently support CQL3
·  CQL3 support can be created using composite types
·  DataStax Example (Non CQL)
·  Limited CQL Support
·  More complex CQL configuration compared to CQLSSTableWriter
Limited
SSTableSimpleWriter
·  Generates SSTables but does not sort data for you; requires data to be added in partition sorted order
·  Not recommended for use unless there is a specific use case.
·  Requires inserting data in partition sorted order
Limited

Conclusion

This list is meant to be a comprehensive guide.  Please let us know if we missed anything or if you have feedback on specific techniques.  Hopefully this post provides value to people who are analyzing different techniques to move large chunks of data into or out of Cassandra.

Friday, February 21, 2014

DataStax AMI EC2 Cassandra JMX Configuration for Jconsole

Purpose

Provide a quick reference when configuring the DataStax AMI for JConsole access.  DataStax is working on updating their documentation for this, but in the meantime, here you go:

Steps

1)  Change cassandra-env.sh to provide a JMX hostname like the following, using your EC2 public hostname (e.g. ec2-xx-xxx-xxx-xxxx.us-west-1.compute.amazonaws.com):
JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<ec2 public hostname>"

2) Open up your Security Group to allow access to port 7199 (the default Cassandra JMX port) as well as ports 1024 - 65535 for the machine that you are connecting from.

3)  Start JConsole using the following service URL: service:jmx:rmi://ec2-xx-xxx-xxx-xxxx.us-west-1.compute.amazonaws.com:7199/jndi/rmi://ec2-xx-xxx-xxx-xxxx.us-west-1.compute.amazonaws.com:7199/jmxrmi

Note: use an insecure connection if SSL is disabled (which is what I did).

That should do it.

Monday, February 10, 2014

Cassandra 1.2.X Secondary Indexes, Observations

Goal

The purpose of this post is to share observations on why secondary indexes in Cassandra 1.2.X should only be used in specific use cases.

Background


For background, here are two good write-ups on secondary indexes in Cassandra:

1) Original DataStax blog entry introducing secondary indexes by Jonathan Ellis
2) Indexing in Cassandra

Our view on secondary indexes is that they provide a good mechanism to short-circuit the restrictions of CQL3 data modeling to retrieve data in very specific situations.  Secondary indexes are not a panacea for all data modeling challenges in CQL3 and should be used very selectively.

Check out the 2nd article listed above for the "official" guidelines for when to, and not to, use secondary indexes.

Observations

Our take is that secondary indexes should be used to enable read queries:
1) on columns that contain a small set of possible values (low cardinality) that span many partitions
2) and where the indexed column is queried often
3) and data in the table is not often updated or deleted
4) and including the table's (CF's) partition key in the query is undesirable
5) and when explicitly creating another CQL3 table is undesirable

Narrow use-case, huh?

So, why are secondary indexes recommended for very specific use cases?  It's because a secondary index creates a second, hidden table "under the covers".  Looking at the contents of the hidden table on disk, using sstable2json, you can clearly see what's happening when you create a secondary index.

Here's an example, using a little POC schema, complete with a typo in the table name.

CREATE TABLE ptoducts2 (
  country_cd text,
  base_div_nbr int,
  sub_div text,
  store_nbr int,
  store_type text,
  att1 text,
  att2 text,
  PRIMARY KEY ((country_cd, base_div_nbr), sub_div, store_nbr, store_type)
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.000000 AND
  gc_grace_seconds=864000 AND
  read_repair_chance=0.100000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'SnappyCompressor'};


CREATE INDEX att1 ON ptoducts2 (att1);

Looking at the data files for this table we see:

total 64
4 -rw-r--r-- 1 root root   46 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-CompressionInfo.db
4 -rw-r--r-- 1 root root  159 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-Data.db
4 -rw-r--r-- 1 root root   16 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-Filter.db
4 -rw-r--r-- 1 root root   37 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-Index.db
8 -rw-r--r-- 1 root root 4355 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-Statistics.db
4 -rw-r--r-- 1 root root   73 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-Summary.db
4 -rw-r--r-- 1 root root   79 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-TOC.txt
4 -rw-r--r-- 1 root root   46 Feb 10 06:20 demo2-ptoducts2-ic-1-CompressionInfo.db
4 -rw-r--r-- 1 root root  300 Feb 10 06:20 demo2-ptoducts2-ic-1-Data.db
4 -rw-r--r-- 1 root root   16 Feb 10 06:20 demo2-ptoducts2-ic-1-Filter.db
4 -rw-r--r-- 1 root root   78 Feb 10 06:20 demo2-ptoducts2-ic-1-Index.db
8 -rw-r--r-- 1 root root 4357 Feb 10 06:20 demo2-ptoducts2-ic-1-Statistics.db
4 -rw-r--r-- 1 root root   96 Feb 10 06:20 demo2-ptoducts2-ic-1-Summary.db

4 -rw-r--r-- 1 root root   79 Feb 10 06:20 demo2-ptoducts2-ic-1-TOC.txt

The bolded files (added for emphasis) represent the hidden table that was created with the CREATE INDEX statement.

Even more interesting is the content of this hidden table.

Here are the "rows" that exist in the ptoducts2 table:

cqlsh:demo2> select * from ptoducts2;

Here are the rows returned for a specific value of the secondary index:

cqlsh:demo2> SELECT * FROM ptoducts2 WHERE att1 = 'test2';

And here is the output of sstable2json for the hidden table:

[
{"key": "74657374","columns": [["000275730000040000000100:us1:1:test","",1392040302405000], ["000275730000040000000100:us1:2:test","",1392040376314000], ["000275730000040000000100:us1:3:test","",1392040376326000], ["000275730000040000000100:us2:1:test","",1392040376339000], ["00026d780000040000000100:us1:1:test","",1392040377513000]]},
{"key": "7465737432","columns": [["000275730000040000000200:us1:1:test","",1392041431384000]]}
]

Looking at the actual data stored on disk shows that this index contains a partition for each unique value of the secondary index column.  Just imagine what the secondary index data file would look like if you create a secondary index on a column that doesn't adhere to the guidelines listed above.

Conclusion

Now that you can see exactly what's happening with secondary indexes in Cassandra 1.2.X, be sure to think through the use cases for when to leverage this mechanism.  Secondary indexes do enable a lot of "ease of use" query functionality, but there's a cost.  Be conscious of the cost of this mechanism and use it appropriately.  Make good, sound architectural and design trade-off decisions when deciding to use secondary indexes.

We still prefer creating explicit tables to handle "index" scenarios.  Hopefully, you find the right mix for your scenario.
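For reference, here is a rough sketch of the kind of explicit "index" table we mean for the ptoducts2 example; the table name and the exact choice of clustering columns below are our own assumptions, not part of the original schema:

CREATE TABLE ptoducts2_by_att1 (
  att1 text,
  country_cd text,
  base_div_nbr int,
  sub_div text,
  store_nbr int,
  store_type text,
  PRIMARY KEY (att1, country_cd, base_div_nbr, sub_div, store_nbr, store_type)
);

-- the application writes to both tables, then reads the "index" table directly
SELECT * FROM ptoducts2_by_att1 WHERE att1 = 'test2';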


Friday, February 7, 2014

Cassandra CQL3 Data Modeling for Reformed Dimensional Modelers

Goal


This post will present Cassandra data modeling practices in dimensional modeling terms.  This post is targeted at those individuals who come from a Data Warehousing background and find themselves being asked to generate data models in a Cassandra environment.  The goal of this post is to help those with Dimensional Modeling backgrounds wrap their brain around modeling in Cassandra.

Please keep in mind that we are deliberately trying to find similarities between modeling in Cassandra and modeling for Data Warehouses/Marts.  Also, keep in mind that we were recently doing a lot of Data Warehousing/BI/batch integration work prior to jumping head first into Cassandra-land.

Also, please keep in mind that this post is not meant to be an exhaustive review of Cassandra data modeling techniques and best practices.  Please refer to Patrick McFadin's data modeling presentations for more in depth Cassandra modeling guidelines.  Patrick's presentations are fantastic!

A bit of background

Cassandra and Data Warehousing use cases differ greatly, but the data modeling guidelines used in both technology genres are very similar, or can be if you squint your eyes a bit.

Use Cases:

Cassandra - typically referred to as online or transactional (here's a link to the target use cases for Cassandra, just click on Use Cases), think lots and lots of small amounts of data flowing into a system very quickly (milliseconds or less).

Data Warehousing (used loosely) - typically referred to as analytical or batch, think lots of large amounts of data flowing into a system at periodic times throughout the day using a data processing procedure that lasts minutes to hours.

Data Modeling Guidelines:

Cassandra
  • Disk space is cheap, writes are blazing fast (thanks to log-structured storage and sequential writes)
  • Align table layout to satisfy read patterns
  • Duplicate data to satisfy different versions of read operations
  • Eliminate joins if possible (materialized views in Cassandra-speak)
  • Leverage unique keys for data distributions (partition keys)
  • Leverage cascading secondary keys for column clustering and dynamic schemas (cluster keys)
  • Access data through cascading key structure (a.k.a. hierarchical data access)
  • No aggregations due to distributed architecture (store pre-aggregated data)
Data Warehousing (more specifics can be found here... warning: possibly TL;DR, just like this blog)
  • Align table layout to satisfy read patterns, ad hoc requests, and reporting structure
  • Duplicate data to simplify read access patterns (think Slowly Changing Dimensions, type II or higher)
  • Leverage unique keys to identify unique combinations of dimensional attributes (surrogate keys)
  • Nest cascading hierarchies in dimension attributes to enable "drill" capabilities in BI layer
  • Access data through cascading hierarchical attributes using GROUP BY clauses
  • For query performance, limit joining to simple joins between facts and dimensions (avoid snow-flaking)
  • For performance pre-aggregate data when applicable and store in OLAP cubes, in memory layer, or summary fact tables
Hopefully you can spot a few common themes that we would like to explore in this blog, mainly:
  1. read based table design
  2. data duplication for read performance
  3. hierarchical data access
  4. storing pre-aggregated data

Read Based Table Design

Okay, so if we could provide only one piece of advice to Cassandra data modelers with a DW background, it would be to approach a data modeling problem in Cassandra as you would in Data Warehousing.  Meaning, understand what queries are going to be executed against the data stored in Cassandra and let them guide modeling.  This same technique can be used during requirements gathering exercises of Data Warehouse projects, i.e. meet with end users of the system and analyze how the end users would like to query the data.  This is a different approach from modeling data in a traditional OLTP type of system.

Note that in Cassandra the read use cases will probably be a lot smaller, more targeted, and won't require ad-hoc query design practices.  In other words, the read requirements for Cassandra backed systems will probably be a bit simpler compared to a DW system.  So even though you should start with understanding how data should be queried in Cassandra, don't overthink this step.  Also note that you are probably only dealing with one subject area, i.e. step number 1 in Dimensional Modeling.  It's going to be straightforward, or at least it should be straightforward, let it be straightforward.

Start with understanding the read access patterns like you would in a Data Warehouse project and you're more than halfway to creating the right data model in Cassandra. 
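As a small, hedged illustration: if the known read requirement were "show all transactions for a store on a given day", the table would be designed around exactly that query (the table and column names below are made up for this example):

CREATE TABLE transactions_by_store_day (
  store_nbr int,
  txn_date text,
  txn_id timeuuid,
  amount decimal,
  PRIMARY KEY ((store_nbr, txn_date), txn_id)
);

-- the read pattern drives the model: one partition per store per day
SELECT * FROM transactions_by_store_day
WHERE store_nbr = 1234 AND txn_date = '2014-02-26';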

Data Duplication for Read Performance

Duplicating data by repeating values in one table:  

This is where Cassandra and Dimension data modeling share similarities.  We are assuming that readers of this post have all created dimensions.  Dimensions duplicate data to enable easier query access.  For example, if I had a location dimension, I would repeat the State in every row for a City.  The dimension might look something like the following:

Dimension Name : Location
Key : Location Key
Attributes: City, State

For every City, the state value will be duplicated.

The same concept exists in Cassandra.  You should duplicate data in Cassandra to match the read design pattern.  Do this to eliminate joins, just like in a DW system.  

There is a slight nuance here between Cassandra and DW systems.  In DW systems, joins slow things down just a bit (or worse, a lot).  In Cassandra, joins are not possible because of the distributed architecture (i.e. the database lives on many physical servers, nodes).  Therefore any joins must occur at the client level.
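To make the "repeat values in one table" idea concrete, here is a hedged CQL3 sketch that repeats the state on every city/store row, just like the Location dimension above (the names are illustrative only):

CREATE TABLE stores_by_city (
  city text,
  store_nbr int,
  state text,       -- repeated on every row; no join to a separate state table
  store_type text,
  PRIMARY KEY (city, store_nbr)
);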

Duplicating data by repeating data in multiple tables:

This is where Cassandra and Fact data modeling share similarities.  Think of modeling in Cassandra as you would model multiple versions of summary fact tables.  At the atomic level all fact tables should be unique, and as my friend Leo would argue, should be business-process aligned.  This means at the atomic level of a Data Warehouse, fact data will be contained in one and only one table.  However, for performance reasons, many Data Warehouses persist similar data in special summarized fact tables that are typically referred to as Summary Fact tables.

For example, say we had an atomic POS fact table that was at the grain of an individual sale transaction.  But say we had one group of users who would like to analyze POS data rolled up to monthly levels with the ability to slice that data by state.  Let's say we had another group of users who would like to analyze POS data rolled up to a weekly level with the ability to slice that data by state-crossing market boundaries (like MSAs).  A common solution to this requirement in a Data Warehouse would be to create 2 summary fact tables, containing slightly different versions of the same data:

Monthly, State, Summary Fact Table - Grain Month, State : Fact - Sales
Weekly, MSA, Summary Fact Table - Grain Week, MSA : Fact - Sales

This concept exists in Cassandra as well.  We try to duplicate data based on the grain of the query.  For example, say we had a POS system in which we wanted to look up transactions by time of day and transactions by state; we would create two different tables in Cassandra:

Transactions by Time of Day Table - Grain Date/Time, Transaction ID : Data - Transaction Attributes
Transactions by State Table - Grain State, Transaction ID : Data - Transaction Attributes

Speaking in terms of grain in the Cassandra example may be a bit misleading as both tables have an atomic grain of Transaction ID, but to help communicate concepts in terms that DW data modelers will understand, we chose to use the concept of Grain for this example.
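A rough CQL3 sketch of those two tables might look like the following; the column names and types are our assumptions for illustration:

CREATE TABLE transactions_by_time (
  txn_date text,
  txn_ts timestamp,
  txn_id timeuuid,
  store_nbr int,
  state text,
  amount decimal,
  PRIMARY KEY (txn_date, txn_ts, txn_id)   -- partitioned by day, clustered by time
);

CREATE TABLE transactions_by_state (
  state text,
  txn_id timeuuid,
  txn_ts timestamp,
  store_nbr int,
  amount decimal,
  PRIMARY KEY (state, txn_id)              -- partitioned by state
);

The transaction attributes are deliberately duplicated in both tables so that each read pattern is served without a join.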

Hierarchical Data Access

Okay, so this one's probably a stretch, but it makes a point about table structure in Cassandra.

Cassandra has some strict rules for query access, which, if thought of as drill paths, in DW speak, enables a data modeler to quickly create the correct table layout in Cassandra.  

These rules are as follows (my summary of the "rules"):
1) Every table has to have at least 1 Partition Key.
    -  the partition key is the first field listed in a Primary Key and it is the value Cassandra uses to distribute data in a ring.
2)  Partition Keys must be included as the first element in a WHERE clause of a query and can only be filtered using IN or =, i.e. explicit filtering.
     - explicit filtering enables Cassandra to "find" the data on the correct nodes in a ring.
3)  Primary Key values that are declared after the Partition Key are called clustering columns and they determine the sort order of data contained in a CQL3 table.
4)  Clustering columns must be placed into a WHERE clause in the same order as they are declared in a CQL3 table Primary Key.
5)  Clustering columns can leverage WHERE clause conditions with the following operators =, <=, <, >, >=, IN
6)  Only attributes defined in the Primary Key can be included as predicates in WHERE clauses for Cassandra queries.

For example:
CREATE TABLE test (
  col1 int,
  col2 text,
  col3 text,
  col4 text,
  PRIMARY KEY (col1, col2, col3))
WITH CLUSTERING ORDER BY (col2 desc);

would store the table partitioned by col1, with col2 in descending order and col3 in ascending order.  This means that if we wanted to get the largest value of col2 for any particular value of col1 we could simply do something like the following:

SELECT * FROM test WHERE col1 = somevalue LIMIT 1;

We could not run the following query though.

SELECT * FROM test WHERE col2 = somevalue;

So how is this type of strict query access pattern similar to Data Warehousing?  Well, if your data warehouse leverages a BI tool that has strict drill hierarchies defined, without the ability to skip levels (i.e. use a DESCENDANTS OLAP clause), then you are using the same type of query access pattern that is modeled in Cassandra.

The lesson here is to model data in Cassandra just like you would create a strict drill path within a Data Warehouse BI or OLAP tool.   You can't get to col3 without specifying which values of col1 and col2 to "drill through" to get to the desired col3 value.
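For completeness, the only way to filter on col3 in the example table is to "drill through" the full key path, along the lines of:

SELECT * FROM test WHERE col1 = somevalue AND col2 = somevalue AND col3 = somevalue;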

  

Store Pre-Aggregated Data

There is no way to aggregate data stored in Cassandra, other than limited COUNT functionality.  This is, again, by design: since Cassandra is a distributed system, performing aggregations would be very costly.  I believe Jonathan Ellis wrote a nice blog on this topic, but I can't seem to find it at the moment.

Anyway, the solution to this, if you would like to aggregate data, is to store data in an aggregated manner in Cassandra.  This is very similar to the Summary Fact table example mentioned above.  

This means that if you would like to see sums of POS transaction values per day, using the same POS example listed above, then you would want to aggregate this data in client code, or using Spark, Storm, or Map Reduce, and store the aggregated results in Cassandra for lightning fast data access.  
To us, the stored-aggregation approach is reminiscent of the role of OLAP and how purely additive metrics are pre-calculated in OLAP solutions (specifically MOLAP) so results are returned very quickly.
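A hedged sketch of what this might look like in CQL3 follows; the table, columns, and values are hypothetical, and the aggregation itself happens in client code or a batch job before the write:

CREATE TABLE daily_sales_by_store (
  store_nbr int,
  sales_date text,
  total_sales decimal,
  txn_count int,
  PRIMARY KEY (store_nbr, sales_date)
);

-- a batch job computes the rollup, then writes the finished number
INSERT INTO daily_sales_by_store (store_nbr, sales_date, total_sales, txn_count)
VALUES (1234, '2014-02-26', 15873.42, 412);

-- reads are then a simple key lookup; no aggregation at query time
SELECT total_sales FROM daily_sales_by_store
WHERE store_nbr = 1234 AND sales_date = '2014-02-26';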

Conclusion

We hope this post helps Dimensional Modelers and Data Warehousing data modelers wrap their heads around modeling in Cassandra.  If you come from an ERD background, then go ask a Dimensional Modeler how they create data models.  This will help you get into the right mindset for data modeling in Cassandra.