High-Performance Streaming & MapReduce
Modern data pipelines must handle millions of events per second with low latency, high throughput, and fault tolerance. Streaming and MapReduce are the two dominant paradigms: streaming processes events as they arrive, while MapReduce processes bounded datasets in parallel batches. AWS services like EMR and Kinesis make both patterns operationally manageable at scale. Understanding the trade-offs and the right tool for each workload is central to building performant data systems.
Streaming vs Batch Processing
Choosing between streaming and batch depends on your latency requirements, data volume, and tolerance for complexity.
| Dimension | Batch | Stream |
|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds |
| Throughput | Very high (bulk read) | High (continuous) |
| Complexity | Lower | Higher (state, ordering) |
| Cost model | Pay per job run | Pay per shard/hour |
| Use cases | ETL, reports, ML training | Fraud detection, alerting, dashboards |
| Fault recovery | Re-run the job | Checkpointing, replay from offset |
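The fault-recovery row captures the core difference: a batch job can simply recompute over the full bounded input, while a stream must maintain incremental state (and checkpoint it so recovery can replay from an offset). A minimal framework-free sketch of the two styles computing the same aggregate:

```java
import java.util.List;

public class BatchVsStream {
    // Batch: the whole bounded dataset is available; on failure, re-run from scratch.
    static long batchSum(List<Integer> dataset) {
        return dataset.stream().mapToLong(Integer::longValue).sum();
    }

    // Stream: events arrive one at a time; state is updated incrementally and
    // would be checkpointed so recovery can replay from the last offset.
    static class StreamingSum {
        private long state = 0;
        long onEvent(int value) {
            state += value; // incremental update per event
            return state;   // a result is available after every event (low latency)
        }
    }

    public static void main(String[] args) {
        List<Integer> events = List.of(3, 1, 4, 1, 5);
        StreamingSum stream = new StreamingSum();
        long last = 0;
        for (int e : events) last = stream.onEvent(e);
        // Both paradigms converge on the same answer over the same events.
        System.out.println(batchSum(events) == last); // prints "true"
    }
}
```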
MapReduce Programming Model
MapReduce was introduced by Google in 2004 and popularized by Apache Hadoop. It breaks a computation into three phases that can run in parallel across a cluster.
Map Phase
Each mapper reads a portion of the input and emits intermediate (key, value) pairs. Mappers run in parallel on the nodes where data resides, minimising data movement.
Shuffle & Sort Phase
The framework redistributes intermediate pairs by key across the network so all values for a given key land on the same reducer. Keys are sorted within each reducer's partition. This phase is the primary bottleneck and source of network I/O.
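The redistribution is deterministic: Hadoop's default `HashPartitioner` assigns each key to reducer `(key.hashCode() & Integer.MAX_VALUE) % numReducers`, which is why every value for a given key lands on the same reducer. A standalone sketch of that routing plus the per-partition sort:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShufflePartitioner {
    // Same formula as Hadoop's default HashPartitioner: mask off the sign bit,
    // then take the remainder modulo the number of reduce tasks.
    static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 3;
        // Group intermediate keys by their target reducer; a TreeMap per
        // partition mirrors the key sort that precedes the reduce phase.
        Map<Integer, TreeMap<String, Integer>> partitions = new HashMap<>();
        for (String key : List.of("apple", "banana", "apple", "cherry")) {
            partitions.computeIfAbsent(partition(key, numReducers), p -> new TreeMap<>())
                      .merge(key, 1, Integer::sum);
        }
        // Every occurrence of "apple" routes to the same partition.
        System.out.println(partitions);
    }
}
```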
Reduce Phase
Each reducer receives a sorted list of values for each key and produces the final output, such as a count, sum, or aggregation.
Example 1 — WordCount MapReduce job (Mapper, Reducer, and Driver):
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;

// ----- Mapper -----
// (Shown together for brevity; in practice each top-level class gets its own .java file.)
class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken().toLowerCase());
            context.write(word, ONE);
        }
    }
}

// ----- Reducer -----
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

// ----- Driver -----
class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word-count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class); // local pre-aggregation
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

AWS EMR (Elastic MapReduce)
Amazon EMR manages Hadoop, Spark, and other frameworks on a cluster of EC2 instances, handling provisioning, configuration, and scaling automatically.
- Master node: orchestrates the cluster; runs YARN ResourceManager and HDFS NameNode. Do not use for data storage; loss of the master ends the cluster.
- Core nodes: run both DataNode (HDFS storage) and NodeManager (task execution). Terminating a core node risks data loss.
- Task nodes: run NodeManager only. Ideal for spot instances because they hold no HDFS data and can be removed safely.
Example 2 — Create an EMR cluster with AWS SDK v2:
```java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.emr.EmrClient;
import software.amazon.awssdk.services.emr.model.*;

public class EmrClusterCreate {
    public static void main(String[] args) {
        EmrClient emr = EmrClient.builder().region(Region.US_EAST_1).build();

        RunJobFlowRequest request = RunJobFlowRequest.builder()
            .name("my-spark-cluster")
            .releaseLabel("emr-6.15.0")
            .serviceRole("EMR_DefaultRole")
            .jobFlowRole("EMR_EC2_DefaultRole")
            .logUri("s3://my-bucket/emr-logs/")
            .applications(Application.builder().name("Spark").build(),
                          Application.builder().name("Hadoop").build())
            .instances(JobFlowInstancesConfig.builder()
                .ec2KeyName("my-keypair")
                .instanceGroups(
                    // Master
                    InstanceGroupConfig.builder()
                        .instanceRole(InstanceRoleType.MASTER)
                        .instanceType("m5.xlarge")
                        .instanceCount(1)
                        .build(),
                    // Core
                    InstanceGroupConfig.builder()
                        .instanceRole(InstanceRoleType.CORE)
                        .instanceType("m5.2xlarge")
                        .instanceCount(4)
                        .build(),
                    // Task (spot)
                    InstanceGroupConfig.builder()
                        .instanceRole(InstanceRoleType.TASK)
                        .instanceType("m5.2xlarge")
                        .instanceCount(8)
                        .market(MarketType.SPOT)
                        .bidPrice("0.20")
                        .build()
                )
                .keepJobFlowAliveWhenNoSteps(false)
                .build())
            .build();

        RunJobFlowResponse response = emr.runJobFlow(request);
        System.out.println("Cluster ID: " + response.jobFlowId());
        emr.close();
    }
}
```

Example 3 — Submit a Spark step to an EMR cluster:
```java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.emr.EmrClient;
import software.amazon.awssdk.services.emr.model.*;
import java.util.List;

public class EmrSparkStep {
    public static void main(String[] args) {
        EmrClient emr = EmrClient.builder().region(Region.US_EAST_1).build();
        String clusterId = "j-XXXXXXXXXXXX";

        HadoopJarStepConfig sparkStep = HadoopJarStepConfig.builder()
            .jar("command-runner.jar")
            .args(List.of(
                "spark-submit",
                "--deploy-mode", "cluster",
                "--class", "com.example.WordCountDriver",
                "s3://my-bucket/jars/wordcount.jar",
                "s3://my-bucket/input/",
                "s3://my-bucket/output/"
            ))
            .build();

        AddJobFlowStepsRequest stepRequest = AddJobFlowStepsRequest.builder()
            .jobFlowId(clusterId)
            .steps(StepConfig.builder()
                .name("WordCount-Spark")
                .actionOnFailure(ActionOnFailure.CONTINUE)
                .hadoopJarStep(sparkStep)
                .build())
            .build();

        AddJobFlowStepsResponse response = emr.addJobFlowSteps(stepRequest);
        System.out.println("Step IDs: " + response.stepIds());
        emr.close();
    }
}
```

AWS Kinesis Family
AWS Kinesis is a suite of managed services for real-time data streaming.
Kinesis Data Streams
A shard is the base unit of capacity: each shard supports 1 MB/s ingest (1,000 records/s) and 2 MB/s read. Data is retained for 24 hours by default, extendable to 365 days. Records within a shard are strictly ordered.
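Routing to a shard works by taking the MD5 hash of the record's partition key as an unsigned 128-bit integer and matching it against each shard's hash-key range. A simplified sketch of that routing, assuming the shards split the hash space into equal contiguous ranges (a real stream's ranges come from the DescribeStream API):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class ShardRouter {
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128); // 2^128

    // Kinesis hashes the partition key with MD5 and treats the digest as an
    // unsigned 128-bit integer to pick a shard by hash-key range.
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] md5 = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, md5); // positive (unsigned) interpretation
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is always present in the JDK
        }
    }

    // Assumes equal contiguous ranges; real ranges may differ after splits/merges.
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValueExact();
    }

    public static void main(String[] args) {
        // The same key always routes to the same shard — which is exactly
        // what makes strict per-shard ordering useful for per-key ordering.
        System.out.println(shardIndex("user-123", 4));
        System.out.println(shardIndex("user-123", 4)); // same index again
    }
}
```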
Kinesis Data Firehose
Firehose is a fully managed delivery service. It buffers incoming records and delivers them in batches to S3, Redshift, OpenSearch, or HTTP endpoints. It supports automatic GZIP/Snappy compression and Lambda-based transformation.
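Delivery is governed by two buffering hints — buffer size and buffer interval — and a batch is flushed as soon as either threshold is reached. A toy model of that flush condition (the class, thresholds, and delivery callback here are illustrative, not the Firehose API):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BufferingModel {
    private final long maxBytes;                  // e.g. a 5 MB buffer-size hint
    private final long maxAgeMillis;              // e.g. a 300 s buffer-interval hint
    private final Consumer<List<String>> deliver; // stand-in for S3/Redshift delivery
    private final List<String> buffer = new ArrayList<>();
    private long bytes = 0;
    private long firstRecordAt = -1;

    BufferingModel(long maxBytes, long maxAgeMillis, Consumer<List<String>> deliver) {
        this.maxBytes = maxBytes;
        this.maxAgeMillis = maxAgeMillis;
        this.deliver = deliver;
    }

    void onRecord(String record, long nowMillis) {
        if (buffer.isEmpty()) firstRecordAt = nowMillis;
        buffer.add(record);
        bytes += record.getBytes(StandardCharsets.UTF_8).length;
        // Flush when EITHER threshold is crossed — size or age, whichever comes first.
        if (bytes >= maxBytes || nowMillis - firstRecordAt >= maxAgeMillis) {
            deliver.accept(new ArrayList<>(buffer));
            buffer.clear();
            bytes = 0;
        }
    }
}
```

Larger buffers mean fewer, bigger objects in S3 (cheaper, better for Athena); smaller buffers mean lower end-to-end latency.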
Kinesis Data Analytics
Kinesis Data Analytics (the Flink variant has since been renamed Amazon Managed Service for Apache Flink) runs SQL queries or Apache Flink applications against a live Kinesis stream. Results can be emitted to another stream, Firehose, or a Lambda function.
Example 4 — Kinesis putRecord and getRecords with AWS SDK v2:
```java
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.*;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class KinesisDataStreamsDemo {
    private static final String STREAM_NAME = "my-event-stream";
    private static final KinesisClient client =
        KinesisClient.builder().region(Region.US_EAST_1).build();

    // --- Producer: put a single record ---
    public static void putRecord(String partitionKey, String data) {
        PutRecordRequest request = PutRecordRequest.builder()
            .streamName(STREAM_NAME)
            .partitionKey(partitionKey)
            .data(SdkBytes.fromString(data, StandardCharsets.UTF_8))
            .build();
        PutRecordResponse response = client.putRecord(request);
        System.out.println("Sequence: " + response.sequenceNumber()
            + " | Shard: " + response.shardId());
    }

    // --- Consumer: get records from a shard ---
    public static void consumeShard(String shardId) {
        // Get the shard iterator at TRIM_HORIZON (oldest available record)
        GetShardIteratorRequest iterRequest = GetShardIteratorRequest.builder()
            .streamName(STREAM_NAME)
            .shardId(shardId)
            .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
            .build();
        String iterator = client.getShardIterator(iterRequest).shardIterator();

        while (iterator != null) {
            GetRecordsRequest recRequest = GetRecordsRequest.builder()
                .shardIterator(iterator)
                .limit(100)
                .build();
            GetRecordsResponse response = client.getRecords(recRequest);
            List<Record> records = response.records();
            records.forEach(r ->
                System.out.println("Data: " + r.data().asUtf8String()
                    + " | Seq: " + r.sequenceNumber()));
            iterator = response.nextShardIterator();
            if (records.isEmpty()) break; // no more records for now
        }
    }

    public static void main(String[] args) {
        putRecord("user-123", "{\"event\":\"click\",\"itemId\":42}");
        consumeShard("shardId-000000000000");
        client.close();
    }
}
```

Example 5 — Firehose putRecord and putRecordBatch with AWS SDK v2:
```java
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.firehose.FirehoseClient;
import software.amazon.awssdk.services.firehose.model.*;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FirehoseDemo {
    private static final String DELIVERY_STREAM = "my-delivery-stream";
    private static final FirehoseClient firehose =
        FirehoseClient.builder().region(Region.US_EAST_1).build();

    // Single record
    public static void putSingle(String json) {
        PutRecordRequest req = PutRecordRequest.builder()
            .deliveryStreamName(DELIVERY_STREAM)
            .record(Record.builder()
                .data(SdkBytes.fromString(json + "\n", StandardCharsets.UTF_8))
                .build())
            .build();
        firehose.putRecord(req);
    }

    // Batch up to 500 records per call
    public static void putBatch(List<String> events) {
        List<Record> records = events.stream()
            .map(e -> Record.builder()
                .data(SdkBytes.fromString(e + "\n", StandardCharsets.UTF_8))
                .build())
            .collect(Collectors.toList());

        PutRecordBatchRequest batchReq = PutRecordBatchRequest.builder()
            .deliveryStreamName(DELIVERY_STREAM)
            .records(records)
            .build();
        PutRecordBatchResponse response = firehose.putRecordBatch(batchReq);
        if (response.failedPutCount() > 0) {
            System.err.println("Failed records: " + response.failedPutCount());
            // Handle partial failures by retrying failed entries
        }
    }

    public static void main(String[] args) {
        putSingle("{\"userId\":\"u1\",\"action\":\"purchase\"}");
        List<String> batch = IntStream.range(0, 100)
            .mapToObj(i -> "{\"id\":" + i + ",\"val\":\"data-" + i + "\"}")
            .collect(Collectors.toList());
        putBatch(batch);
        firehose.close();
    }
}
```

Example 6 — Kinesis Analytics SQL (CREATE STREAM with tumbling window):
```sql
-- Define source stream mapping to the input Kinesis stream
CREATE OR REPLACE STREAM "SOURCE_SQL_STREAM_001" (
    user_id    VARCHAR(64),
    event_type VARCHAR(32),
    amount     DOUBLE,
    event_time TIMESTAMP
);

-- Create destination stream for aggregated results
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    window_start TIMESTAMP,
    event_type   VARCHAR(32),
    total_amount DOUBLE,
    event_count  BIGINT
);

-- Pump aggregated results into the destination stream
-- (tumbling window of 1 minute)
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS window_start,
    event_type,
    SUM(amount) AS total_amount,
    COUNT(*) AS event_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY
    event_type,
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
```

Sharding Strategies and Performance Optimization
Example 7 — Shard count calculation utility:
```java
/**
 * Calculates the minimum number of Kinesis shards required for a given workload.
 * Each shard supports: 1 MB/s write, 1,000 records/s write, 2 MB/s read.
 */
public class ShardCalculator {
    private static final double MAX_WRITE_MB_PER_SHARD = 1.0;
    private static final int MAX_WRITE_RPS_PER_SHARD = 1000;
    private static final double MAX_READ_MB_PER_SHARD = 2.0;

    /**
     * @param writeMbPerSecond   total expected write throughput (MB/s)
     * @param writeRecordsPerSec total expected write records per second
     * @param readMbPerSecond    total expected read throughput (MB/s, per consumer)
     * @param consumerCount      number of standard (non-enhanced-fan-out) consumers
     * @return recommended shard count (with 20% headroom)
     */
    public static int calculateShards(double writeMbPerSecond,
                                      int writeRecordsPerSec,
                                      double readMbPerSecond,
                                      int consumerCount) {
        // Shards required by write throughput
        double byWriteMb = writeMbPerSecond / MAX_WRITE_MB_PER_SHARD;
        // Shards required by write record rate
        double byWriteRps = (double) writeRecordsPerSec / MAX_WRITE_RPS_PER_SHARD;
        // Shards required by read throughput (shared across standard consumers)
        double byReadMb = (readMbPerSecond * consumerCount) / MAX_READ_MB_PER_SHARD;

        double raw = Math.max(byWriteMb, Math.max(byWriteRps, byReadMb));
        // Add 20% headroom and round up
        int recommended = (int) Math.ceil(raw * 1.2);

        System.out.printf("By write MB/s: %.2f | By write RPS: %.2f | By read MB/s: %.2f%n",
            byWriteMb, byWriteRps, byReadMb);
        System.out.println("Recommended shards (with 20% headroom): " + recommended);
        return recommended;
    }

    public static void main(String[] args) {
        // Example: 3 MB/s writes, 2500 rps, 4 MB/s reads, 2 consumers
        calculateShards(3.0, 2500, 4.0, 2);
    }
}
```

Example 8 — Hash-based partition key strategy to avoid hot shards:
```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.UUID;

/**
 * Partition key strategies for even shard distribution in Kinesis.
 */
public class PartitionKeyStrategy {
    /**
     * Strategy 1: Use the natural entity ID as partition key.
     * Good when IDs are already uniformly distributed (e.g., UUIDs).
     */
    public static String naturalKey(String entityId) {
        return entityId; // UUID → naturally distributed
    }

    /**
     * Strategy 2: Hash a high-cardinality attribute.
     * Good when the raw key has hot spots (e.g., timestamps, status codes).
     */
    public static String hashedKey(String rawKey) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] hash = md.digest(rawKey.getBytes());
        return HexFormat.of().formatHex(hash);
    }

    /**
     * Strategy 3: Random suffix on a composite key.
     * Spreads records from the same logical partition across multiple shards —
     * use only when strict ordering within a key is NOT required.
     */
    public static String randomSuffixKey(String logicalKey, int numShards) {
        int suffix = (int) (Math.random() * numShards);
        return logicalKey + "-" + suffix;
    }

    /**
     * Strategy 4: Round-robin across shard count.
     * Guarantees even distribution; loses ordering guarantees.
     */
    private static int roundRobinCounter = 0;

    public static synchronized String roundRobinKey(int numShards) {
        return "shard-" + (roundRobinCounter++ % numShards);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Natural:       " + naturalKey(UUID.randomUUID().toString()));
        System.out.println("Hashed:        " + hashedKey("2024-01-01T00:00:00Z"));
        System.out.println("Random suffix: " + randomSuffixKey("order-processing", 8));
        System.out.println("Round-robin:   " + roundRobinKey(8));
    }
}
```

Real-World Use Cases
- Clickstream analytics — web events flow into Kinesis Data Streams; Kinesis Analytics computes per-minute page-view counts; Firehose archives raw events to S3 for historical analysis.
- Fraud detection — transaction events are processed by a Flink application on KDA; a sliding window detects unusual spending patterns and invokes a Lambda to block the card in real time.
- Log aggregation and search — application logs are shipped via Firehose to OpenSearch; Firehose applies Lambda transformation to parse and enrich log records before indexing.
- ETL and machine learning — raw event data is accumulated in S3; a nightly EMR Spark job transforms, joins, and feature-engineers the data into a format suitable for SageMaker training.
- IoT sensor processing — millions of device readings per second are ingested via Kinesis Data Streams with a shard per device group; a Flink job on EMR detects anomalies and writes alerts to DynamoDB.
Best Practices
- Calculate shard count with headroom. Use the formula `max(writeMB / 1, writeRPS / 1000, readMB_total / 2)` and add 20% headroom. Re-evaluate after each order-of-magnitude throughput change.
- Use `putRecords` (batch), not `putRecord` (single). `putRecords` sends up to 500 records in one API call, dramatically reducing per-record cost and network round-trips. Handle partial failures by inspecting `FailedRecordCount` and retrying failed entries.
- Enable GZIP compression in Firehose. GZIP typically achieves 70–90% size reduction for JSON payloads, reducing S3 storage costs and downstream Athena query costs.
- Monitor `GetRecords.IteratorAgeMilliseconds` in CloudWatch. This metric reports how far behind the consumer is. Alert when it exceeds your SLA latency; a sustained increase indicates consumer under-provisioning.
- Handle shard splits and merges. When auto-scaling increases or decreases shard count, parent shards become read-only. Consumers must finish reading parent shards before moving to children. Use the Kinesis Client Library (KCL) to handle this automatically.
- Use Enhanced Fan-Out for low-latency consumers. Standard consumers share 2 MB/s per shard. Enhanced Fan-Out gives each registered consumer a dedicated 2 MB/s pipe with ~70 ms average propagation latency, compared to ~200 ms for polling.
- Choose partition keys carefully. Hot partition keys cause throttling (`ProvisionedThroughputExceededException`). When ordering is not required, use hashed or random-suffix keys for uniform distribution.
- Right-size EMR task nodes on Spot. Task nodes hold no HDFS data, so Spot interruptions only slow a job — they do not corrupt it. Using Spot for task nodes can reduce cluster cost by 60–80%.
Related Concepts
- Asynchronous Programming — async and non-blocking patterns used within streaming consumers
- Serverless Containers — deploying lightweight consumers without managing EC2 instances