
Apache Cassandra

Introduction

Apache Cassandra is a highly scalable, distributed NoSQL database designed to handle massive amounts of data across many commodity servers with no single point of failure. Originally developed at Facebook for inbox search and later open-sourced through Apache, Cassandra combines the distributed architecture of Amazon's Dynamo with the data model of Google's Bigtable, making it a cornerstone technology for applications requiring high availability, linear scalability, and tunable consistency.

Core Concepts

Distributed Architecture

Cassandra uses a peer-to-peer ring architecture where every node is identical — there are no master or slave nodes. Data is distributed across the cluster using consistent hashing, where each node is responsible for a range of tokens on the ring. This eliminates single points of failure and allows the cluster to scale horizontally by simply adding nodes.
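
To make the token-ring idea concrete, here is a minimal, self-contained sketch (not the driver's API): a sorted map stands in for the ring, and `String.hashCode` stands in for the Murmur3 partitioner that Cassandra actually uses over the token range -2^63 to 2^63-1.

```java
import java.util.TreeMap;

public class TokenRing {
    // Each node is placed on the ring at its token.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public void addNode(String name, long token) {
        ring.put(token, name);
    }

    // Toy stand-in for the partitioner: any deterministic hash illustrates the idea.
    static long token(String partitionKey) {
        return partitionKey.hashCode(); // real Cassandra uses Murmur3Partitioner
    }

    // The first node at or clockwise of the key's token owns it; wrap around past the end.
    public String ownerOf(String partitionKey) {
        Long t = ring.ceilingKey(token(partitionKey));
        return ring.get(t != null ? t : ring.firstKey());
    }

    public static void main(String[] args) {
        TokenRing r = new TokenRing();
        r.addNode("node-a", -3_000_000_000L);
        r.addNode("node-b", 0L);
        r.addNode("node-c", 3_000_000_000L);
        System.out.println(r.ownerOf("customer-42"));
    }
}
```

Because ownership depends only on the hash and the ring layout, adding a node claims part of one existing range instead of reshuffling all data.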

Partitioning and Replication

When data is written, the partition key is hashed to determine which node owns that data. Cassandra then replicates the data to additional nodes based on the replication factor (RF). With RF=3, three copies of each piece of data exist across different nodes. The replication strategy determines how replicas are placed:

  • SimpleStrategy: Places replicas on consecutive nodes in the ring (suitable for single datacenter).
  • NetworkTopologyStrategy: Places replicas across different racks and datacenters for fault tolerance.
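
SimpleStrategy's placement rule can be sketched in a few lines: start at the node owning the key's token, walk clockwise, and collect the next RF distinct nodes. This is an illustrative toy, not driver code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class SimpleStrategySketch {
    // SimpleStrategy: the owning node plus the next RF-1 distinct nodes clockwise.
    static List<String> replicasFor(TreeMap<Long, String> ring, long keyToken, int rf) {
        // Nodes clockwise from the key's token, wrapping around the ring.
        NavigableMap<Long, String> tail = ring.tailMap(keyToken, true);
        List<String> clockwise = new ArrayList<>(tail.values());
        clockwise.addAll(ring.headMap(keyToken, false).values());

        List<String> replicas = new ArrayList<>();
        for (String node : clockwise) {
            if (!replicas.contains(node)) replicas.add(node);
            if (replicas.size() == rf) break;
        }
        return replicas;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ring = new TreeMap<>();
        ring.put(0L, "n1"); ring.put(100L, "n2"); ring.put(200L, "n3"); ring.put(300L, "n4");
        // Token 150 is owned by n3; with RF=3 the replicas are n3, n4, n1.
        System.out.println(replicasFor(ring, 150L, 3));
    }
}
```

NetworkTopologyStrategy refines this walk by skipping nodes until it has covered distinct racks and datacenters, which is why it is the production choice.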

Tunable Consistency

Cassandra offers tunable consistency per operation. You choose how many replicas must acknowledge a read or write before it's considered successful:

  • ONE: Only one replica must respond.
  • QUORUM: A majority of replicas (RF/2 + 1) must respond.
  • LOCAL_QUORUM: A quorum within the local datacenter must respond.
  • EACH_QUORUM: A quorum in each datacenter must respond.
  • ALL: All replicas must respond.
  • ANY: A write must be stored on at least one node (including via hinted handoff).

Strong consistency is achieved when: R + W > RF (read consistency + write consistency > replication factor).
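
A quick worked check of that inequality, with quorum computed by integer division (this helper is illustrative, not part of the driver):

```java
public class ConsistencyCheck {
    // Replicas contacted by a QUORUM read or write at a given replication factor.
    static int quorum(int rf) {
        return rf / 2 + 1; // integer division: RF=3 -> 2, RF=5 -> 3
    }

    // Strong consistency: every read set overlaps the latest write set in >= 1 replica.
    static boolean isStronglyConsistent(int reads, int writes, int rf) {
        return reads + writes > rf;
    }

    public static void main(String[] args) {
        int rf = 3;
        // QUORUM + QUORUM: 2 + 2 > 3 -> strongly consistent
        System.out.println(isStronglyConsistent(quorum(rf), quorum(rf), rf)); // true
        // ONE + ONE: 1 + 1 > 3 is false -> eventual consistency only
        System.out.println(isStronglyConsistent(1, 1, rf)); // false
    }
}
```

With RF=3, QUORUM reads plus QUORUM writes (2 + 2 > 3) guarantee overlap, which is why QUORUM/QUORUM is the usual strong-consistency pairing.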

Data Model

Cassandra's data model is based on wide column stores. Data is organized into:

  • Keyspace: Top-level namespace (analogous to a database in RDBMS), defines replication strategy.
  • Table: Contains rows organized by a primary key.
  • Primary Key: Composed of a partition key (determines data distribution) and optional clustering columns (determine sort order within a partition).
  • Partition: The fundamental unit of data distribution; all rows sharing the same partition key are stored together on the same node.

Write Path

Cassandra's write path is optimized for speed. Every write goes through these stages:

  1. Commit Log: Write is appended to an on-disk commit log for durability.
  2. Memtable: Write is stored in an in-memory data structure.
  3. SSTable Flush: When the memtable reaches a threshold, it's flushed to disk as an immutable SSTable (Sorted String Table).
  4. Compaction: Background process merges multiple SSTables, removing tombstones (deleted data markers) and consolidating data.
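
The first three stages can be caricatured in a few lines of Java. This toy model only illustrates the ordering (commit log first, then memtable, then flush to an immutable sorted structure), not real storage-engine behavior:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class WritePathSketch {
    private final List<String> commitLog = new ArrayList<>();                 // durability
    private final TreeMap<String, String> memtable = new TreeMap<>();         // in-memory, sorted
    private final List<SortedMap<String, String>> sstables = new ArrayList<>(); // immutable flushes
    private final int flushThreshold;

    public WritePathSketch(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    public void write(String key, String value) {
        commitLog.add(key + "=" + value); // 1. append to the commit log first
        memtable.put(key, value);         // 2. then update the memtable
        if (memtable.size() >= flushThreshold) {
            flush();                      // 3. flush when the threshold is reached
        }
    }

    private void flush() {
        sstables.add(new TreeMap<>(memtable)); // "SSTables" are sorted and never modified
        memtable.clear();
    }

    public int sstableCount() { return sstables.size(); }
    public int memtableSize() { return memtable.size(); }
}
```

Because every stage is an append or an in-memory update, writes never require a disk seek to find existing data, which is the source of Cassandra's write speed.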

Read Path

Reads are more complex because data may exist across memtables and multiple SSTables:

  1. Check the memtable for the most recent data.
  2. Check row cache (if enabled).
  3. Check Bloom filter for each SSTable to determine if the partition might exist.
  4. Check partition key cache and partition summary to locate data on disk.
  5. Fetch from the SSTable compression offset map and read from disk.
  6. Merge results from all sources, returning the most recent version.
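
Step 6, merging by recency, is the heart of the read path. A simplified last-write-wins merge, ignoring Bloom filters and caches, might look like:

```java
import java.util.List;
import java.util.Map;

public class ReadPathSketch {
    // A versioned cell: Cassandra resolves conflicts by write timestamp (last write wins).
    record Cell(String value, long writeTimestamp) {}

    // Collect candidates from the memtable and every SSTable; the newest timestamp wins.
    static Cell read(String key, Map<String, Cell> memtable, List<Map<String, Cell>> sstables) {
        Cell newest = memtable.get(key);
        for (Map<String, Cell> sstable : sstables) {
            Cell c = sstable.get(key); // real Cassandra skips most SSTables via Bloom filters
            if (c != null && (newest == null || c.writeTimestamp() > newest.writeTimestamp())) {
                newest = c;
            }
        }
        return newest;
    }
}
```

This is why read performance degrades as SSTables accumulate, and why compaction (which reduces the number of SSTables a read must consult) matters so much.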

Implementation: Java Application with Cassandra

Setting Up the DataStax Java Driver

The DataStax Java Driver is the standard client library for connecting to Cassandra from Java applications.

Maven Dependencies:

xml
<dependencies>
    <dependency>
        <groupId>com.datastax.oss</groupId>
        <artifactId>java-driver-core</artifactId>
        <version>4.17.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.oss</groupId>
        <artifactId>java-driver-query-builder</artifactId>
        <version>4.17.0</version>
    </dependency>
</dependencies>

Complete CRUD Application

java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.servererrors.*;

import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.UUID;
import java.util.List;
import java.util.ArrayList;

public class CassandraOrderService {

    private final CqlSession session;
    private PreparedStatement insertOrderStmt;
    private PreparedStatement selectOrdersByCustomerStmt;
    private PreparedStatement selectSingleOrderStmt;
    private PreparedStatement deleteOrderStmt;

    public CassandraOrderService(String contactPoint, int port, String datacenter) {
        this.session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress(contactPoint, port))
                .withLocalDatacenter(datacenter)
                .build();
        initializeSchema();
        prepareStatements();
    }

    private void initializeSchema() {
        // Create keyspace with NetworkTopologyStrategy
        session.execute(
            "CREATE KEYSPACE IF NOT EXISTS ecommerce " +
            "WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 3}"
        );

        session.execute("USE ecommerce");

        // Create table designed for query: "Get all orders for a customer, sorted by date"
        session.execute(
            "CREATE TABLE IF NOT EXISTS orders_by_customer (" +
            "  customer_id UUID," +
            "  order_date TIMESTAMP," +
            "  order_id UUID," +
            "  product_name TEXT," +
            "  quantity INT," +
            "  total_price DECIMAL," +
            "  status TEXT," +
            "  PRIMARY KEY ((customer_id), order_date, order_id)" +
            ") WITH CLUSTERING ORDER BY (order_date DESC, order_id ASC)"
        );

        System.out.println("Schema initialized successfully.");
    }

    private void prepareStatements() {
        insertOrderStmt = session.prepare(
            "INSERT INTO ecommerce.orders_by_customer " +
            "(customer_id, order_date, order_id, product_name, quantity, total_price, status) " +
            "VALUES (?, ?, ?, ?, ?, ?, ?) " +
            "USING TTL ?"
        );

        selectOrdersByCustomerStmt = session.prepare(
            "SELECT * FROM ecommerce.orders_by_customer " +
            "WHERE customer_id = ? " +
            "ORDER BY order_date DESC " +
            "LIMIT ?"
        );

        selectSingleOrderStmt = session.prepare(
            "SELECT * FROM ecommerce.orders_by_customer " +
            "WHERE customer_id = ? AND order_date = ? AND order_id = ?"
        );

        deleteOrderStmt = session.prepare(
            "DELETE FROM ecommerce.orders_by_customer " +
            "WHERE customer_id = ? AND order_date = ? AND order_id = ?"
        );
    }

    // CREATE
    public void createOrder(UUID customerId, UUID orderId, String productName,
                            int quantity, double totalPrice, String status, int ttlSeconds) {
        try {
            BoundStatement bound = insertOrderStmt.bind(
                customerId,
                Instant.now(),
                orderId,
                productName,
                quantity,
                java.math.BigDecimal.valueOf(totalPrice),
                status,
                ttlSeconds
            ).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

            session.execute(bound);
            System.out.println("Order created: " + orderId);
        } catch (WriteTimeoutException e) {
            System.err.println("Write timeout — received " + e.getReceived()
                + " of " + e.getBlockFor() + " required acknowledgments.");
            // Implement retry logic
        } catch (UnavailableException e) {
            System.err.println("Not enough replicas available: required="
                + e.getRequired() + ", alive=" + e.getAlive());
        }
    }

    // READ — multiple orders
    public List<OrderRecord> getOrdersByCustomer(UUID customerId, int limit) {
        List<OrderRecord> orders = new ArrayList<>();
        try {
            BoundStatement bound = selectOrdersByCustomerStmt.bind(customerId, limit)
                .setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);

            ResultSet rs = session.execute(bound);
            for (Row row : rs) {
                orders.add(new OrderRecord(
                    row.getUuid("customer_id"),
                    row.getInstant("order_date"),
                    row.getUuid("order_id"),
                    row.getString("product_name"),
                    row.getInt("quantity"),
                    row.getBigDecimal("total_price").doubleValue(),
                    row.getString("status")
                ));
            }
        } catch (ReadTimeoutException e) {
            System.err.println("Read timeout — data may be temporarily unavailable.");
        }
        return orders;
    }

    // UPDATE using Lightweight Transaction (LWT) for conditional update
    public boolean updateOrderStatus(UUID customerId, Instant orderDate,
                                      UUID orderId, String expectedStatus, String newStatus) {
        try {
            ResultSet rs = session.execute(
                SimpleStatement.newInstance(
                    "UPDATE ecommerce.orders_by_customer SET status = ? " +
                    "WHERE customer_id = ? AND order_date = ? AND order_id = ? " +
                    "IF status = ?",
                    newStatus, customerId, orderDate, orderId, expectedStatus
                ).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
            );

            boolean applied = rs.wasApplied();
            if (applied) {
                System.out.println("Order " + orderId + " status updated to " + newStatus);
            } else {
                System.out.println("Conditional update failed — status was not '" + expectedStatus + "'");
            }
            return applied;
        } catch (WriteTimeoutException e) {
            System.err.println("LWT write timeout — the operation may or may not have been applied.");
            return false;
        }
    }

    // DELETE
    public void deleteOrder(UUID customerId, Instant orderDate, UUID orderId) {
        BoundStatement bound = deleteOrderStmt.bind(customerId, orderDate, orderId)
            .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
        session.execute(bound);
        System.out.println("Order deleted: " + orderId);
    }

    public void close() {
        if (session != null) {
            session.close();
        }
    }

    // Simple record class
    public static class OrderRecord {
        public final UUID customerId;
        public final Instant orderDate;
        public final UUID orderId;
        public final String productName;
        public final int quantity;
        public final double totalPrice;
        public final String status;

        public OrderRecord(UUID customerId, Instant orderDate, UUID orderId,
                           String productName, int quantity, double totalPrice, String status) {
            this.customerId = customerId;
            this.orderDate = orderDate;
            this.orderId = orderId;
            this.productName = productName;
            this.quantity = quantity;
            this.totalPrice = totalPrice;
            this.status = status;
        }

        @Override
        public String toString() {
            return String.format("Order[%s] %s x%d = $%.2f (%s) on %s",
                orderId, productName, quantity, totalPrice, status, orderDate);
        }
    }

    public static void main(String[] args) {
        CassandraOrderService service = new CassandraOrderService(
            "127.0.0.1", 9042, "datacenter1"
        );

        try {
            UUID customerId = UUID.randomUUID();
            UUID orderId1 = UUID.randomUUID();
            UUID orderId2 = UUID.randomUUID();

            // Create orders
            service.createOrder(customerId, orderId1, "Laptop", 1, 999.99, "PENDING", 86400 * 365);
            service.createOrder(customerId, orderId2, "Mouse", 2, 49.98, "PENDING", 86400 * 365);

            // Read orders
            List<OrderRecord> orders = service.getOrdersByCustomer(customerId, 10);
            orders.forEach(System.out::println);

            // Conditional update
            if (!orders.isEmpty()) {
                OrderRecord first = orders.get(0);
                service.updateOrderStatus(
                    customerId, first.orderDate, first.orderId, "PENDING", "SHIPPED"
                );
            }

            // Verify update
            service.getOrdersByCustomer(customerId, 10).forEach(System.out::println);

        } finally {
            service.close();
        }
    }
}

Batch Operations and Async Writes

java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;

import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;
import java.util.List;
import java.util.ArrayList;

public class CassandraBatchAndAsyncExample {

    private final CqlSession session;

    public CassandraBatchAndAsyncExample(CqlSession session) {
        this.session = session;
    }

    /**
     * Logged batch — ensures atomicity within a SINGLE partition.
     * NEVER use batches across partitions for performance; that's an anti-pattern.
     */
    public void atomicPartitionBatch(UUID customerId) {
        BatchStatement batch = BatchStatement.builder(BatchType.LOGGED)
            .addStatement(SimpleStatement.newInstance(
                "INSERT INTO ecommerce.orders_by_customer " +
                "(customer_id, order_date, order_id, product_name, quantity, total_price, status) " +
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                customerId, Instant.now(), UUID.randomUUID(),
                "Keyboard", 1, java.math.BigDecimal.valueOf(79.99), "PENDING"
            ))
            .addStatement(SimpleStatement.newInstance(
                "INSERT INTO ecommerce.orders_by_customer " +
                "(customer_id, order_date, order_id, product_name, quantity, total_price, status) " +
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                customerId, Instant.now(), UUID.randomUUID(),
                "Monitor", 1, java.math.BigDecimal.valueOf(299.99), "PENDING"
            ))
            .build();

        session.execute(batch);
        System.out.println("Batch executed atomically within partition: " + customerId);
    }

    /**
     * Asynchronous writes for high-throughput ingestion.
     */
    public CompletableFuture<Void> asyncBulkInsert(UUID customerId, int count) {
        PreparedStatement ps = session.prepare(
            "INSERT INTO ecommerce.orders_by_customer " +
            "(customer_id, order_date, order_id, product_name, quantity, total_price, status) " +
            "VALUES (?, ?, ?, ?, ?, ?, ?)"
        );

        List<CompletionStage<AsyncResultSet>> futures = new ArrayList<>();

        for (int i = 0; i < count; i++) {
            BoundStatement bound = ps.bind(
                customerId, Instant.now(), UUID.randomUUID(),
                "Product-" + i, 1, java.math.BigDecimal.valueOf(9.99 + i), "PENDING"
            );
            futures.add(session.executeAsync(bound));
        }

        // Wait for all async operations to complete
        CompletableFuture<?>[] futureArray = futures.stream()
            .map(CompletionStage::toCompletableFuture)
            .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(futureArray)
            .thenRun(() -> System.out.println("Async bulk insert complete: " + count + " rows"));
    }

    public static void main(String[] args) throws Exception {
        try (CqlSession session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress("127.0.0.1", 9042))
                .withLocalDatacenter("datacenter1")
                .withKeyspace("ecommerce")
                .build()) {

            CassandraBatchAndAsyncExample example = new CassandraBatchAndAsyncExample(session);

            UUID customerId = UUID.randomUUID();
            example.atomicPartitionBatch(customerId);
            example.asyncBulkInsert(customerId, 100).get(); // block until complete
        }
    }
}

Data Modeling: Query-Driven Design

Cassandra data modeling is query-driven — you design tables around the queries your application needs, not around entity relationships.

Denormalization Pattern

Unlike relational databases, Cassandra encourages denormalization — duplicating data across multiple tables optimized for different queries. The application is responsible for keeping these tables in sync.
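
For example, the orders_by_customer table from the CRUD application could be paired with a hypothetical orders_by_product table that answers "all orders for a product"; the application writes every order to both tables:

```cql
-- Hypothetical companion table to orders_by_customer: same order data,
-- re-partitioned so that "orders for a product" is a single-partition read.
CREATE TABLE IF NOT EXISTS ecommerce.orders_by_product (
  product_name TEXT,
  order_date TIMESTAMP,
  order_id UUID,
  customer_id UUID,
  quantity INT,
  PRIMARY KEY ((product_name), order_date, order_id)
) WITH CLUSTERING ORDER BY (order_date DESC, order_id ASC);
```

Disk is traded for read latency: each query hits exactly one table and one partition, at the cost of the application issuing one write per table on every insert.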

Anti-Patterns

Understanding what NOT to do in Cassandra is just as important as knowing best practices. Common anti-patterns include:

  • Unbounded partitions: partition keys whose partitions grow without limit (e.g., all events for a sensor with no time bucket).
  • Multi-partition batches: using BATCH for bulk loading across partitions; batches provide atomicity within a partition, not throughput.
  • ALLOW FILTERING in production queries: it signals a query the table was not designed for, forcing full scans.
  • Queue-like workloads: frequent deletes create tombstones that accumulate and degrade read performance.
  • Read-before-write patterns: they defeat the write-optimized design; prefer idempotent upserts or LWTs where needed.

Multi-Datacenter Replication

Cassandra natively supports multi-datacenter replication, making it ideal for global applications.
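
A keyspace spanning two datacenters only needs per-datacenter replication factors in its definition (the datacenter names below are hypothetical; they must match what `nodetool status` reports):

```cql
CREATE KEYSPACE IF NOT EXISTS ecommerce
  WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'us_east': 3,   -- three replicas in the US datacenter
    'eu_west': 3    -- three replicas in the EU datacenter
  };
```

Combined with LOCAL_QUORUM reads and writes, each datacenter serves its local clients at local latency while replication to the remote datacenter happens asynchronously in the background.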

Compaction Strategies

Cassandra supports different compaction strategies optimized for different workloads:

java
// Size-Tiered Compaction — default, good for write-heavy workloads
session.execute(
    "CREATE TABLE write_heavy_table (" +
    "  id UUID PRIMARY KEY, data TEXT" +
    ") WITH compaction = {" +
    "  'class': 'SizeTieredCompactionStrategy'," +
    "  'min_threshold': 4," +
    "  'max_threshold': 32" +
    "}"
);

// Leveled Compaction — good for read-heavy workloads
session.execute(
    "CREATE TABLE read_heavy_table (" +
    "  id UUID PRIMARY KEY, data TEXT" +
    ") WITH compaction = {" +
    "  'class': 'LeveledCompactionStrategy'," +
    "  'sstable_size_in_mb': 160" +
    "}"
);

// Time-Window Compaction — ideal for time-series data
session.execute(
    "CREATE TABLE time_series_table (" +
    "  sensor_id UUID, ts TIMESTAMP, value DOUBLE," +
    "  PRIMARY KEY ((sensor_id), ts)" +
    ") WITH compaction = {" +
    "  'class': 'TimeWindowCompactionStrategy'," +
    "  'compaction_window_unit': 'DAYS'," +
    "  'compaction_window_size': 1" +
    "}"
);

Best Practices

  1. Design tables around queries: Model your data to satisfy specific query patterns rather than normalizing entities; each query should map to one table.

  2. Keep partitions small: Aim for partitions under 100MB and under 100,000 rows; use compound partition keys with time bucketing for high-cardinality data.

  3. Use LOCAL_QUORUM for strong consistency: In multi-datacenter deployments, LOCAL_QUORUM for both reads and writes provides strong consistency within a datacenter without cross-datacenter latency.

  4. Always use prepared statements: Prepared statements are parsed once and executed many times, reducing server-side parsing overhead and enabling token-aware routing.

  5. Avoid ALLOW FILTERING: This forces Cassandra to scan all partitions; if you need a query that requires filtering, create a new denormalized table instead.

  6. Use TimeUUIDs for time-ordered data: TIMEUUID type provides both ordering and uniqueness, making it ideal for clustering columns in time-series tables.

  7. Implement retry policies: Configure the driver's retry policy appropriately; use exponential backoff for WriteTimeoutException and be cautious with LWT retries.

  8. Monitor partition hotspots: Uneven partition sizes cause load imbalance; use nodetool tablestats and nodetool tablehistograms to detect skewed partitions.

  9. Choose the right compaction strategy: STCS for write-heavy, LCS for read-heavy, TWCS for time-series; wrong choices lead to increased disk usage and read latency.

  10. Use TTL for automatic data expiration: Set TTL on writes for data that has a natural lifespan (logs, sessions, events) to avoid manual cleanup and tombstone accumulation.
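
Several of these practices combine naturally in one schema. The table below is a hypothetical time-series design using a daily time bucket (practice 2), a TIMEUUID clustering column (practice 6), and a default TTL (practice 10):

```cql
-- Hypothetical sensor-events table illustrating practices 2, 6, and 10.
CREATE TABLE IF NOT EXISTS ecommerce.events_by_sensor (
  sensor_id UUID,
  day DATE,            -- time bucket: one partition per sensor per day keeps partitions bounded
  event_id TIMEUUID,   -- sortable by time and globally unique
  value DOUBLE,
  PRIMARY KEY ((sensor_id, day), event_id)
) WITH CLUSTERING ORDER BY (event_id DESC)
  AND default_time_to_live = 2592000;  -- expire rows automatically after 30 days
```

Reading "the latest events for sensor X today" then touches exactly one partition, and expired data disappears through TTL rather than explicit deletes.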