Columnar Data Stores

Introduction

Columnar data stores organize information by columns rather than rows, fundamentally changing how data is stored on disk and retrieved during queries. This storage paradigm delivers dramatic performance improvements for analytical workloads — often achieving 10–100x speedups over row-oriented databases — by reading only the columns relevant to a query and enabling superior compression ratios. Understanding columnar storage is essential for anyone designing data warehouses, building analytics pipelines, or working with technologies like Apache Parquet, Amazon Redshift, Apache Cassandra (wide-column), or ClickHouse.

Core Concepts

Row-Oriented vs. Column-Oriented Storage

In a traditional row-oriented database (PostgreSQL, MySQL), each row is stored contiguously on disk. When you SELECT name, salary FROM employees, the engine must read entire rows — including every unused column — just to extract two fields.

In a columnar store, each column is stored contiguously. The same query reads only the name and salary columns, skipping everything else entirely.
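The difference can be sketched in a few lines of Java. This is illustrative only (the class and field names are mine, and a real engine keeps these layouts on disk, not in in-memory arrays), but it shows why a two-column query touches far less data in the columnar layout:

```java
// Illustrative sketch: the same three employee records laid out row-wise
// and column-wise. Names and data here are hypothetical.
public class LayoutSketch {

    // Row-oriented: each record's fields sit together, like rows on disk.
    static final Object[][] ROWS = {
        {1, "Alice", 120000.0},
        {2, "Bob",    95000.0},
        {3, "Carol", 130000.0}
    };

    // Column-oriented: all values of one field sit together.
    static final int[]    IDS      = {1, 2, 3};
    static final String[] NAMES    = {"Alice", "Bob", "Carol"};
    static final double[] SALARIES = {120000.0, 95000.0, 130000.0};

    // Summing salaries in the columnar layout scans one contiguous,
    // cache-friendly primitive array and ignores ids and names entirely.
    public static double totalSalaryColumnar() {
        double total = 0;
        for (double s : SALARIES) total += s;
        return total;
    }

    // The row layout must walk every full record just to reach one field.
    public static double totalSalaryRowOriented() {
        double total = 0;
        for (Object[] row : ROWS) total += (double) row[2];
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalSalaryColumnar());    // 345000.0
        System.out.println(totalSalaryRowOriented()); // same result, more data read
    }
}
```

The same pattern also explains the vectorization advantage mentioned below: a tight loop over a primitive array is exactly the shape that SIMD-capable CPUs (and the JIT's auto-vectorizer) handle best.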

Why Columnar Storage is Faster for Analytics

Analytical queries typically scan millions of rows but only a few columns. Columnar stores exploit three key advantages:

  1. I/O Reduction: Only relevant columns are read from disk
  2. Compression: Homogeneous data types within a column compress far better (e.g., run-length encoding on repeated department names)
  3. Vectorized Processing: CPUs can apply SIMD operations across arrays of same-typed values

Compression Techniques in Columnar Stores

Because a column contains homogeneous data, specialized encoding techniques become applicable:

| Technique | Best For | Example |
| --- | --- | --- |
| Run-Length Encoding (RLE) | Low-cardinality sorted columns | Eng, Eng, Eng → (Eng, 3) |
| Dictionary Encoding | Repeated string values | Map strings to integers |
| Delta Encoding | Monotonically increasing values | Timestamps, auto-increment IDs |
| Bit-Packing | Small integer ranges | Store values in minimal bits |
| Null Suppression | Sparse columns | Skip nulls with a bitmap |
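Run-length encoding is simple enough to sketch directly. The following is a minimal illustration (real columnar formats use far more elaborate hybrid encodings, and the `Run` record here is my own construct):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal run-length encoding sketch. Works best on low-cardinality,
// sorted columns — e.g. a department column grouped by department.
public class RunLengthEncoder {

    // One run: a value and how many times it repeats consecutively.
    public record Run(String value, int count) {}

    public static List<Run> encode(List<String> column) {
        List<Run> runs = new ArrayList<>();
        for (String v : column) {
            int last = runs.size() - 1;
            if (last >= 0 && runs.get(last).value().equals(v)) {
                // Extend the current run instead of storing the value again.
                runs.set(last, new Run(v, runs.get(last).count() + 1));
            } else {
                runs.add(new Run(v, 1));
            }
        }
        return runs;
    }

    public static List<String> decode(List<Run> runs) {
        List<String> out = new ArrayList<>();
        for (Run r : runs) {
            for (int i = 0; i < r.count(); i++) out.add(r.value());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> departments = List.of("Eng", "Eng", "Eng", "Sales", "Sales");
        List<Run> encoded = encode(departments);
        System.out.println(encoded.size()); // 2 runs instead of 5 stored values
        System.out.println(decode(encoded).equals(departments)); // true: round-trips
    }
}
```

Note that the compression ratio collapses if the column is unsorted — which is one reason sort keys matter so much in columnar systems.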

Types of Columnar Data Stores

The columnar paradigm manifests in several distinct categories: columnar file formats (Apache Parquet, ORC), managed columnar data warehouses (Amazon Redshift, Google BigQuery), columnar OLAP databases (ClickHouse, Apache Druid), and wide-column stores (Apache Cassandra, HBase). Note that wide-column stores group data by column family rather than storing each column contiguously, so they serve different workloads than true columnar analytics engines.

Row Groups and Column Chunks

Modern columnar formats like Parquet don't store one giant array per column. Instead, they split data into row groups (horizontal partitions), and within each row group, each column is stored as a column chunk. This hybrid approach enables both columnar efficiency and row-level locality.
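A hypothetical miniature of this layout (the `RowGroup` and `ColumnChunk` records below are my own simplification of the Parquet file structure, not its actual classes):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the hybrid layout: the table is cut into row groups, and within
// each row group every column is stored as one contiguous chunk.
public class RowGroupSketch {

    // One column chunk: all values of a single column inside one row group.
    public record ColumnChunk(String column, List<Object> values) {}

    // One row group: a horizontal slice of the table, stored column by column.
    public record RowGroup(List<ColumnChunk> chunks) {}

    // Reading one column touches exactly one chunk per row group and skips
    // every other chunk — the payoff of the hybrid layout.
    public static List<Object> readColumn(List<RowGroup> file, String column) {
        List<Object> out = new ArrayList<>();
        for (RowGroup group : file) {
            for (ColumnChunk chunk : group.chunks()) {
                if (chunk.column().equals(column)) {
                    out.addAll(chunk.values());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Four rows of (id, department) split into two row groups of two rows.
        List<RowGroup> file = List.of(
            new RowGroup(List.of(
                new ColumnChunk("id", List.of(1, 2)),
                new ColumnChunk("department", List.of("Eng", "Eng")))),
            new RowGroup(List.of(
                new ColumnChunk("id", List.of(3, 4)),
                new ColumnChunk("department", List.of("Sales", "Sales")))));

        System.out.println(readColumn(file, "department")); // [Eng, Eng, Sales, Sales]
    }
}
```

Row-level locality comes from the horizontal split: reconstructing rows 1–2 only requires the first row group's chunks, not the whole file.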

Implementation: Writing and Reading Parquet Files in Java

Apache Parquet is the most widely used columnar file format. Let's write and read Parquet files using the parquet-avro Java bindings.

Writing Parquet Files

java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.List;

public class ParquetWriterExample {

    private static final String SCHEMA_JSON = """
        {
            "type": "record",
            "name": "Employee",
            "fields": [
                {"name": "id", "type": "int"},
                {"name": "name", "type": "string"},
                {"name": "department", "type": "string"},
                {"name": "salary", "type": "double"},
                {"name": "hire_year", "type": "int"}
            ]
        }
        """;

    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        Path outputPath = new Path("employees.parquet");

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(outputPath)
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withRowGroupSize(128 * 1024 * 1024) // 128 MB row groups
                .withPageSize(1024 * 1024)            // 1 MB pages
                .build()) {

            List<Object[]> employees = List.of(
                new Object[]{1, "Alice",   "Engineering", 120000.0, 2019},
                new Object[]{2, "Bob",     "Sales",       95000.0,  2020},
                new Object[]{3, "Carol",   "Engineering", 130000.0, 2018},
                new Object[]{4, "David",   "Marketing",   88000.0,  2021},
                new Object[]{5, "Eve",     "Engineering", 145000.0, 2017}
            );

            for (Object[] emp : employees) {
                GenericRecord record = new GenericData.Record(schema);
                record.put("id", emp[0]);
                record.put("name", emp[1]);
                record.put("department", emp[2]);
                record.put("salary", emp[3]);
                record.put("hire_year", emp[4]);
                writer.write(record);
            }

            System.out.println("Successfully wrote " + employees.size() + " records to Parquet");

        } catch (IOException e) {
            System.err.println("Failed to write Parquet file: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

Reading Parquet Files with Column Projection

The key columnar advantage: reading only the columns you need.

java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class ParquetReaderExample {

    // Projection schema — only read 'name' and 'salary' columns
    private static final String PROJECTION_SCHEMA = """
        {
            "type": "record",
            "name": "Employee",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "salary", "type": "double"}
            ]
        }
        """;

    public static void main(String[] args) {
        Path inputPath = new Path("employees.parquet");
        Schema projection = new Schema.Parser().parse(PROJECTION_SCHEMA);
        Configuration conf = new Configuration();
        // Register the projection on the read path — without this call the
        // reader materializes every column and the projection schema is unused.
        org.apache.parquet.avro.AvroReadSupport.setRequestedProjection(conf, projection);

        try {
            HadoopInputFile inputFile = HadoopInputFile.fromPath(inputPath, conf);

            try (ParquetReader<GenericRecord> reader = AvroParquetReader
                    .<GenericRecord>builder(inputFile)
                    .withConf(conf)
                    .build()) {

                GenericRecord record;
                double totalSalary = 0;
                int count = 0;

                while ((record = reader.read()) != null) {
                    String name = record.get("name").toString();
                    double salary = (double) record.get("salary");
                    System.out.printf("Employee: %-10s Salary: $%,.2f%n", name, salary);
                    totalSalary += salary;
                    count++;
                }

                if (count > 0) {
                    System.out.printf("%nAverage salary: $%,.2f (from %d employees)%n",
                            totalSalary / count, count);
                }
            }

        } catch (IOException e) {
            System.err.println("Failed to read Parquet file: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

Implementation: Querying Amazon Redshift (Columnar Data Warehouse)

Amazon Redshift is a fully managed columnar data warehouse. Here's how to run DDL and analytical queries against it from Java using the Redshift Data API in the AWS SDK for Java v2:

java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.redshiftdata.RedshiftDataClient;
import software.amazon.awssdk.services.redshiftdata.model.*;

import java.util.List;

public class RedshiftColumnarQueryExample {

    private static final String CLUSTER_ID = "my-analytics-cluster";
    private static final String DATABASE = "analytics_db";
    private static final String DB_USER = "analyst";

    public static void main(String[] args) throws InterruptedException {
        RedshiftDataClient client = RedshiftDataClient.builder()
                .region(Region.US_EAST_1)
                .build();

        try {
            // Create a columnar table with distribution and sort keys
            executeStatement(client, """
                CREATE TABLE IF NOT EXISTS sales_facts (
                    sale_id        BIGINT IDENTITY(1,1),
                    sale_date      DATE        SORTKEY,
                    product_id     INTEGER     NOT NULL,
                    customer_id    INTEGER     NOT NULL,
                    region         VARCHAR(50) ENCODE ZSTD,
                    quantity       INTEGER     ENCODE AZ64,
                    unit_price     DECIMAL(10,2) ENCODE AZ64,
                    total_amount   DECIMAL(12,2) ENCODE AZ64
                )
                DISTSTYLE KEY
                DISTKEY (product_id);
                """);

            // Analytical query — Redshift reads only the 3 columns needed
            String analyticalQuery = """
                SELECT
                    region,
                    DATE_TRUNC('month', sale_date) AS month,
                    SUM(total_amount) AS revenue,
                    COUNT(*) AS transaction_count,
                    AVG(unit_price) AS avg_price
                FROM sales_facts
                WHERE sale_date >= '2024-01-01'
                GROUP BY region, DATE_TRUNC('month', sale_date)
                ORDER BY revenue DESC
                LIMIT 20;
                """;

            String queryId = submitQuery(client, analyticalQuery);
            waitForQuery(client, queryId);
            printResults(client, queryId);

        } catch (RedshiftDataException e) {
            System.err.println("Redshift error: " + e.getMessage());
        } finally {
            client.close();
        }
    }

    private static String submitQuery(RedshiftDataClient client, String sql) {
        ExecuteStatementResponse response = client.executeStatement(
                ExecuteStatementRequest.builder()
                        .clusterIdentifier(CLUSTER_ID)
                        .database(DATABASE)
                        .dbUser(DB_USER)
                        .sql(sql)
                        .build());
        System.out.println("Query submitted: " + response.id());
        return response.id();
    }

    private static void executeStatement(RedshiftDataClient client, String sql) 
            throws InterruptedException {
        String id = submitQuery(client, sql);
        waitForQuery(client, id);
    }

    private static void waitForQuery(RedshiftDataClient client, String queryId) 
            throws InterruptedException {
        DescribeStatementResponse desc;
        do {
            Thread.sleep(1000);
            desc = client.describeStatement(
                    DescribeStatementRequest.builder().id(queryId).build());
            System.out.println("Status: " + desc.statusAsString());
        } while (desc.status() == StatusString.STARTED || 
                 desc.status() == StatusString.SUBMITTED ||
                 desc.status() == StatusString.PICKED);

        if (desc.status() == StatusString.FAILED) {
            throw new RuntimeException("Query failed: " + desc.error());
        }
    }

    private static void printResults(RedshiftDataClient client, String queryId) {
        GetStatementResultResponse result = client.getStatementResult(
                GetStatementResultRequest.builder().id(queryId).build());

        // Print column headers
        List<ColumnMetadata> columns = result.columnMetadata();
        columns.forEach(c -> System.out.printf("%-20s", c.name()));
        System.out.println();

        // Print rows
        for (List<Field> row : result.records()) {
            for (Field field : row) {
                String value = field.stringValue() != null ? 
                        field.stringValue() : "NULL";
                System.out.printf("%-20s", value);
            }
            System.out.println();
        }
    }
}

Implementation: Simple In-Memory Columnar Store

To truly understand columnar storage, let's build a minimal columnar store in Java:

java
import java.util.*;
import java.util.stream.*;

public class SimpleColumnarStore {

    private final Map<String, List<Object>> columns = new LinkedHashMap<>();
    private int rowCount = 0;

    public void addColumn(String name) {
        columns.put(name, new ArrayList<>());
    }

    public void insertRow(Map<String, Object> row) {
        for (Map.Entry<String, List<Object>> col : columns.entrySet()) {
            col.getValue().add(row.getOrDefault(col.getKey(), null));
        }
        rowCount++;
    }

    /**
     * Column projection — only reads specified columns.
     * This is the core columnar advantage.
     */
    public List<Map<String, Object>> select(List<String> projectedColumns,
                                             String filterColumn,
                                             Object filterValue) {
        // Step 1: Get the filter column and find matching row indices
        List<Object> filterCol = columns.get(filterColumn);
        if (filterCol == null) {
            throw new IllegalArgumentException("Unknown column: " + filterColumn);
        }

        List<Integer> matchingIndices = IntStream.range(0, rowCount)
                .filter(i -> filterValue.equals(filterCol.get(i)))
                .boxed()
                .collect(Collectors.toList());

        // Step 2: Project only requested columns for matching rows
        List<Map<String, Object>> results = new ArrayList<>();
        for (int idx : matchingIndices) {
            Map<String, Object> row = new LinkedHashMap<>();
            for (String col : projectedColumns) {
                row.put(col, columns.get(col).get(idx));
            }
            results.add(row);
        }
        return results;
    }

    /** Columnar aggregation — operates on a single column array */
    public double average(String columnName, String filterColumn, Object filterValue) {
        List<Object> filterCol = columns.get(filterColumn);
        List<Object> valueCol = columns.get(columnName);

        return IntStream.range(0, rowCount)
                .filter(i -> filterValue.equals(filterCol.get(i)))
                .mapToDouble(i -> ((Number) valueCol.get(i)).doubleValue())
                .average()
                .orElse(0.0);
    }

    /** Show compression opportunity — count distinct values per column */
    public Map<String, Long> columnCardinality() {
        Map<String, Long> cardinality = new LinkedHashMap<>();
        for (Map.Entry<String, List<Object>> entry : columns.entrySet()) {
            long distinct = entry.getValue().stream().distinct().count();
            cardinality.put(entry.getKey(), distinct);
        }
        return cardinality;
    }

    public static void main(String[] args) {
        SimpleColumnarStore store = new SimpleColumnarStore();
        store.addColumn("id");
        store.addColumn("name");
        store.addColumn("department");
        store.addColumn("salary");

        store.insertRow(Map.of("id", 1, "name", "Alice",  "department", "Engineering", "salary", 120000.0));
        store.insertRow(Map.of("id", 2, "name", "Bob",    "department", "Sales",       "salary", 95000.0));
        store.insertRow(Map.of("id", 3, "name", "Carol",  "department", "Engineering", "salary", 130000.0));
        store.insertRow(Map.of("id", 4, "name", "David",  "department", "Sales",       "salary", 88000.0));
        store.insertRow(Map.of("id", 5, "name", "Eve",    "department", "Engineering", "salary", 145000.0));

        // Columnar projection: only read 'name' and 'salary' for Engineering
        System.out.println("=== Engineering Employees (name, salary only) ===");
        List<Map<String, Object>> results = store.select(
                List.of("name", "salary"), "department", "Engineering");
        results.forEach(r -> System.out.printf("  %-10s $%,.2f%n", r.get("name"), r.get("salary")));

        // Columnar aggregation
        double avgSalary = store.average("salary", "department", "Engineering");
        System.out.printf("%nAvg Engineering salary: $%,.2f%n", avgSalary);

        // Compression analysis
        System.out.println("\n=== Column Cardinality (compression opportunity) ===");
        store.columnCardinality().forEach((col, card) ->
                System.out.printf("  %-15s distinct values: %d (out of %d rows)%n",
                        col, card, store.rowCount));
    }
}

Query Execution in Columnar Stores

Understanding how a columnar engine executes a query reveals why it outperforms row stores for analytics:

Predicate Pushdown and Zone Maps

Columnar stores maintain statistics (min, max, count, null count) per column chunk. This enables predicate pushdown — skipping entire row groups that cannot contain matching data.
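A minimal sketch of how min/max statistics enable skipping (the `ZoneMap` record and method names below are illustrative, not taken from any real engine):

```java
import java.util.List;

// Sketch of zone-map-based row group skipping: each chunk carries min/max
// statistics, and a filter is tested against them before any data is read.
public class ZoneMapSketch {

    // Per-chunk statistics for one column within one row group.
    public record ZoneMap(long min, long max) {
        // A predicate "value == target" can only match if target lies in [min, max].
        boolean mightContain(long target) {
            return target >= min && target <= max;
        }
    }

    // Count how many row groups the engine must actually read for an
    // equality predicate; the rest are skipped without touching their data.
    public static int rowGroupsToScan(List<ZoneMap> zoneMaps, long target) {
        int toScan = 0;
        for (ZoneMap zm : zoneMaps) {
            if (zm.mightContain(target)) toScan++;
        }
        return toScan;
    }

    public static void main(String[] args) {
        // Zone maps for a sorted sale_id column across four row groups.
        List<ZoneMap> maps = List.of(
                new ZoneMap(1, 1000), new ZoneMap(1001, 2000),
                new ZoneMap(2001, 3000), new ZoneMap(3001, 4000));

        // WHERE sale_id = 2500 → only one of four row groups is read.
        System.out.println(rowGroupsToScan(maps, 2500)); // prints 1
    }
}
```

This is also why sort order matters: on a sorted column the zone map ranges are disjoint and skipping is maximally effective, while on a shuffled column every range may overlap the predicate.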

When to Use (and Not Use) Columnar Stores

Use columnar storage when queries scan many rows but read few columns: aggregations, reporting, time-series analysis, and data warehouse workloads. Avoid it for OLTP-shaped workloads (frequent single-row inserts, updates, and point lookups by primary key), where row stores remain far faster.

Performance Comparison

To illustrate the impact, consider a representative comparison for a 1-billion-row table with 50 columns (exact figures vary by engine and hardware):

| Operation | Row Store | Columnar Store | Improvement |
| --- | --- | --- | --- |
| SELECT * (full scan) | 120s | 140s | Slightly slower |
| SELECT col1, col2 WHERE col3 = X | 120s | 3s | 40x faster |
| SELECT AVG(col1) GROUP BY col2 | 95s | 2s | 47x faster |
| Single row insert | 0.5ms | 5ms | 10x slower |
| Single row lookup by PK | 1ms | 50ms | 50x slower |
| Compression ratio | 3:1 | 10:1 | 3x better |

Best Practices

  1. Choose sort keys wisely: Place your most-filtered columns as sort keys to maximize zone map effectiveness and enable row group skipping.

  2. Right-size row groups: Use 128MB–256MB row groups for Parquet — too small loses compression benefits, too large wastes memory during reads.

  3. Use appropriate compression codecs: Snappy for speed-sensitive workloads, ZSTD for storage-sensitive ones. Match codec to your latency vs. storage tradeoff.

  4. Apply column projection religiously: Never SELECT * in columnar stores — it negates the entire columnar advantage by forcing reads of all columns.

  5. Partition by time: Most analytical queries filter by date range. Partitioning by date allows entire partitions to be skipped.

  6. Prefer batch inserts: Columnar stores are optimized for large batch writes. Avoid single-row inserts — buffer data and write in bulk.

  7. Co-locate frequently joined tables: Use distribution keys (Redshift) or clustering (BigQuery) so that rows joined on the same key live on the same node, avoiding network shuffles.

  8. Monitor column cardinality: Low-cardinality columns compress dramatically better. Consider dictionary encoding for columns with fewer than ~60,000 distinct values.

  9. Use predicate pushdown: Structure queries so filters appear in WHERE clauses that can be pushed down to the storage layer, leveraging zone map statistics.

  10. Denormalize for analytics: Unlike OLTP, columnar stores perform best on wide, denormalized tables. Reduce joins by pre-joining during ETL.
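The dictionary encoding recommended in practice 8 can be sketched in a few lines (illustrative only; real formats like Parquet build dictionaries per page and fall back to plain encoding when the dictionary grows too large):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of dictionary encoding: map each distinct string to a small integer
// code, so a low-cardinality column becomes an int array plus one small dictionary.
public class DictionaryEncoder {

    public record Encoded(List<String> dictionary, int[] codes) {}

    public static Encoded encode(List<String> column) {
        Map<String, Integer> codeOf = new LinkedHashMap<>();
        int[] codes = new int[column.size()];
        for (int i = 0; i < column.size(); i++) {
            // Assign the next code on first sight of a value; reuse it afterwards.
            codes[i] = codeOf.computeIfAbsent(column.get(i), k -> codeOf.size());
        }
        return new Encoded(new ArrayList<>(codeOf.keySet()), codes);
    }

    public static String decode(Encoded e, int row) {
        return e.dictionary().get(e.codes()[row]);
    }

    public static void main(String[] args) {
        List<String> regions = List.of("EU", "US", "EU", "EU", "APAC", "US");
        Encoded e = encode(regions);
        System.out.println(e.dictionary()); // [EU, US, APAC] — 3 entries for 6 rows
        System.out.println(decode(e, 4));   // APAC
    }
}
```

The win compounds: the integer codes themselves are highly compressible with bit-packing or RLE, and equality filters can compare cheap integers instead of strings.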