Wednesday, June 18, 2014

Impact of Shared Storage on Cassandra


Every now and then we receive the question of why shared storage isn’t recommended for Cassandra.  The conversation usually goes like this:

Customer/User – “We have an awesome SAN and would like to use it for Cassandra.”
DataStax – “We don’t recommend shared storage for Cassandra.”
Customer/User – “Why not?”
DataStax – “Two reasons really.  One – performance suffers.  Two – shared storage introduces a single point of failure into the architecture.”
Customer/User – “Our SAN is awesome and has never had any downtime, and it can perform a kagillion IOPS.  So why exactly shouldn’t we use shared storage?”

Hopefully, this blog post will provide some data points around shared storage and performance that will dissuade users from leveraging shared storage with Cassandra. 

Single Point of Failure

There really isn’t much to say about shared storage being a single point of failure.  If someone has a single shared storage device in their architecture and multiple Cassandra nodes pointing at that device, then the shared storage device is, by definition, a single point of failure.

Performance

Our Senior Evangelist likes to define performance as the combination of speed and stability.  For Cassandra, much of that performance comes from the underlying disk system supporting each node.  To put it plainly, performance in Cassandra is directly correlated to the performance of a Cassandra node's disk system.

But why is that?  And why can't a super-awesome shared storage device keep up with Cassandra?

To answer that, let's take a look at the major (arguably) contributors to Cassandra disk io and measure throughput and latency.  We will then look at some real-world statistics that show the aggregated behavior of Cassandra on a shared storage device, to illustrate the effects of shared storage with Cassandra.  These data points should show the reader that the load placed onto a storage device by a single Cassandra node is large.  And when multiple Cassandra nodes use the same storage device, the compounded effects of each individual node's disk io overwhelm the shared storage device.

Cassandra Disk Pressure

Cassandra uses disks heavily, specifically during writes, reads, compaction, repair (anti-entropy), and bootstrap operations.
  • Writes in Cassandra are performed using a log-structured storage model, i.e. sequential writes.  This allows Cassandra to ingest data much faster than traditional RDBMS systems.  The write path puts heavy io pressure on disks from Commit Log syncs as well as Memtable flushes to SSTables (disk).
  • Compaction is the process in Cassandra that enables good read performance by combining (merge-sorting) multiple SSTables into a single SSTable.  Since SSTables are immutable, this process puts a lot of pressure on disk io as SSTables are read from disk, combined, and written back to disk.  There are two main types of compaction, and each has a different io impact.
  • Reads in Cassandra take advantage of caching for optimization, but when they hit disk, they put pressure on the disk system.  Each read operation against an SSTable is considered a single disk seek.  Sometimes a read operation is required to touch more than one SSTable and will therefore incur multiple disk io operations.  If caches are missed during read operations, disk io is even heavier as more SSTables are accessed to satisfy the read.
  • Repair is the process in Cassandra that ensures data consistency across nodes in a cluster.  Repairs rely on both compaction and streaming, i.e. bulk ingestion of data, to compare and correct data replicas between nodes.  Repair is designed to work as a background process and not impact the performance of production clusters.  But repair puts some stress on disk systems because both compaction and ingestion occur during the operation.
  • Bootstrapping a node is the process of on-boarding a new node, or replacing a dead one, in Cassandra.  When the node starts, data is streamed to the new node, which persists all of that data to disk.  Heavy compaction occurs during this process.  Thus, there is a lot of pressure put onto a disk system during the bootstrap operation.
The above list represents a subset of the disk-intensive operations that occur within Cassandra.
What's important to understand about all of these disk io operations is that they are not only "heavy" in terms of IOPS (a general rule of thumb is that Cassandra can consume 5,000 write operations per second per CPU core) but, arguably more importantly, they can also be very heavy in terms of throughput, i.e. MB/s.  It is very conceivable that the disk system of a single Cassandra node would have to sustain disk throughput of 200 MB/s or higher.  Most shared storage devices tout IOPS but don't highlight throughput nearly as prominently.  Depending on the use case, Cassandra will put both high IOPS and high throughput on disk systems.  Heavy throughput is a major reason why almost all shared-storage Cassandra implementations experience performance issues.

To reinforce this point, we performed a simple cassandra-stress test (Quorum writes, Replication x 3, 100 million keys, 100 cols per partition, disabled compaction throttling, increased concurrent writers) on a 3-node EC2 cluster (m3.2xlarge nodes) and watched disk performance for a couple of hours via OpsCenter, sar, and iostat.
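For anyone wanting to approximate the run, the invocation looked roughly like this (legacy, pre-2.1 stress flags; node addresses are placeholders, so treat this as a sketch rather than the exact command we ran):

nodetool setcompactionthroughput 0   # disable compaction throttling
cassandra-stress -d node1,node2,node3 -e QUORUM -l 3 -n 100000000 -c 100 -t 200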

Here are some observations:
  • iostat - wMB/s as high as 300 with sustained loads well over 100
  • iostat - rMB/s (thanks to compaction) as high as 100 with sustained loads well over 50
  • OpsCenter - max disk utilization peak as high as 81% with average around 40%
  • sar -d - wr_sec/s as high as 224,506 with sustained loads around 200,000
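For readers who want to watch the same counters on their own cluster, the raw commands are simple (5-second samples shown here; OpsCenter charts similar data via its agents):

iostat -x -m 5     # per-device rMB/s, wMB/s, await, and %util
sar -d 5           # per-device rd_sec/s and wr_sec/s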
This was a small, simple test, and it showed the amount of load placed onto disk systems by modest operations.  Imagine this load amplified by a complex, real-world workload and a production-sized cluster (more than 3 nodes).  The compound effects of these operations could easily overwhelm a shared storage device.  We've actually overheard storage vendors, though we won't name names, recommend not running Cassandra on their devices.

Here's a real-world example of the behavior of a shared storage device with a production Cassandra cluster.  Recently, while on site with a customer, who will remain anonymous to protect the innocent, we collected several data points that highlight a typical shared storage environment.  The statistics collected during this on-site trip represent the majority of observations made when shared storage is used for a production Cassandra system.


The metrics here were collected with sar, but they are the same metrics reported by iostat.

As one can see from this simple table, we observed the state of device io every 10 minutes and filtered the results to show two 40-minute chunks of time.  This table provides some exceptional metrics on the poor disk performance caused by the use of shared storage.  Yes, that is a 28, almost 29, second await.  Cassandra actually considered this node "down" because it was unresponsive during the high-await periods.  Also, note that this load is minimal compared to what we were able to produce using cassandra-stress.

Performance Issues 

When users choose to run Cassandra on shared storage devices, they should expect to experience any number of performance issues.  The following list highlights a few potential, even probable, issues:
  • Atrocious read performance
  • Potential write performance issues
  • System instability (nodes appear to go offline and/or are “flapping”)
  • Client side failures on read and/or write operations
  • Flushwriters are blocked
  • Compactions are backed up
  • Nodes won’t start
  • Repair/Streaming won’t complete

Conclusion

There is one flavor of shared storage that we have seen used somewhat successfully.  In environments where virtualization is used, locally attached storage that is shared across local virtual machines isn’t “so” bad.  This is similar in concept to ephemeral storage in AWS.

Regardless of the channels, cables, or other fancy features a shared storage device may have, it will not be able to keep up with the io demand Cassandra places on it.

Simply put, shared storage cannot keep up with the amount of disk io Cassandra places on a system.  It's not recommended.  Don't use shared storage with Cassandra, and be happier for it.

Sunday, May 4, 2014

Measuring Page Cache Utilization in Cassandra with PCSTAT

Overview

The purpose of this post is to show how to leverage a great little tool named pcstat, created by our colleague Al Tobey, to understand how well your OS page cache is being utilized with Cassandra.  The premise is that Cassandra read performance will increase if reads are satisfied from page cache rather than from disk.

Process

We are going to measure and observe how well page cache tuning works with Cassandra 2.0.7.  To do this, we will create a baseline measurement using pcstat, then tweak the following two parameters via the cassandra.yaml file and measure how well Cassandra leverages page cache:
1) populate_io_cache_on_flush
2) preheat_kernel_page_cache (new in 2.0)
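For reference, here's roughly what those lines look like in cassandra.yaml (we had to add populate_io_cache_on_flush by hand; as noted later in this post, it no longer ships in the 2.0 yaml):

# cassandra.yaml
populate_io_cache_on_flush: true
preheat_kernel_page_cache: true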

Step 1 - Baseline

We created a very simple table for this experiment:

CREATE KEYSPACE test WITH replication = {
  'class': 'SimpleStrategy',
  'replication_factor': '1'
};

CREATE TABLE test (
  col1 text,
  col2 text,
  PRIMARY KEY (col1)
);

We then loaded some simple data into this table and performed a flush.
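The load itself was just a handful of inserts along these lines (value text abbreviated here):

INSERT INTO test (col1, col2) VALUES ('6', 'This should hopefully be a lot of data...');
INSERT INTO test (col1, col2) VALUES ('90', 'This should hopefully be a lot of data...');
INSERT INTO test (col1, col2) VALUES ('46', 'This should hopefully be a lot of data...');

Here's a look at the data and the results of the flush.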

cqlsh:test> select * from test;

 col1 | col2
------+-----------------------------------------------------------------------------------------------------------------------------------------
    6 | This should hopefully be a lot of data to put in the column to fill up the value of the column so we can see what happens when testing.
   90 | This should hopefully be a lot of data to put in the column to fill up the value of the column so we can see what happens when testing.
   46 | This should hopefully be a lot of data to put in the column to fill up the value of the column so we can see what happens when testing.

nodetool flush

/var/lib/cassandra/data/test/test$ ls
test-test-jb-1-CompressionInfo.db  test-test-jb-1-Data.db  test-test-jb-1-Filter.db  test-test-jb-1-Index.db  test-test-jb-1-Statistics.db  test-test-jb-1-Summary.db  test-test-jb-1-TOC.txt

Here's the baseline after flushing
pcstat /var/lib/cassandra/data/test/test/*-Data.db
|----------------------------------------------------------+----------------+------------+-----------+---------|
| Name                                                     | Size           | Pages      | Cached    | Percent |
|----------------------------------------------------------+----------------+------------+-----------+---------|
| /var/lib/cassandra/data/test/test/test-test-jb-1-Data.db | 2370           | 1          | 0         | 000.000 |
|----------------------------------------------------------+----------------+------------+-----------+---------|

Here's the baseline after performing some reads
pcstat /var/lib/cassandra/data/test/test/*-Data.db
|----------------------------------------------------------+----------------+------------+-----------+---------|
| Name                                                     | Size           | Pages      | Cached    | Percent |
|----------------------------------------------------------+----------------+------------+-----------+---------|
| /var/lib/cassandra/data/test/test/test-test-jb-1-Data.db | 2370           | 1          | 1         | 100.000 |
|----------------------------------------------------------+----------------+------------+-----------+---------|

As expected, the OS cached pages after reads, especially since this is one small SSTable.

Step 2 - Change populate_io_cache_on_flush

In Step 1, we saw what we expected to see.  That is, a new SSTable is not cached until a read is performed against it.  Let's see if we can leverage Cassandra's populate_io_cache_on_flush to get SSTables cached on:
1) flush events (though these will be manual)
2) compaction events (though this will be manual)

First, we must make our populate_io_cache_on_flush change.  I added this field to the yaml and restarted Cassandra.

Now we will write (overwrite) some data and perform a manual flush.

Here are the new SSTables for this table.

/var/lib/cassandra/data/test/test$ ls
test-test-jb-1-CompressionInfo.db  test-test-jb-1-Index.db       test-test-jb-1-TOC.txt             test-test-jb-2-Filter.db      test-test-jb-2-Summary.db
test-test-jb-1-Data.db             test-test-jb-1-Statistics.db  test-test-jb-2-CompressionInfo.db  test-test-jb-2-Index.db       test-test-jb-2-TOC.txt
test-test-jb-1-Filter.db           test-test-jb-1-Summary.db     test-test-jb-2-Data.db             test-test-jb-2-Statistics.db

And here are the results of the pcstat test

|----------------------------------------------------------+----------------+------------+-----------+---------|
| Name                                                     | Size           | Pages      | Cached    | Percent |
|----------------------------------------------------------+----------------+------------+-----------+---------|
| /var/lib/cassandra/data/test/test/test-test-jb-1-Data.db | 2370           | 1          | 1         | 100.000 |
| /var/lib/cassandra/data/test/test/test-test-jb-2-Data.db | 2107           | 1          | 0         | 000.000 |
|----------------------------------------------------------+----------------+------------+-----------+---------|

Yikes, what happened here?  The expectation was that the flush would populate the cache.  Very interesting.

Let's see if a major compaction causes the new sstable to cache.

nodetool compact

After the compaction we see the newly created SSTable.

ls
test-test-jb-3-CompressionInfo.db  test-test-jb-3-Data.db  test-test-jb-3-Filter.db  test-test-jb-3-Index.db  test-test-jb-3-Statistics.db  test-test-jb-3-Summary.db  test-test-jb-3-TOC.txt

Now, lets see if the newly created SSTable was cached.

 pcstat /var/lib/cassandra/data/test/test/*-Data.db
|----------------------------------------------------------+----------------+------------+-----------+---------|
| Name                                                     | Size           | Pages      | Cached    | Percent |
|----------------------------------------------------------+----------------+------------+-----------+---------|
| /var/lib/cassandra/data/test/test/test-test-jb-3-Data.db | 2368           | 1          | 0         | 000.000 |
|----------------------------------------------------------+----------------+------------+-----------+---------|

Again, no luck.  We know that populate_io_cache_on_flush was removed from the yaml file in 2.0, and we have heard that it wasn't fully honored by Cassandra.  With more time and investigation we could figure out exactly what's occurring.

BTW: Here's an interesting article about cache changes in 2.1

Big picture, thanks to Al, we now have the ability to monitor cache at a granular level for Cassandra files.  Very handy.

Step 3 - Change preheat_kernel_page_cache

So, let's take a look at the effects of tweaking another page cache setting, one that is new to Cassandra: preheat_kernel_page_cache.

After making this change to our little node and restarting, we see that the output of pcstat is still 0% cached.

pcstat /var/lib/cassandra/data/test/test/*-Data.db
|----------------------------------------------------------+----------------+------------+-----------+---------|
| Name                                                     | Size           | Pages      | Cached    | Percent |
|----------------------------------------------------------+----------------+------------+-----------+---------|
| /var/lib/cassandra/data/test/test/test-test-jb-3-Data.db | 2368           | 1          | 0         | 000.000 |
|----------------------------------------------------------+----------------+------------+-----------+---------|

Let's add some more data, flush and measure results.

Here is the output of ls after the flush.

ls
test-test-jb-3-CompressionInfo.db  test-test-jb-3-Index.db       test-test-jb-3-TOC.txt             test-test-jb-4-Filter.db      test-test-jb-4-Summary.db
test-test-jb-3-Data.db             test-test-jb-3-Statistics.db  test-test-jb-4-CompressionInfo.db  test-test-jb-4-Index.db       test-test-jb-4-TOC.txt
test-test-jb-3-Filter.db           test-test-jb-3-Summary.db     test-test-jb-4-Data.db             test-test-jb-4-Statistics.db

And, here is what pcstat shows.

pcstat /var/lib/cassandra/data/test/test/*-Data.db
|----------------------------------------------------------+----------------+------------+-----------+---------|
| Name                                                     | Size           | Pages      | Cached    | Percent |
|----------------------------------------------------------+----------------+------------+-----------+---------|
| /var/lib/cassandra/data/test/test/test-test-jb-3-Data.db | 2368           | 1          | 0         | 000.000 |
| /var/lib/cassandra/data/test/test/test-test-jb-4-Data.db | 1918           | 1          | 0         | 000.000 |
|----------------------------------------------------------+----------------+------------+-----------+---------|

Now, let's issue a read and see what happens.  The workload we are using for this exercise is update-centric, so a single partition should exist across multiple SSTables.
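We didn't capture the exact statement, but the read was along these lines, an IN across partitions written earlier (key values illustrative):

SELECT * FROM test WHERE col1 IN ('6', '90', '46');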

Here's the output of pcstat, showing both SSTables are now cached.
pcstat /var/lib/cassandra/data/test/test/*-Data.db
|----------------------------------------------------------+----------------+------------+-----------+---------|
| Name                                                     | Size           | Pages      | Cached    | Percent |
|----------------------------------------------------------+----------------+------------+-----------+---------|
| /var/lib/cassandra/data/test/test/test-test-jb-3-Data.db | 2368           | 1          | 1         | 100.000 |
| /var/lib/cassandra/data/test/test/test-test-jb-4-Data.db | 1918           | 1          | 1         | 100.000 |
|----------------------------------------------------------+----------------+------------+-----------+---------|

Let's check cfhistograms to ensure we really did read 2 SSTables.

SSTables per Read
2 sstables: 3

Yep, we queried 3 rows using an IN clause and can see that 2 SSTables were accessed.

For grins, we will issue a compaction and see if the newly created SSTable is cached.

The new SSTable is not cached.
pcstat /var/lib/cassandra/data/test/test/*-Data.db
|----------------------------------------------------------+----------------+------------+-----------+---------|
| Name                                                     | Size           | Pages      | Cached    | Percent |
|----------------------------------------------------------+----------------+------------+-----------+---------|
| /var/lib/cassandra/data/test/test/test-test-jb-5-Data.db | 2373           | 1          | 0         | 000.000 |
|----------------------------------------------------------+----------------+------------+-----------+---------|


Conclusion

OS Page Cache is a very valuable tool to ensure performant read operations.  However, understanding exactly when and how it's utilized has historically been a bit of a mystery.  Thanks to Al Tobey, we have an easy way to precisely measure when, where, and how much page cache is being utilized during Cassandra operations. 


Wednesday, February 26, 2014

Bulk Integration Into and Out of Cassandra CQL3 Data Models


Purpose

The purpose of this blog is to provide an aggregated view of the batch/bulk techniques available for integrating with Apache Cassandra and CQL3-based data models, including Cassandra 1.2 and 2.0 (DSE 3.2 and 4.0).  This blog post will be augmented over time as techniques evolve.

Background

As long as people store data digitally, there will be a need to move large "chunks" of data between systems.  This could be for reasons of:
  • aggregation and combination (i.e. analytical systems), where multiple sources of data are combined and queried
  • archive purposes (removal of old or no-longer-needed data)
  • data migration projects
  • large scale data model changes
  • database transition initiatives
  • other
This blog will provide a comprehensive view of bulk integration techniques when moving data into and out of Cassandra: 
  • with links to code samples
  • tips and tricks
  • recommended use cases
Information is presented per technique.  Each entry below covers loading data into Cassandra from a source (Into Cassandra) and extracting data from Cassandra into a target (Out of Cassandra).  "A source" generally means an RDBMS system, flat file, Hadoop, mainframe, etc.  Each entry contains a summary of the technique as well as links to additional posts with supporting details.

We will work to provide a low-latency matrix in a subsequent post to help with near-real-time integration and data-pipelining techniques.

Batch Integration Matrix

 

Cassandra Bulk Loader (SSTableLoader)*

Into Cassandra:
  • Loads SSTables into a Cassandra ring from a node, or set of nodes, that is not an active part of the ring.
  • Good option for migrations, cluster generation, etc.
  • Enables parallel loading of data.
  • Should be executed on a set of “non-cluster” nodes for optimal performance.
  • Requires creation of SSTables (see SSTable Writers below).
  • CQL3 support depends on how the SSTables were created.
  • Counters may not be fully supported.
  • Leverages Cassandra's internal streaming.
  • After completion, run a repair.
Out of Cassandra: n/a
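As a rough sketch, an sstableloader run looks like this (addresses and path are illustrative; the directory being loaded must be laid out as keyspace/table):

sstableloader -d 10.1.1.1,10.1.1.2 /staging/sstables/mykeyspace/mytable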
Bulk Loading via JMX*

Into Cassandra:
  • JMX-based utility to stream SSTable data into Cassandra.
  • Leverages the same code functionality as SSTableLoader.
  • Enables loading of data from a Cassandra node into that same node without requiring the configuration of a separate network interface, which SSTableLoader requires when run on the node itself.
  • Same requirements and limitations as SSTableLoader.
  • Source and Documentation (look for bulkLoad(String)).
Out of Cassandra: n/a

Copy SSTables

Into Cassandra (copy into the data directory):
  • Loads SSTables into a Cassandra ring.
  • Good option for migrations, cluster generation, etc.
  • Requires creation of SSTables (see SSTable Writers below).
  • Could leverage snapshot SSTables for migration purposes.
  • CQL3 support depends on the structure of the SSTables.
  • Working example and blog to be provided soon.
Out of Cassandra (copy out of the data directory):
  • Get access to SSTable data with minimal production-system impact.
  • Can be used as a source to populate another cluster.

ColumnFamilyOutputFormat / ColumnFamilyInputFormat

Into Cassandra (ColumnFamilyOutputFormat):
  • Thrift-based driver to load data into Cassandra Thrift-based column families.
  • Not compatible with CQL3 tables.
  • Example
Out of Cassandra (ColumnFamilyInputFormat):
  • Thrift-based driver to extract data out of Cassandra Thrift-based column families.
  • Not compatible with CQL3 tables.
  • Example

BulkOutputFormat

Into Cassandra:
  • Thrift-based tool with no CQL3 support (CQL3 support has to be creatively programmed using composites, similar to the SSTable loader techniques for the Simple and SimpleUnsorted SSTable writers).
  • Used to stream data from MapReduce to Cassandra.
  • Similar to bulk loading above, but with no need for a “fat client”, i.e. a Cassandra node, to execute the process.
  • Can be used with Pig.
Out of Cassandra: n/a

CQLOutputFormat / CQLPagingInputFormat

Into Cassandra (CQLOutputFormat):
  • CQL3-based driver to load data into Cassandra CQL3 tables.
  • This is not necessarily a “bulk loader”.
  • Example
Out of Cassandra (CQLPagingInputFormat):
  • CQL3-based driver to extract data out of Cassandra CQL3 tables.
  • This is not necessarily a “bulk loader”.
  • Example

CQL3 Statements via M/R

Into Cassandra:
  • We have heard that several users simply create Map-only jobs and insert data into Cassandra leveraging a Java driver, like the DataStax Java driver.
  • Example
Out of Cassandra: n/a

CQL3 Batch Statements

Into Cassandra:
  • Batch statements group individual statements into single operations and can be used for bulk-like integration processes; see the sketch just below.
  • Use UNLOGGED batches if performance is a concern, though you lose atomicity.
Out of Cassandra: n/a
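A minimal sketch of an unlogged batch (toy table and values, purely illustrative):

BEGIN UNLOGGED BATCH
  INSERT INTO test.test (col1, col2) VALUES ('1', 'value1');
  INSERT INTO test.test (col1, col2) VALUES ('2', 'value2');
APPLY BATCH;

Unlogged batches skip the batchlog write, which is why they are faster but not atomic.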
ETL Tools - Pentaho

Into Cassandra:
  • Visual, ETL approach for the bulk integration of Cassandra data with other sources.
  • Pentaho Data Integration 5 supports CQL3 (JIRA).
  • We have not had the chance to test this approach but will do so soon; working example and blog to be provided.
Out of Cassandra: same capabilities and caveats as the Into direction.

ETL Tools - JasperSoft

Into Cassandra:
  • Visual, ETL approach for the bulk integration of Cassandra data with other sources.
  • Jaspersoft Studio with the Cassandra Connector v1.0 supports CQL3 (Release).
  • We have not had the chance to test this approach but will do so soon; working example and blog to be provided.
Out of Cassandra: same capabilities and caveats as the Into direction.

ETL Tools - Talend

Into Cassandra:
  • Visual, ETL approach for the bulk integration of Cassandra data with other sources.
  • We are still investigating what Talend supports with regard to Cassandra.  We did notice that it appears as if Talend would like to support the SSTableLoader mentioned above (JIRA).
  • We have not had the chance to test this approach but will do so soon; working example and blog to be provided.
Out of Cassandra: same capabilities and caveats as the Into direction.

Sqoop (DSE version)

Into Cassandra:
  • Offers a good tool for bulk/batch integration between many different databases, Hadoop, and Cassandra.
  • The DSE 3.X version does not yet support CQL3, but this should be coming in DSE 4.X.
  • This is a very promising feature, and we will update this post once DataStax releases CQL3 support.
  • For CQL2, here is a write-up with a working example.
Out of Cassandra: same capabilities and caveats as the Into direction.

CQLSH COPY

Into Cassandra:
  • Good tool for bulk loading a small amount of data; less than 1 million records is the recommendation.
  • Uses .csv files as import sources only.
Out of Cassandra:
  • Good tool for writing out a small amount of data (same recommendation: less than 1 million records) to a .csv file.
  • Uses .csv files as export targets only.
  • See the syntax sketch just below.
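And a syntax sketch for COPY (file path is ours; run from cqlsh in the relevant keyspace):

COPY test (col1, col2) TO '/tmp/test_export.csv';
COPY test (col1, col2) FROM '/tmp/test_export.csv';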
Flume Integration

Into Cassandra:
  • We found a project on GitHub that offers Flume integration into Cassandra.
  • Currently this is Hector-based.
Out of Cassandra: n/a

*Requires SSTable Creation

*SSTable Writers

In order to load data into Cassandra using the bulk loading techniques denoted by * above, SSTables first need to be generated.  The following summarizes the available techniques for creating SSTables.

CQLSSTableWriter (CQL3 support: full)
  • Fully supports CQL3-compatible SSTable creation.
  • Contained in C* 2.X and higher.

SSTableSimpleUnsortedWriter (CQL3 support: limited)
  • Generates SSTables simply and easily using partitioned order.
  • Does not inherently support CQL3; CQL3 support can be created using composite types.
  • DataStax Example (Non CQL).
  • Limitations: limited CQL support, and a more complex CQL configuration compared to CQLSSTableWriter.

SSTableSimpleWriter (CQL3 support: limited)
  • Generates SSTables, but does not sort; data must be added in partition-sorted order.
  • Not recommended for use unless there is a specific use case.
  • Limitations: requires inserting data in partition-sorted order.

Conclusion

This list is meant to be a comprehensive guide.  Please let us know if we missed anything or if you have feedback on specific techniques.  Hopefully this post provides value to people who are analyzing different techniques to move large chunks of data into or out of Cassandra.

Friday, February 21, 2014

DataStax AMI EC2 Cassandra JMX Configuration for Jconsole

Purpose

Provide a quick reference for configuring the DataStax AMI for JConsole access.  DataStax is working on updating their documentation for this, but in the meantime, here you go:

Steps

1)  Change cassandra-env.sh to provide a JMX hostname, like the following:
JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<ec2 public hostname>"
For example: -Djava.rmi.server.hostname=ec2-xx-xxx-xxx-xxxx.us-west-1.compute.amazonaws.com

2) Open up your Security Group to allow access to port 7199 (default Cassandra JMX port) as well as 1024 - 65535 for the machine that you are connecting from.

3)  Start JConsole using the following JMX service URL:
service:jmx:rmi://ec2-xx-xxx-xxx-xxxx.us-west-1.compute.amazonaws.com:7199/jndi/rmi://ec2-xx-xxx-xxx-xxxx.us-west-1.compute.amazonaws.com:7199/jmxrmi
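If you prefer launching from a terminal, jconsole accepts that URL directly as an argument:

jconsole service:jmx:rmi://ec2-xx-xxx-xxx-xxxx.us-west-1.compute.amazonaws.com:7199/jndi/rmi://ec2-xx-xxx-xxx-xxxx.us-west-1.compute.amazonaws.com:7199/jmxrmi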

Note: use an insecure connection if SSL is disabled (which is what we did).

That should do it.

Monday, February 10, 2014

Cassandra 1.2.X Secondary Indexes, Observations

Goal

The purpose of this post is to share observations on why secondary indexes in Cassandra 1.2.X should only be used in specific use cases.

Background


For background, here are two good write-ups on secondary indexes in Cassandra:

1) Original DataStax blog entry introducing secondary indexes by Jonathan Ellis
2) Indexing in Cassandra

Our view on secondary indexes is that they provide a good mechanism to short-circuit the restrictions of CQL3 data modeling and retrieve data in very specific situations.  Secondary indexes are not a panacea for all data modeling challenges in CQL3 and should be used very selectively.

Check out the 2nd article listed above for the "official" guidelines on when to, and when not to, use secondary indexes.

Observations

Our take is that secondary indexes should be used to enable read queries:
1) on columns that contain a small set of possible values (low cardinality) that span many partitions
2) and where the indexed column is queried often
3) and where data in the table is not often updated or deleted
4) and where including the table's (cf's) partition key in the query is undesirable
5) and when explicitly creating another CQL3 table is undesirable

Narrow use-case, huh?

So, why are secondary indexes recommended only for very specific use cases?  It's because a secondary index creates a second, hidden table "under the covers".  Looking at the contents of the hidden table on disk, using sstable2json, you can clearly see what's happening when you create a secondary index.

Here's an example, using a little POC schema, complete with a typo in the table name.

CREATE TABLE ptoducts2 (
  country_cd text,
  base_div_nbr int,
  sub_div text,
  store_nbr int,
  store_type text,
  att1 text,
  att2 text,
  PRIMARY KEY ((country_cd, base_div_nbr), sub_div, store_nbr, store_type)
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.000000 AND
  gc_grace_seconds=864000 AND
  read_repair_chance=0.100000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'SnappyCompressor'};


CREATE INDEX att1 ON ptoducts2 (att1);

Looking at the data files for this table we see:

total 64
4 -rw-r--r-- 1 root root   46 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-CompressionInfo.db
4 -rw-r--r-- 1 root root  159 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-Data.db
4 -rw-r--r-- 1 root root   16 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-Filter.db
4 -rw-r--r-- 1 root root   37 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-Index.db
8 -rw-r--r-- 1 root root 4355 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-Statistics.db
4 -rw-r--r-- 1 root root   73 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-Summary.db
4 -rw-r--r-- 1 root root   79 Feb 10 06:20 demo2-ptoducts2.att1-ic-1-TOC.txt
4 -rw-r--r-- 1 root root   46 Feb 10 06:20 demo2-ptoducts2-ic-1-CompressionInfo.db
4 -rw-r--r-- 1 root root  300 Feb 10 06:20 demo2-ptoducts2-ic-1-Data.db
4 -rw-r--r-- 1 root root   16 Feb 10 06:20 demo2-ptoducts2-ic-1-Filter.db
4 -rw-r--r-- 1 root root   78 Feb 10 06:20 demo2-ptoducts2-ic-1-Index.db
8 -rw-r--r-- 1 root root 4357 Feb 10 06:20 demo2-ptoducts2-ic-1-Statistics.db
4 -rw-r--r-- 1 root root   96 Feb 10 06:20 demo2-ptoducts2-ic-1-Summary.db

4 -rw-r--r-- 1 root root   79 Feb 10 06:20 demo2-ptoducts2-ic-1-TOC.txt

The files whose names contain .att1 represent the hidden table that was created by the CREATE INDEX statement.

Even more interesting is the content of this hidden table.

Here are the "rows" that exist in the ptoducts2 table:

cqlsh:demo2> select * from ptoducts2;

(output not shown)
Here are the rows returned for a specific value of the secondary index:

cqlsh:demo2> SELECT * FROM ptoducts2 WHERE att1 = 'test2';

(output not shown)
And here is the output of sstable2json for the hidden table:

[
{"key": "74657374","columns": [["000275730000040000000100:us1:1:test","",1392040302405000], ["000275730000040000000100:us1:2:test","",1392040376314000], ["000275730000040000000100:us1:3:test","",1392040376326000], ["000275730000040000000100:us2:1:test","",1392040376339000], ["00026d780000040000000100:us1:1:test","",1392040377513000]]},
{"key": "7465737432","columns": [["000275730000040000000200:us1:1:test","",1392041431384000]]}
]
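For reference, this output comes from pointing sstable2json at the index's Data.db file (path assumes the default data directory):

sstable2json /var/lib/cassandra/data/demo2/ptoducts2/demo2-ptoducts2.att1-ic-1-Data.db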

Looking at the actual data stored on disk shows that this index contains a partition for each unique value of the indexed column.  Just imagine what the secondary index data file would look like if you created a secondary index on a column that doesn't adhere to the guidelines listed above.

Conclusion

Now that you can see exactly what's happening with secondary indexes in Cassandra 1.2.X, be sure to think through the use cases before leveraging this mechanism.  Secondary indexes do enable a lot of "ease of use" query functionality, but there's a cost.  Be conscious of that cost and use them appropriately.  Make good, sound architectural and design trade-off decisions when deciding to use secondary indexes.

We still prefer creating explicit tables to handle "index" scenarios.  Hopefully, you find the right mix for your scenario.


Friday, February 7, 2014

Cassandra CQL3 Data Modeling for Reformed Dimensional Modelers

Goal


This post will present Cassandra data modeling practices in dimensional modeling terms.  This post is targeted at those individuals who come from a Data Warehousing background and find themselves being asked to generate data models in a Cassandra environment.  The goal of this post is to help those with Dimensional Modeling backgrounds wrap their brain around modeling in Cassandra.

Please keep in mind that we are deliberately trying to find similarities between modeling in Cassandra and modeling for Data Warehouses/Marts.  Also, keep in mind that we were recently doing a lot of Data Warehousing/BI/batch integration work prior to jumping head first into Cassandra-land.

Also, please keep in mind that this post is not meant to be an exhaustive review of Cassandra data modeling techniques and best practices.  Please refer to Patrick McFadin's data modeling presentations for more in depth Cassandra modeling guidelines.  Patrick's presentations are fantastic!

A bit of background

Cassandra and Data Warehousing use cases differ greatly, but the data modeling guidelines used in both technology genres are very similar, or can be if you squint your eyes a bit.

Use Cases:

Cassandra - typically referred to as online or transactional (here's a link to the target use cases for Cassandra, just click on Use Cases), think lots and lots of small amounts of data flowing into a system very quickly (milliseconds or less).

Data Warehousing (used loosely) - typically referred to as analytical or batch, think lots of large amounts of data flowing into a system at periodic times throughout the day using a data processing procedure that lasts minutes to hours.

Data Modeling Guidelines:

Cassandra
  • Disk space is cheap; writes are blazing fast (thanks to log-structured storage and sequential writes)
  • Align table layout to satisfy read patterns
  • Duplicate data to satisfy different versions of read operations
  • Eliminate joins where possible (materialized views, in Cassandra-speak)
  • Leverage unique keys for data distribution (partition keys)
  • Leverage cascading secondary keys for column clustering and dynamic schemas (clustering keys)
  • Access data through the cascading key structure (a.k.a. hierarchical data access)
  • No aggregations, due to the distributed architecture (store pre-aggregated data)
Data Warehousing (more specifics can be found here ...warning possible TL;DR..just like this blog)
  • Align table layout to satisfy read patterns, ad hoc requests, and reporting structure
  • Duplicate data to simplify read access patterns (think Slowly Changing Dimensions, type II or higher)
  • Leverage unique keys to identify unique combinations of dimensional attributes (surrogate keys)
  • Nest cascading hierarchies in dimension attributes to enable "drill" capabilities in BI layer
  • Access data through cascading hierarchical attributes using GROUP BY clauses
  • For query performance, limit joining to simple joins between facts and dimensions (avoid snow-flaking)
  • For performance pre-aggregate data when applicable and store in OLAP cubes, in memory layer, or summary fact tables
Hopefully you can spot a few common themes that we would like to explore in this blog, mainly:
  1. read based table design
  2. data duplication for read performance
  3. hierarchical data access
  4. storing pre-aggregated data

Read Based Table Design

Okay, so if we could provide only one piece of advice to Cassandra data modelers with a DW background, it would be to approach a data modeling problem in Cassandra as you would in Data Warehousing.  Meaning: understand what queries are going to be executed against the data stored in Cassandra, and let those queries guide the modeling.  The same technique is used during the requirements-gathering exercises of Data Warehouse projects, i.e. meet with the end users of the system and analyze how they would like to query the data.  This is a different approach than modeling data for a traditional OLTP system.

Note that in Cassandra the read use cases will probably be a lot smaller, more targeted, and won't require ad-hoc query design practices.  In other words, the read requirements for Cassandra backed systems will probably be a bit simpler compared to a DW system.  So even though you should start with understanding how data should be queried in Cassandra, don't overthink this step.  Also note that you are probably only dealing with one subject area, i.e. step number 1 in Dimensional Modeling.  It's going to be straightforward, or at least it should be straightforward, let it be straightforward.

Start with understanding the read access patterns like you would in a Data Warehouse project and you're more than halfway to creating the right data model in Cassandra. 

Data Duplication for Read Performance

Duplicating data by repeating values in one table:  

This is where Cassandra and dimension data modeling share similarities.  We are assuming that readers of this post have all created dimensions.  Dimensions duplicate data to enable easier query access.  For example, if I had a location dimension, I would repeat the state in every row for a city.  The dimension might look something like the following:

Dimension Name : Location
Key : Location Key
Attributes: City, State

For every City, the state value will be duplicated.

The same concept exists in Cassandra.  You should duplicate data in Cassandra to match the read design pattern.  Do this to eliminate joins, just like in a DW system.  

There is a slight nuance here between Cassandra and DW systems.  In DW systems, joins slow things down a bit (or, worse, a lot).  In Cassandra, no joins are possible because of the distributed architecture (i.e. the database lives on many physical servers, or nodes).  Therefore, any joins occur at the client level.
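To make the dimension analogy concrete, here's a minimal CQL3 sketch of the location example (column names are ours, purely illustrative):

CREATE TABLE locations_by_state (
  state text,
  city text,
  population int,
  PRIMARY KEY (state, city)
);

Every row for a given state repeats the state value; that duplication is the price of join-free reads.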

Duplicating data by repeating data in multiple tables:

This is where Cassandra and fact data modeling share similarities.  Think of modeling in Cassandra as you would modeling multiple versions of summary fact tables.  At the atomic level all fact tables should be unique and, as my friend Leo would argue, should be business-process aligned.  This means at the atomic level of a Data Warehouse, fact data will be contained in one and only one table.  However, for performance reasons, many Data Warehouses persist similar data in special summarized fact tables that are typically referred to as Summary Fact tables.

For example, say we had an atomic POS fact table at the grain of an individual sale transaction.  But say we had one group of users who would like to analyze POS data rolled up to the monthly level, with the ability to slice that data by state.  And say we had another group of users who would like to analyze POS data rolled up to the weekly level, with the ability to slice that data by state-crossing market boundaries (like MSAs).  A common solution to this requirement in a Data Warehouse would be to create 2 summary fact tables containing slightly different versions of the same data:

Monthly, State, Summary Fact Table - Grain Month, State : Fact - Sales
Weekly, MSA, Summary Fact Table - Grain Week, MSA : Fact - Sales

This concept exists in Cassandra as well.  We duplicate data based on the grain of the query.  For example, say we had a POS system in which we wanted to look up transactions by time of day and transactions by state; we would create two different tables in Cassandra (sketched in CQL just below):

Transactions by Time of Day Table - Grain Date/Time, Transaction ID : Data - Transaction Attributes
Transactions by State Table - Grain State, Transaction ID : Data - Transaction Attributes
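Here's a hedged CQL3 sketch of those two tables (column names and types are ours, purely illustrative):

CREATE TABLE transactions_by_time (
  tx_date text,        -- partition per day
  tx_time timestamp,   -- clustered by time of day
  tx_id uuid,
  amount decimal,
  PRIMARY KEY (tx_date, tx_time, tx_id)
);

CREATE TABLE transactions_by_state (
  state text,          -- partition per state
  tx_id uuid,
  amount decimal,
  PRIMARY KEY (state, tx_id)
);

The application writes each transaction to both tables; the duplication is deliberate.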

Speaking in terms of grain in the Cassandra example may be a bit misleading as both tables have an atomic grain of Transaction ID, but to help communicate concepts in terms that DW data modelers will understand, we chose to use the concept of Grain for this example.

Hierarchical Data Access

Okay, so this one's probably a stretch, but it makes a point about table structure in Cassandra.

Cassandra has some strict rules for query access which, if thought of as drill paths in DW-speak, enable a data modeler to quickly create the correct table layout in Cassandra.

These rules are as follows (my summary of the "rules"):
1) Every table has to have at least one partition key.
    - The partition key is the first field listed in a primary key, and it is the value Cassandra uses to distribute data around the ring.
2) Partition keys must be included as the first element in the WHERE clause of a query and can only be filtered with the IN or = operators, i.e. explicit filtering.
    - Explicit filtering enables Cassandra to "find" the data on the correct nodes in the ring.
3) Primary key columns declared after the partition key are called clustering columns, and they determine the sort order of data within a CQL3 table.
4) Clustering columns must be placed in a WHERE clause in the same order as they are declared in the table's primary key.
5) Clustering columns can leverage WHERE clause conditions with the following operators: =, <=, <, >, >=, IN.
6) Only columns defined in the primary key can be included as predicates in WHERE clauses for Cassandra queries.

For example:
CREATE TABLE test (
  col1 int,
  col2 text,
  col3 text,
  col4 text,
  PRIMARY KEY (col1, col2, col3)
) WITH CLUSTERING ORDER BY (col2 DESC);

This would store the table partitioned by col1, with rows sorted by col2 in descending order and col3 in ascending order.  This means that if we wanted to get the largest value of col2 for any particular value of col1, we could simply do something like the following:

SELECT * FROM test WHERE col1 = somevalue LIMIT 1;

We could not run the following query though.

SELECT * FROM test WHERE col2 = somevalue;

So how is this type of strict query access pattern similar to Data Warehousing?  Well, if your data warehouse leverages a BI tool that has strict drill hierarchies defined, without the ability to skip levels (i.e. use a DESCENDANTS OLAP clause), then you are using the same type of query access pattern that is modeled in Cassandra.

The lesson here is to model data in Cassandra just like you would create a strict drill path within a Data Warehouse BI or OLAP tool.   You can't get to col3 without specifying which values of col1 and col2 to "drill through" to get to the desired col3 value.

  

Store Pre-Aggregated Data

There is no way to aggregate data stored in Cassandra, other than limited COUNT functionality.  This is, again, by design: Cassandra is a distributed system, and performing aggregations would be very costly.  I believe Jonathan Ellis wrote a nice blog on this topic, but I can't seem to find it at the moment.

Anyway, the solution to this, if you would like to aggregate data, is to store data in an aggregated manner in Cassandra.  This is very similar to the Summary Fact table example mentioned above.  

This means that if you would like to see sums of POS transaction values per day, using the same POS example listed above, you would aggregate the data in client code, or using Spark, Storm, or MapReduce, and store the aggregated results in Cassandra for lightning-fast data access.
To us, the stored-aggregation component is reminiscent of the role of OLAP, where purely additive metrics are pre-calculated (specifically in MOLAP) so results are returned very quickly.
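As a sketch (table and column names are ours, not from a real system), the pre-aggregated rollup might be stored like this:

CREATE TABLE pos_sales_by_day (
  sale_date text,
  store_nbr int,
  total_sales decimal,
  PRIMARY KEY (sale_date, store_nbr)
);

-- A Spark/Storm/MapReduce job computes the sums and writes the results:
INSERT INTO pos_sales_by_day (sale_date, store_nbr, total_sales)
VALUES ('2014-02-07', 42, 12345.67);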

Conclusion

We hope this post helps Dimensional Modelers and Data Warehousing data modelers wrap their heads around modeling in Cassandra.  If you come from an ERD background, go ask a Dimensional Modeler how they create data models.  It will help you get into the right mindset for data modeling in Cassandra.