Columnar Data Stores
Introduction
Columnar data stores organize information by columns rather than rows, fundamentally changing how data is stored on disk and retrieved during queries. This storage paradigm delivers dramatic performance improvements for analytical workloads — often achieving 10–100x speedups over row-oriented databases — by reading only the columns relevant to a query and enabling superior compression ratios. Understanding columnar storage is essential for anyone designing data warehouses, analytics pipelines, or working with technologies like Apache Parquet, Amazon Redshift, Apache Cassandra (wide-column), or ClickHouse.
Core Concepts
Row-Oriented vs. Column-Oriented Storage
In a traditional row-oriented database (PostgreSQL, MySQL), each row is stored contiguously on disk. When you SELECT name, salary FROM employees, the engine must read entire rows — including every unused column — just to extract two fields.
In a columnar store, each column is stored contiguously. The same query reads only the name and salary columns, skipping everything else entirely.
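To make the difference concrete, here is a minimal Java sketch (invented types, not a real storage engine) contrasting the two layouts. An aggregate over one field touches a single contiguous array in the columnar form, but must walk every record in the row form:

```java
import java.util.List;

public class StorageLayouts {
    // Row-oriented: each record's fields sit together.
    record EmployeeRow(int id, String name, double salary) {}

    static final List<EmployeeRow> ROWS = List.of(
        new EmployeeRow(1, "Alice", 120000.0),
        new EmployeeRow(2, "Bob", 95000.0),
        new EmployeeRow(3, "Carol", 130000.0));

    // Column-oriented: one contiguous array per field.
    static final int[] IDS = {1, 2, 3};
    static final String[] NAMES = {"Alice", "Bob", "Carol"};
    static final double[] SALARIES = {120000.0, 95000.0, 130000.0};

    // SELECT AVG(salary): the columnar layout scans only one array.
    static double avgSalaryColumnar() {
        double sum = 0;
        for (double s : SALARIES) sum += s;
        return sum / SALARIES.length;
    }

    // The row layout must visit every record object to reach the same field.
    static double avgSalaryRowWise() {
        double sum = 0;
        for (EmployeeRow r : ROWS) sum += r.salary();
        return sum / ROWS.size();
    }

    public static void main(String[] args) {
        System.out.println("Columnar avg: " + avgSalaryColumnar()); // 115000.0
        System.out.println("Row-wise avg: " + avgSalaryRowWise());  // 115000.0
    }
}
```

On disk the contrast is the same but measured in I/O: the row store reads every byte of every record, while the columnar store reads only the salary array.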
Why Columnar Storage is Faster for Analytics
Analytical queries typically scan millions of rows but only a few columns. Columnar stores exploit three key advantages:
- I/O Reduction: Only relevant columns are read from disk
- Compression: Homogeneous data types within a column compress far better (e.g., run-length encoding on repeated department names)
- Vectorized Processing: CPUs can apply SIMD operations across arrays of same-typed values
Compression Techniques in Columnar Stores
Because a column contains homogeneous data, specialized encoding techniques become applicable:
| Technique | Best For | Example |
|---|---|---|
| Run-Length Encoding (RLE) | Low-cardinality sorted columns | Eng, Eng, Eng → (Eng, 3) |
| Dictionary Encoding | Repeated string values | Map strings to integers |
| Delta Encoding | Monotonically increasing values | Timestamps, auto-increment IDs |
| Bit-Packing | Small integer ranges | Store values in minimal bits |
| Null Suppression | Sparse columns | Skip nulls with bitmap |
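As an illustration, here are toy Java sketches of two encodings from the table: run-length and dictionary encoding. Real formats such as Parquet and ORC implement these at the byte level; the class and method names below are invented for the example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ColumnEncodings {
    // Run-length encoding: collapse consecutive repeats into (value, count) pairs.
    static List<Map.Entry<String, Integer>> runLengthEncode(List<String> column) {
        List<Map.Entry<String, Integer>> runs = new ArrayList<>();
        for (String v : column) {
            if (!runs.isEmpty() && runs.get(runs.size() - 1).getKey().equals(v)) {
                Map.Entry<String, Integer> last = runs.remove(runs.size() - 1);
                runs.add(Map.entry(v, last.getValue() + 1));
            } else {
                runs.add(Map.entry(v, 1));
            }
        }
        return runs;
    }

    // Dictionary encoding: map each distinct string to a small integer code.
    static int[] dictionaryEncode(List<String> column, Map<String, Integer> dict) {
        int[] codes = new int[column.size()];
        for (int i = 0; i < column.size(); i++) {
            codes[i] = dict.computeIfAbsent(column.get(i), k -> dict.size());
        }
        return codes;
    }

    public static void main(String[] args) {
        List<String> dept = List.of("Eng", "Eng", "Eng", "Sales", "Sales");
        System.out.println(runLengthEncode(dept));       // [Eng=3, Sales=2]
        Map<String, Integer> dict = new LinkedHashMap<>();
        int[] codes = dictionaryEncode(dept, dict);
        System.out.println(dict);                        // {Eng=0, Sales=1}
        System.out.println(Arrays.toString(codes));      // [0, 0, 0, 1, 1]
    }
}
```

Five strings shrink to two (value, count) pairs under RLE, or to a two-entry dictionary plus small integer codes — the effect compounds over millions of rows.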
Types of Columnar Data Stores
The columnar paradigm appears in several forms: columnar file formats (Apache Parquet, ORC), cloud data warehouses (Amazon Redshift, BigQuery), OLAP databases (ClickHouse), and wide-column stores (Apache Cassandra).
Row Groups and Column Chunks
Modern columnar formats like Parquet don't store one giant array per column. Instead, they split data into row groups (horizontal partitions), and within each row group, each column is stored as a column chunk. This hybrid approach enables both columnar efficiency and row-level locality.
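A rough sketch of that hybrid layout, with invented record types standing in for Parquet's on-disk structures:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowGroupLayout {
    // One contiguous slice of a single column within a row group.
    record ColumnChunk(String column, long[] values) {}
    // A horizontal partition: one chunk per column for the same row range.
    record RowGroup(List<ColumnChunk> chunks) {}

    // Partition two parallel columns into row groups of `groupSize` rows each.
    static List<RowGroup> layout(long[] ids, long[] amounts, int groupSize) {
        List<RowGroup> groups = new ArrayList<>();
        for (int start = 0; start < ids.length; start += groupSize) {
            int end = Math.min(start + groupSize, ids.length);
            groups.add(new RowGroup(List.of(
                new ColumnChunk("id", Arrays.copyOfRange(ids, start, end)),
                new ColumnChunk("amount", Arrays.copyOfRange(amounts, start, end)))));
        }
        return groups;
    }

    public static void main(String[] args) {
        long[] ids = {1, 2, 3, 4, 5};
        long[] amounts = {10, 20, 30, 40, 50};
        List<RowGroup> groups = layout(ids, amounts, 2);
        System.out.println(groups.size() + " row groups"); // 3 row groups (2 + 2 + 1 rows)
    }
}
```

Queries still skip irrelevant column chunks within each row group, while row-range operations (and statistics-based skipping) work at row-group granularity.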
Implementation: Writing and Reading Parquet Files in Java
Apache Parquet is the most widely used columnar file format. Let's write and read Parquet files using the Java API.
Writing Parquet Files
```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.List;
public class ParquetWriterExample {
private static final String SCHEMA_JSON = """
{
"type": "record",
"name": "Employee",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "department", "type": "string"},
{"name": "salary", "type": "double"},
{"name": "hire_year", "type": "int"}
]
}
""";
public static void main(String[] args) {
Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
Path outputPath = new Path("employees.parquet");
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(outputPath)
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withRowGroupSize(128 * 1024 * 1024) // 128 MB row groups
.withPageSize(1024 * 1024) // 1 MB pages
.build()) {
List<Object[]> employees = List.of(
new Object[]{1, "Alice", "Engineering", 120000.0, 2019},
new Object[]{2, "Bob", "Sales", 95000.0, 2020},
new Object[]{3, "Carol", "Engineering", 130000.0, 2018},
new Object[]{4, "David", "Marketing", 88000.0, 2021},
new Object[]{5, "Eve", "Engineering", 145000.0, 2017}
);
for (Object[] emp : employees) {
GenericRecord record = new GenericData.Record(schema);
record.put("id", emp[0]);
record.put("name", emp[1]);
record.put("department", emp[2]);
record.put("salary", emp[3]);
record.put("hire_year", emp[4]);
writer.write(record);
}
System.out.println("Successfully wrote " + employees.size() + " records to Parquet");
} catch (IOException e) {
System.err.println("Failed to write Parquet file: " + e.getMessage());
e.printStackTrace();
}
}
}
```
Reading Parquet Files with Column Projection
The key columnar advantage: reading only the columns you need.
```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
public class ParquetReaderExample {
// Projection schema — only read 'name' and 'salary' columns
private static final String PROJECTION_SCHEMA = """
{
"type": "record",
"name": "Employee",
"fields": [
{"name": "name", "type": "string"},
{"name": "salary", "type": "double"}
]
}
""";
public static void main(String[] args) {
Path inputPath = new Path("employees.parquet");
Schema projection = new Schema.Parser().parse(PROJECTION_SCHEMA);
Configuration conf = new Configuration();
// Register the projection so Parquet reads only the 'name' and 'salary' column chunks
org.apache.parquet.avro.AvroReadSupport.setRequestedProjection(conf, projection);
try {
HadoopInputFile inputFile = HadoopInputFile.fromPath(inputPath, conf);
try (ParquetReader<GenericRecord> reader = AvroParquetReader
.<GenericRecord>builder(inputFile)
.withConf(conf)
.build()) {
GenericRecord record;
double totalSalary = 0;
int count = 0;
while ((record = reader.read()) != null) {
String name = record.get("name").toString();
double salary = (double) record.get("salary");
System.out.printf("Employee: %-10s Salary: $%,.2f%n", name, salary);
totalSalary += salary;
count++;
}
if (count > 0) {
System.out.printf("%nAverage salary: $%,.2f (from %d employees)%n",
totalSalary / count, count);
}
}
} catch (IOException e) {
System.err.println("Failed to read Parquet file: " + e.getMessage());
e.printStackTrace();
}
}
}
```
Implementation: Querying Amazon Redshift (Columnar Data Warehouse)
Amazon Redshift is a fully managed columnar data warehouse. Here's how to interact with it from Java using the AWS SDK:
```java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.redshiftdata.RedshiftDataClient;
import software.amazon.awssdk.services.redshiftdata.model.*;
import java.util.List;
public class RedshiftColumnarQueryExample {
private static final String CLUSTER_ID = "my-analytics-cluster";
private static final String DATABASE = "analytics_db";
private static final String DB_USER = "analyst";
public static void main(String[] args) throws InterruptedException {
RedshiftDataClient client = RedshiftDataClient.builder()
.region(Region.US_EAST_1)
.build();
try {
// Create a columnar table with distribution and sort keys
executeStatement(client, """
CREATE TABLE IF NOT EXISTS sales_facts (
sale_id BIGINT IDENTITY(1,1),
sale_date DATE SORTKEY,
product_id INTEGER NOT NULL,
customer_id INTEGER NOT NULL,
region VARCHAR(50) ENCODE ZSTD,
quantity INTEGER ENCODE AZ64,
unit_price DECIMAL(10,2) ENCODE AZ64,
total_amount DECIMAL(12,2) ENCODE AZ64
)
DISTSTYLE KEY
DISTKEY (product_id);
""");
// Analytical query — Redshift reads only the four columns it references
String analyticalQuery = """
SELECT
region,
DATE_TRUNC('month', sale_date) AS month,
SUM(total_amount) AS revenue,
COUNT(*) AS transaction_count,
AVG(unit_price) AS avg_price
FROM sales_facts
WHERE sale_date >= '2024-01-01'
GROUP BY region, DATE_TRUNC('month', sale_date)
ORDER BY revenue DESC
LIMIT 20;
""";
String queryId = submitQuery(client, analyticalQuery);
waitForQuery(client, queryId);
printResults(client, queryId);
} catch (RedshiftDataException e) {
System.err.println("Redshift error: " + e.getMessage());
} finally {
client.close();
}
}
private static String submitQuery(RedshiftDataClient client, String sql) {
ExecuteStatementResponse response = client.executeStatement(
ExecuteStatementRequest.builder()
.clusterIdentifier(CLUSTER_ID)
.database(DATABASE)
.dbUser(DB_USER)
.sql(sql)
.build());
System.out.println("Query submitted: " + response.id());
return response.id();
}
private static void executeStatement(RedshiftDataClient client, String sql)
throws InterruptedException {
String id = submitQuery(client, sql);
waitForQuery(client, id);
}
private static void waitForQuery(RedshiftDataClient client, String queryId)
throws InterruptedException {
DescribeStatementResponse desc;
do {
Thread.sleep(1000);
desc = client.describeStatement(
DescribeStatementRequest.builder().id(queryId).build());
System.out.println("Status: " + desc.statusAsString());
} while (desc.status() == StatusString.STARTED ||
desc.status() == StatusString.SUBMITTED ||
desc.status() == StatusString.PICKED);
if (desc.status() == StatusString.FAILED) {
throw new RuntimeException("Query failed: " + desc.error());
}
}
private static void printResults(RedshiftDataClient client, String queryId) {
GetStatementResultResponse result = client.getStatementResult(
GetStatementResultRequest.builder().id(queryId).build());
// Print column headers
List<ColumnMetadata> columns = result.columnMetadata();
columns.forEach(c -> System.out.printf("%-20s", c.name()));
System.out.println();
// Print rows
for (List<Field> row : result.records()) {
for (Field field : row) {
String value = field.stringValue() != null ?
field.stringValue() : "NULL";
System.out.printf("%-20s", value);
}
System.out.println();
}
}
}
```
Implementation: Simple In-Memory Columnar Store
To truly understand columnar storage, let's build a minimal columnar store in Java:
```java
import java.util.*;
import java.util.stream.*;
public class SimpleColumnarStore {
private final Map<String, List<Object>> columns = new LinkedHashMap<>();
private int rowCount = 0;
public void addColumn(String name) {
columns.put(name, new ArrayList<>());
}
public void insertRow(Map<String, Object> row) {
for (Map.Entry<String, List<Object>> col : columns.entrySet()) {
col.getValue().add(row.getOrDefault(col.getKey(), null));
}
rowCount++;
}
/**
* Column projection — only reads specified columns.
* This is the core columnar advantage.
*/
public List<Map<String, Object>> select(List<String> projectedColumns,
String filterColumn,
Object filterValue) {
// Step 1: Get the filter column and find matching row indices
List<Object> filterCol = columns.get(filterColumn);
if (filterCol == null) {
throw new IllegalArgumentException("Unknown column: " + filterColumn);
}
List<Integer> matchingIndices = IntStream.range(0, rowCount)
.filter(i -> filterValue.equals(filterCol.get(i)))
.boxed()
.collect(Collectors.toList());
// Step 2: Project only requested columns for matching rows
List<Map<String, Object>> results = new ArrayList<>();
for (int idx : matchingIndices) {
Map<String, Object> row = new LinkedHashMap<>();
for (String col : projectedColumns) {
row.put(col, columns.get(col).get(idx));
}
results.add(row);
}
return results;
}
/** Columnar aggregation — operates on a single column array */
public double average(String columnName, String filterColumn, Object filterValue) {
List<Object> filterCol = columns.get(filterColumn);
List<Object> valueCol = columns.get(columnName);
return IntStream.range(0, rowCount)
.filter(i -> filterValue.equals(filterCol.get(i)))
.mapToDouble(i -> ((Number) valueCol.get(i)).doubleValue())
.average()
.orElse(0.0);
}
/** Show compression opportunity — count distinct values per column */
public Map<String, Long> columnCardinality() {
Map<String, Long> cardinality = new LinkedHashMap<>();
for (Map.Entry<String, List<Object>> entry : columns.entrySet()) {
long distinct = entry.getValue().stream().distinct().count();
cardinality.put(entry.getKey(), distinct);
}
return cardinality;
}
public static void main(String[] args) {
SimpleColumnarStore store = new SimpleColumnarStore();
store.addColumn("id");
store.addColumn("name");
store.addColumn("department");
store.addColumn("salary");
store.insertRow(Map.of("id", 1, "name", "Alice", "department", "Engineering", "salary", 120000.0));
store.insertRow(Map.of("id", 2, "name", "Bob", "department", "Sales", "salary", 95000.0));
store.insertRow(Map.of("id", 3, "name", "Carol", "department", "Engineering", "salary", 130000.0));
store.insertRow(Map.of("id", 4, "name", "David", "department", "Sales", "salary", 88000.0));
store.insertRow(Map.of("id", 5, "name", "Eve", "department", "Engineering", "salary", 145000.0));
// Columnar projection: only read 'name' and 'salary' for Engineering
System.out.println("=== Engineering Employees (name, salary only) ===");
List<Map<String, Object>> results = store.select(
List.of("name", "salary"), "department", "Engineering");
results.forEach(r -> System.out.printf(" %-10s $%,.2f%n", r.get("name"), r.get("salary")));
// Columnar aggregation
double avgSalary = store.average("salary", "department", "Engineering");
System.out.printf("%nAvg Engineering salary: $%,.2f%n", avgSalary);
// Compression analysis
System.out.println("\n=== Column Cardinality (compression opportunity) ===");
store.columnCardinality().forEach((col, card) ->
System.out.printf(" %-15s distinct values: %d (out of %d rows)%n",
col, card, 5));
}
}
```
Query Execution in Columnar Stores
Understanding how a columnar engine executes a query reveals why it outperforms row stores for analytics:
Predicate Pushdown and Zone Maps
Columnar stores maintain statistics (min, max, count, null count) per column chunk. This enables predicate pushdown — skipping entire row groups that cannot contain matching data.
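A minimal sketch of zone-map pruning, assuming per-chunk min/max statistics as just described (real engines persist these in file or block metadata rather than Java objects):

```java
import java.util.List;

public class ZoneMapScan {
    // A column chunk carrying its own min/max statistics (the "zone map").
    record Chunk(int[] values, int min, int max) {
        static Chunk of(int... values) {
            int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
            for (int v : values) {
                min = Math.min(min, v);
                max = Math.max(max, v);
            }
            return new Chunk(values, min, max);
        }
    }

    // Count values equal to `target`, skipping chunks whose [min, max] range
    // proves they cannot contain it — the essence of predicate pushdown.
    static int countEqual(List<Chunk> chunks, int target) {
        int hits = 0;
        for (Chunk c : chunks) {
            if (target < c.min() || target > c.max()) continue; // chunk skipped, no scan
            for (int v : c.values()) if (v == target) hits++;
        }
        return hits;
    }

    public static void main(String[] args) {
        List<Chunk> chunks = List.of(
            Chunk.of(1, 3, 5), Chunk.of(10, 12, 14), Chunk.of(20, 22, 22));
        System.out.println(countEqual(chunks, 22)); // 2 — only the last chunk is scanned
    }
}
```

When data is sorted on the filtered column, the min/max ranges barely overlap and most chunks are eliminated before any values are read — which is why sort keys matter so much for zone-map effectiveness.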
When to Use (and Not Use) Columnar Stores
Columnar stores excel when queries scan many rows but touch few columns: reporting, aggregations, and ad-hoc analytics over large fact tables. They are a poor fit for OLTP workloads dominated by single-row inserts, updates, and point lookups by primary key, where row stores remain substantially faster (see the comparison below).
Data Warehouse Architecture with Columnar Storage
Performance Comparison
To illustrate the impact, here are representative figures for a 1-billion-row table with 50 columns (exact numbers vary by engine and hardware):
| Operation | Row Store | Columnar Store | Improvement |
|---|---|---|---|
| `SELECT *` (full scan) | 120s | 140s | Slightly slower |
| `SELECT col1, col2 WHERE col3 = X` | 120s | 3s | 40x faster |
| `SELECT AVG(col1) GROUP BY col2` | 95s | 2s | 47x faster |
| Single-row insert | 0.5ms | 5ms | 10x slower |
| Single-row lookup by PK | 1ms | 50ms | 50x slower |
| Compression ratio | 3:1 | 10:1 | 3x better |
Best Practices
- Choose sort keys wisely: Place your most-filtered columns as sort keys to maximize zone map effectiveness and enable row group skipping.
- Right-size row groups: Use 128 MB–256 MB row groups for Parquet — too small loses compression benefits, too large wastes memory during reads.
- Use appropriate compression codecs: Snappy for speed-sensitive workloads, ZSTD for storage-sensitive ones. Match the codec to your latency vs. storage tradeoff.
- Apply column projection religiously: Never `SELECT *` in columnar stores — it negates the entire columnar advantage by forcing reads of all columns.
- Partition by time: Most analytical queries filter by date range. Partitioning by date allows entire partitions to be skipped.
- Prefer batch inserts: Columnar stores are optimized for large batch writes. Avoid single-row inserts — buffer data and write in bulk.
- Co-locate frequently joined tables: Use distribution keys (Redshift) or clustering (BigQuery) so joined tables are co-located on the same node.
- Monitor column cardinality: Low-cardinality columns compress dramatically better. Consider dictionary encoding for columns with fewer than ~60,000 distinct values.
- Use predicate pushdown: Structure queries so filters appear in `WHERE` clauses that can be pushed down to the storage layer, leveraging zone map statistics.
- Denormalize for analytics: Unlike OLTP, columnar stores perform best on wide, denormalized tables. Reduce joins by pre-joining during ETL.
Related Concepts
- Eventual Consistency: Distributed columnar stores like Cassandra rely on eventual consistency models.
- High-Performance Streaming Operations: Streaming data into columnar stores for real-time analytics.
- Serverless and Container Workloads: Serverless query engines (Athena, BigQuery) that query columnar files directly.