tl;dr - Kafka configs you should consider using #
Broker/Topic Configs #
- compression.type=producer (keep this default)
- create topics with a partition replication factor of
Producer Configs #
0, Kafka Streams uses
- batch.size=972800 - ~95% of topic’s
- buffer.memory=67108864 - increase for high-throughput apps (default: 33554432)
- if order matters
- add a key to all of your messages
- add a
There are caveats for all of these, read on for further discussion.
Who is this post intended for? #
This is the information about Kafka partitioning and replication that I spent the last couple of years searching for. I finally stopped searching and read the source code.
This post assumes quite a bit of familiarity with how Kafka works at the 10k foot level. It dives into the details and describes which config settings you should be looking at when tuning your system.
How are Kafka Topics structured? #
Kafka topics hold a series of ordered events (also called records). The
mango record is the oldest and
the newest. Any new records would be added after
This conceptual model breaks down when you learn more about Kafka. In reality, Kafka topics have a number of components to increase the reliability and throughput of each topic. These components also increase complexity and are worth learning more about if you want to be able to reason about how your data will flow through the system.
Topic Partitions #
A topic is split into one or more zero-based partitions, with each partition having approximately the same fraction of records on it.
It is the Kafka producer’s job to create new records and to decide, using a partitioning strategy, which partition each record should be sent to.
The default partitioner uses this algorithm:
- if the user assigned an explicit partition, use it
- else if the record has a key, use the key’s hash to determine the partition
- else add the record to a batch and send it to a random partition
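The three rules above can be sketched as a small function. This is an illustrative sketch, not Kafka's code: `choose_partition` and its `sticky_partition` argument are hypothetical names, and `zlib.crc32` is only a stand-in for the real key hash discussed later in this post.

```python
import random
import zlib
from typing import Optional


def choose_partition(
    num_partitions: int,
    partition: Optional[int] = None,
    key: Optional[bytes] = None,
    sticky_partition: Optional[int] = None,
) -> int:
    """Pick a partition using the same priority order as the list above."""
    # 1. An explicit partition always wins.
    if partition is not None:
        return partition
    # 2. Otherwise hash the key (crc32 is a stand-in, NOT what Kafka uses).
    if key is not None:
        return zlib.crc32(key) % num_partitions
    # 3. Otherwise reuse the partition chosen for the current batch,
    #    or pick a random one if no batch is open yet.
    if sticky_partition is not None:
        return sticky_partition
    return random.randrange(num_partitions)


print(choose_partition(3, partition=2, key=b"lime"))  # explicit partition wins
```

The ordering is the point here: a key is only consulted when no explicit partition was given, and randomness only comes into play for keyless records.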
Default Partitioner Algorithm Details #
1. If the user assigned an explicit partition, use it #
If the producer has manually assigned a record to a partition, it will honor that choice. If explicit partitions are used, it is the producer’s responsibility to ensure that records are balanced across the partitions.
This is useful when you want to partition your records using something other than just the key.
Kafka uses this strategy internally on its __consumer_offsets topic that tracks the state of all consumers’ group.ids. Messages are keyed on group.id + topic name + partition, but are partitioned on just the group.id. This allows new consumers to subscribe to a single partition that will hold all updates for their group.id.
Example command to see data on the __consumer_offsets topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets \
  --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
  --partition 0 --from-beginning
Here are two example output lines from partition 0. They show two records for the my-group-id-14 group id on the fruit-prices topic: one for partition 1 with a commit value at offset=5497, and another for partition 0 with a value of offset=3550.
[my-group-id-14,fruit-prices,1]::OffsetAndMetadata(offset=5497, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1619425537926, expireTimestamp=None)
[my-group-id-14,fruit-prices,0]::OffsetAndMetadata(offset=3550, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1619425553694, expireTimestamp=None)
2. else if the record has a key, use the key’s hash to determine the partition #
If the record hasn’t been assigned an explicit partition, but it does have a key, then the partition is determined by:
- taking the murmur2 32-bit hash of the key
- turning that into a “positive value” with some clever bit twiddling
- and then taking that value modulo the number of partitions on the topic1.
positiveValue(murmur2_32("grape")) % 3 -> partition 0
positiveValue(murmur2_32("lime")) % 3 -> partition 0
positiveValue(murmur2_32("banana")) % 3 -> partition 1
positiveValue(murmur2_32("apple")) % 3 -> partition 1
positiveValue(murmur2_32("mango")) % 3 -> partition 2
positiveValue(murmur2_32("cherry")) % 3 -> partition 2
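For readers who want to experiment with this outside the JVM, here is a rough Python port of the hash-then-modulo steps. It is written from my reading of the Java client's murmur2 routine (seed `0x9747b28c`, multiplier `0x5bd1e995`), so treat it as an approximation and check it against a real producer before relying on it. The function names `murmur2_32`, `positive_value` and `partition_for_key` are mine, and keys are assumed to be UTF-8 encoded strings.

```python
def murmur2_32(data: bytes) -> int:
    """32-bit murmur2, returned as an unsigned value (assumed port of the Java code)."""
    mask = 0xFFFFFFFF
    m = 0x5BD1E995
    r = 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask

    # Mix the input four bytes at a time, little-endian.
    for i in range(length // 4):
        k = int.from_bytes(data[i * 4:i * 4 + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Fold in the 1 to 3 trailing bytes, if any.
    tail = data[length & ~3:]
    if len(tail) == 3:
        h ^= tail[2] << 16
    if len(tail) >= 2:
        h ^= tail[1] << 8
    if len(tail) >= 1:
        h ^= tail[0]
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def positive_value(n: int) -> int:
    """The "bit twiddling": clear the sign bit instead of calling abs()."""
    return n & 0x7FFFFFFF


def partition_for_key(key: str, num_partitions: int) -> int:
    return positive_value(murmur2_32(key.encode("utf-8"))) % num_partitions


for fruit in ["grape", "lime", "banana", "apple", "mango", "cherry"]:
    print(fruit, partition_for_key(fruit, 3))
```

Masking off the sign bit rather than taking the absolute value avoids the edge case where the most negative 32-bit integer has no positive counterpart.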
Each record is stored with an offset number. An offset is a monotonically increasing sequence number that starts at 0. A partition will never change the offset assigned to a record and will never assign a different record to that offset.
Example: The record with the key "lime" hashes to partition 0. It was assigned an offset of 1 because it is the second record on that partition.
Adding a key to a record will ensure that all messages with the same key will be sent to the same partition. This is useful for consumers who only care about a subset of the data. It also ensures that there aren’t race conditions when updating a value or, if you’re using a compacted topic, that the latest value is preserved.
3. else add the record to a batch and send it to a random partition #
If the record wasn’t assigned an explicit partition and it doesn’t have a key, it will be added to a batch of records that will be sent to a random partition.
After each record batch is sent, a new partition will be picked for the next batch (assuming there is more than one partition for the topic).
Producer Record Batching #
Once the partition for a record has been determined, the producer will add that record to a record batch destined for that topic partition.
Record batches have a few benefits:
- more efficient compression, all records in the batch are compressed together
- they reduce the number of individual requests to the Kafka brokers which reduces overhead and increases throughput
Record Batch Compression #
Compressing record batches trades a little bit of CPU for a significant reduction in network bandwidth and disk space usage.
Unless the values of your records are already compressed, your producers should be using compression.
your producer config. It can be worth comparing
lz4 compressed size and performance with
with your real data.
gzip, it is almost always strictly worse than the other options.
Even if your individual record values are already compressed, there can be advantages to using compression as the entire record (including the key, value, headers and other attributes) will be added to a record batch. This lets the compression algorithm find repeated sections across all records in the batch.
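To get a feel for why batch-level compression helps, here is a small experiment. It uses Python's built-in `zlib` purely as a stand-in codec (it is not what I recommend for producers), and the JSON records are made-up sample data.

```python
import json
import zlib

# 100 small, similar-looking records (hypothetical data).
records = [
    json.dumps({"fruit": "apple", "store_id": i, "price": 1.25}).encode("utf-8")
    for i in range(100)
]

# Compress every record on its own versus all of them together.
individually = sum(len(zlib.compress(record)) for record in records)
as_batch = len(zlib.compress(b"".join(records)))

print("uncompressed bytes:      ", sum(len(record) for record in records))
print("compressed one at a time:", individually)
print("compressed as one batch: ", as_batch)
```

Small records compressed one at a time barely shrink, because each one is too short for the codec to find repetition. Compressed together, the repeated field names and values across records collapse.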
Most broker/topic configs should keep the default compression.type of producer. This lets the Kafka broker know that the producer will handle compression and it will not spend time re-compressing the record batch.
If your broker specifies a codec that is different from the producer’s codec, it will re-encode the batch with the broker’s preferred codec. This can put a significant burden on the brokers’ memory and CPU. It is better to offload compression to the producers if possible.
How Many Records will fit in a Batch? #
Napkin math: (batch.size bytes - 61 bytes)/(average compressed record’s size in bytes)
The record batch header always takes up 61 bytes.
So if your average compressed record (including key, value, and headers) is 500 bytes and you use the default batch.size of 16384, you can expect to have 32 records in the average batch.
If your average compressed record takes up 32KB, every “batch” will be a record by itself.
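The napkin math as a tiny helper, in case you want to plug in your own numbers. The function name is made up for this post, and it deliberately ignores the finer per-record overhead.

```python
RECORD_BATCH_HEADER_BYTES = 61  # fixed overhead per record batch


def estimated_records_per_batch(avg_compressed_record_bytes: int, batch_size: int = 16384) -> int:
    """Rough estimate only: a batch always holds at least one record."""
    usable = batch_size - RECORD_BATCH_HEADER_BYTES
    return max(1, usable // avg_compressed_record_bytes)


print(estimated_records_per_batch(500))        # the 500 byte example -> 32
print(estimated_records_per_batch(32 * 1024))  # 32KB records -> 1
```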
Batch Config Details #
A producer will batch up to batch.size (default: 16384 - 16KB) of compressed records (minus a little overhead for the batch’s metadata) per topic partition.
When creating a new batch, Kafka will allocate a byte buffer for that batch that is the larger of batch.size or the uncompressed size of the record it wants to add to the batch. It then compresses the record and adds it to the batch.
When the next record comes in, if that batch hasn’t been sent yet, it will see if the existing batch has room for the record by seeing if the uncompressed record will fit into the batch. If there isn’t room, it will close that batch and allocate a new buffer with this
If there is room, it will compress and add the record to the batch and leave the batch open for new records to be added.
This means that if all of your messages are 32K uncompressed, every batch you have will have a single record in it.
If you increase the batch.size to 32K, you will still only have a single record in each batch. To get 2 or more records in a batch, your batch.size must be greater than (uncompressed message size) + (previously compressed message size).
In testing messages that were 750KiB in size, I’ve gotten 3x the performance by increasing my batch.size from 750K to 950K, because compression allowed 2 compressed messages to fit in the batch while still leaving enough room to add one more 750K message.
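Here is a toy model of that admission rule, which makes the 750K vs 950K result easier to see. It is a simplification of the behavior described above, not the producer's actual code. The 90KiB compressed size is a made-up figure chosen so that two compressed messages leave room for a third, and `records_in_one_batch` is a hypothetical name.

```python
RECORD_BATCH_HEADER_BYTES = 61
KIB = 1024


def records_in_one_batch(batch_size: int, uncompressed_size: int, compressed_size: int) -> int:
    """Count how many equally sized records end up in a single batch.

    The first record is always admitted (the buffer grows to fit it).
    Each later record is admitted only if its UNCOMPRESSED size still fits
    next to the compressed bytes already in the batch.
    """
    if compressed_size <= 0:
        raise ValueError("compressed_size must be positive")
    used = RECORD_BATCH_HEADER_BYTES + compressed_size
    count = 1
    while used + uncompressed_size <= batch_size:
        used += compressed_size
        count += 1
    return count


message = 750 * KIB
compressed = 90 * KIB  # assumed compression result, for illustration only

print(records_in_one_batch(750 * KIB, message, compressed))  # 1
print(records_in_one_batch(950 * KIB, message, compressed))  # 3
```

With batch.size equal to the message size, the second message never "fits" because the check uses its uncompressed size. A slightly larger batch.size triples the records per batch in this model.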
Additionally, the producer will allocate a pool of buffer.memory (default: 33554432 - 32MB) bytes that is shared for all active and in-flight compressed record batches across every topic partition.
The pool of buffer.memory will be allocated in batch.size chunks that are cleared and reused by the producer for garbage collection efficiency reasons.
If a batch isn’t full it will wait up to linger.ms (default: 0) after the last message has been added to the batch before sending it. linger.ms is not enabled by default, but can help reduce the number of requests sent without much additional latency if you set it to a low value.
So if your average compressed record is 32KB, you’ll want to test out significantly increasing batch.size by 10-20x and possibly also buffer.memory depending on how many unique topic partitions you could produce to.
If you increase
batch.size, you’ll want to be sure to use compression. I believe the existing setting is for
none. In my testing, larger batches left at
none do far worse, but with compression on
can do much better.
The maximum value of batch.size should be no larger than about 95% of message.max.bytes (max.message.bytes at the topic level) for the topic/broker. Values much larger than this risk being rejected by the broker and sent back to the producer to split up and resend in smaller batches. If a broker has 1MiB (1024*1024) message.max.bytes, I suggest trying batch.size=972800 (1024*950, 950KiB) to allow large batches without them being too large.
What Configs Limit the Maximum Record Size? #
This can be a very confusing setting, because there is a similar sounding broker/topic message.max.bytes that also has a default of 1MB, but message.max.bytes is what the broker will accept after compression.
This means that if you’re compressing messages on the producer and want to send compressed records as large as 1MB over the wire, you will want to increase the producer’s max.request.size to 4MB or more, roughly 4x its default (this assumes your compressed messages are <25% the uncompressed size). The broker will still reject batches with compressed records that are above its message.max.bytes limit.
Will a Broker ever store records in a different order than they were sent? #
With the default settings? Yes, record batches on the same partition can end up in a different order than intended.
Records within a record batch will always be in the order that they were sent.
By default, a producer will retry failed requests and it also allows multiple concurrent record batches to be in-flight to a broker simultaneously.
With these settings, if record batch A fails and is retried, it could arrive after record batch B has been saved and acknowledged by the broker.
Some options for preventing this:
- Set max.in.flight.requests.per.connection=1 to allow only one batch to be sent to a broker at a time3.
- Set retries=0 and crash before committing if a publish fails. On restart, the messages will be tried again.
- Keep track of the offset assigned to records in the producer’s send callback and crash before committing if they are ever out of order. On restart, the messages will be sent again.
- Turn on idempotent messages, or use processing.guarantee=exactly_once with Kafka Streams. This adds transactions and “exactly once” delivery of messages which have their own costs.
Option #1 is probably the easiest, though it’s worth testing the effect on your system’s throughput.
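The reordering is easier to see in a toy simulation. This is not how the producer is implemented. It is a hypothetical model where batches are sent in windows of `max_in_flight`, a batch named in `fail_once` loses its first attempt, and failed batches are retried at the front of the queue.

```python
from collections import deque
from typing import List, Set


def broker_log_order(batches: List[str], max_in_flight: int, fail_once: Set[str]) -> List[str]:
    """Return the order in which a toy broker ends up storing the batches."""
    pending = deque(batches)
    already_failed: Set[str] = set()
    log: List[str] = []
    while pending:
        # Send up to max_in_flight batches at the same time.
        window = [pending.popleft() for _ in range(min(max_in_flight, len(pending)))]
        retries = []
        for batch in window:
            if batch in fail_once and batch not in already_failed:
                already_failed.add(batch)
                retries.append(batch)  # first attempt failed, retry later
            else:
                log.append(batch)      # stored and acknowledged
        # Retries go first in the next round, but everything else in this
        # window has already been written to the log.
        pending.extendleft(reversed(retries))
    return log


print(broker_log_order(["A", "B", "C"], max_in_flight=5, fail_once={"A"}))  # ['B', 'C', 'A']
print(broker_log_order(["A", "B", "C"], max_in_flight=1, fail_once={"A"}))  # ['A', 'B', 'C']
```

With several batches in flight, the retry of A lands behind B and C. With only one batch in flight at a time, nothing can overtake the retry.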
Partition Leader and Replicas #
For each topic partition, there will be a broker that is the partition’s “leader”, and zero or more “replicas”.
A topic has a replication factor that determines how many copies of data should be kept. A replication factor of 1 means that the records will only exist on the partition leader. If that broker crashes, the data can be lost. All replicas after the first are backup copies of the data. Their data will only be used if the leader dies, or a new leader is elected.
Data is written to, and read from, the broker that is the partition’s leader. If you try to consume directly from a replica, it will redirect you to the current leader for the partition.4
For topics where you want to avoid data loss, you should have a replication factor of 3 (all data on the leader + 2 replicas) and min.insync.replicas=2 to ensure that the leader and at least one replica must acknowledge receipt of the record.
The producer should use acks=all. This tells the producer to wait for the leader and all currently in-sync replicas to acknowledge that they’ve persisted the record. The default of acks=1 will only wait for the leader to acknowledge it; if the leader dies before the data is replicated, those records could be lost.
Replication Example #
The fruit-prices topic has 3 partitions, min.insync.replicas=2 and a producer using acks=all.
When producing a record, it will be sent to the leader (Broker 101) as well as both in-sync replicas (Brokers 100 and 104).
When consumers read a record, it will always come from the leader (Broker 101).
If you describe a Kafka topic in this state you’ll see:
kafka-topics --bootstrap-server 127.0.0.1:9092 --topic user-events --describe
Topic:user-events PartitionCount:3 ReplicationFactor:3 Configs:min.insync.replicas=2,cleanup.policy=compact,segment.bytes=1073741824,retention.ms=172800000,min.cleanable.dirty.ratio=0.5,delete.retention.ms=86400000
    Topic: user-events Partition: 0 Leader: 101 Replicas: 101,100,104 Isr: 101,100,104
    Topic: user-events Partition: 1 Leader: 104 Replicas: 104,101,102 Isr: 104,101,102
    Topic: user-events Partition: 2 Leader: 102 Replicas: 102,100,103 Isr: 102,100,103
The Isr (“In Sync Replicas”) column shows that all replicas are in-sync with the leader of the partition.
Scenario 2: only one replica is in-sync with the leader #
If one broker becomes unreachable, or is lagging behind for some reason, you’re still able to produce to the topic and consume from it.
This is why having a replication factor of 3 is often desirable. The cluster can tolerate a broker being down but still maintain guarantees about not having the data in a single, vulnerable place.
The "guava" record is able to be produced to Broker 101 and replicated to Broker 100. The consumer is able to read the "guava" record from the leader, Broker 101. Broker 104 being out of sync doesn’t stop the cluster from continuing to work.
If you describe a Kafka topic in this state it will look like this:
kafka-topics --bootstrap-server 127.0.0.1:9092 --topic user-events --describe
Topic:user-events PartitionCount:3 ReplicationFactor:3 Configs:min.insync.replicas=2,cleanup.policy=compact,segment.bytes=1073741824,retention.ms=172800000,min.cleanable.dirty.ratio=0.5,delete.retention.ms=86400000
    Topic: user-events Partition: 0 Leader: 101 Replicas: 101,100,104 Isr: 101,100
    Topic: user-events Partition: 1 Leader: 104 Replicas: 104,101,102 Isr: 104,101,102
    Topic: user-events Partition: 2 Leader: 102 Replicas: 102,100,103 Isr: 102,100,103
The Replicas column shows that there are three replicas, but only 2 Isr (In Sync Replicas): 101 and 100. Broker 104 is a replica, but it is not in-sync. It is likely trying to catch up, but if this lasts more than a minute or two it is a signal that the broker should be looked at to see why it has not caught up.
Scenario 3: no replicas in-sync with the leader #
No replicas are currently in-sync with the leader, so all producers are halted because the cluster cannot meet min.insync.replicas=2.
Consumers can still retrieve any values they haven’t seen (as long as they can reach the leader), but will soon run out of events to process.
If you describe this Kafka topic you’ll see that there is only a single Isr (“In Sync Replica”), Broker 101, the leader.
kafka-topics --bootstrap-server 127.0.0.1:9092 --topic user-events --describe
Topic:user-events PartitionCount:3 ReplicationFactor:3 Configs:min.insync.replicas=2,cleanup.policy=compact,segment.bytes=1073741824,retention.ms=172800000,min.cleanable.dirty.ratio=0.5,delete.retention.ms=86400000
    Topic: user-events Partition: 0 Leader: 101 Replicas: 101,100,104 Isr: 101
    Topic: user-events Partition: 1 Leader: 104 Replicas: 104,101,102 Isr: 104,101,102
    Topic: user-events Partition: 2 Leader: 102 Replicas: 102,100,103 Isr: 102,100,103
One way to get in this situation would be if the other replicas’ brokers crashed and new brokers were picked to be replicas for the partition. Those brokers will start streaming the existing records from the leader, but will not be “in sync” till they have replicated all the data on the leader.
The leader checks that there are enough in-sync replicas before persisting the record batch, but there is a possible race condition where it can persist to its log before discovering the outage. In this case, it will return an error to the producer so it will resend the record batch.
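The three scenarios boil down to one comparison on the leader. Here is a toy version of that check, using partition 0's Isr lists from the describe output. `leader_accepts_write` is a hypothetical name, and the real broker logic has more to it (including the race condition mentioned above).

```python
from typing import List


def leader_accepts_write(isr: List[int], min_insync_replicas: int, acks: str) -> bool:
    """Toy version of the check a leader makes before persisting a batch."""
    if acks != "all":
        # min.insync.replicas is only enforced for acks=all producers.
        return True
    # The leader itself counts as one of the in-sync replicas.
    return len(isr) >= min_insync_replicas


scenarios = {
    "scenario 1": [101, 100, 104],
    "scenario 2": [101, 100],
    "scenario 3": [101],
}
for name, isr in scenarios.items():
    print(name, leader_accepts_write(isr, min_insync_replicas=2, acks="all"))
```

Only scenario 3 rejects the write. A producer using acks=1 would still be accepted in scenario 3, at the cost of the data living on a single broker.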
Kafka translates this setting to
Sender class and will “mute” other partitions from being sent while that one is in-flight.