kafka partitions что такое
Understanding Kafka Topic Partitions
Everything in Kafka is modeled around partitions. They rule Kafka’s storage, scalability, replication, and message movement.
P.S I did some edits to reflect the feedback I received from the audience. Thanks for your valuable contribution. I expect more! 🙂
Everything in Kafka revolves around partitions. They play a crucial role in structuring Kafka’s storage and the production and consumption of messages.
Understanding partitions helps you learn Kafka faster. This article walks through the concepts, structure, and behavior of Kafka’s partitions.
Events, Streams, and Kafka Topics
Before diving into partitions, we need to set the stage here. So let’s look at some high-level concepts and how they relate to partitions.
Events
An event represents a fact that happened in the past. Events are immutable and never stay in one place. They always travel from one system to another system, carrying the state changes that happened.
Streams
An event stream represents related events in motion.
Topics
When an event stream enters Kafka, it is persisted as a topic. In Kafka’s universe, a topic is a materialized event stream. In other words, a topic is a stream at rest.
Topic groups related events together and durably stores them. The closest analogy for a Kafka topic is a table in a database or folder in a file system.
Topics are the central concept in Kafka that decouples producers and consumers. A consumer pulls messages off of a Kafka topic while producers push messages into a Kafka topic. A topic can have many producers and many consumers.
Partitions
Kafka’s topics are divided into several partitions. While the topic is a logical concept in Kafka, a partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion.
When talking about the content inside a partition, I will use the terms record and message interchangeably.
Offsets and the ordering of messages
The records in the partitions are each assigned a sequential identifier called the offset, which is unique for each record within the partition.
The offset is an incremental and immutable number, maintained by Kafka. When a record is written to a partition, it is appended to the end of the log, assigning the next sequential offset. Offsets are particularly useful for consumers when reading records from a partition. We’ll come to that at a later point.
The figure below shows a topic with three partitions. Records are being appended to the end of each one.
Although the messages within a partition are ordered, messages across a topic are not guaranteed to be ordered.
Partitions are the way that Kafka provides scalability
A Kafka cluster is made of one or more servers. In the Kafka universe, they are called Brokers. Each broker holds a subset of records that belongs to the entire cluster.
Kafka distributes the partitions of a particular topic across multiple brokers. By doing so, we’ll get the following benefits.
Partitions are the way that Kafka provides redundancy.
Kafka keeps more than one copy of the same partition across multiple brokers. This redundant copy is called a replica. If a broker fails, Kafka can still serve consumers with the replicas of partitions that failed broker owned.
Partition replication is complex, and it deserves its own post. Next time maybe?
Writing records to partitions.
How does a producer decide to which partition a record should go? There are three ways a producer can rule on that.
Using a partition key to specify the partition
A producer can use a partition key to direct messages to a specific partition. A partition key can be any value that can be derived from the application context. A unique device ID or a user ID will make a good partition key.
By default, the partition key is passed through a hashing function, which creates the partition assignment. That assures that all records produced with the same key will arrive at the same partition. Specifying a partition key enables keeping related events together in the same partition and in the exact order in which they were sent.
Key based partition assignment can lead to broker skew if keys aren’t well distributed.
For example, when customer ID is used as the partition key, and one customer generates 90% of traffic, then one partition will be getting 90% of the traffic most of the time. On small topics, this is negligible, on larger ones, it can sometime take a broker down.
When choosing a partition key, ensure that they are well distributed.
Allowing Kafka to decide the partition
If a producer doesn’t specify a partition key when producing a record, Kafka will use a round-robin partition assignment. Those records will be written evenly across all partitions of a particular topic.
However, if no partition key is used, the ordering of records can not be guaranteed within a given partition.
The key takeaway is to use a partition key to put related events together in the same partition in the exact order in which they were sent.
Writing a custom partitioner
In some situations, a producer can use its own partitioner implementation that uses other business rules to do the partition assignment.
Reading records from partitions.
Unlike the other pub/sub implementations, Kafka doesn’t push messages to consumers. Instead, consumers have to pull messages off Kafka topic partitions. A consumer connects to a partition in a broker, reads the messages in the order in which they were written.
The offset of a message works as a consumer side cursor at this point. The consumer keeps track of which messages it has already consumed by keeping track of the offset of messages. After reading a message, the consumer advances its cursor to the next offset in the partition and continues. Advancing and remembering the last read offset within a partition is the responsibility of the consumer. Kafka has nothing to do with it.
By remembering the offset of the last consumed message for each partition, a consumer can join a partition at the point in time they choose and resume from there. That is particularly useful for a consumer to resume reading after recovering from a crash.
A partition can be consumed by one or more consumers, each reading at different offsets.
Kafka has the concept of consumer groups where several consumers are grouped to consume a given topic. Consumers in the same consumer group are assigned the same group-id value.
The consumer group concept ensures that a message is only ever read by a single consumer in the group.
When a consumer group consumes the partitions of a topic, Kafka makes sure that each partition is consumed by exactly one consumer in the group.
The following figure shows the above relationship.
Consumer groups enable consumers to parallelize and process messages at very high throughputs. However, the maximum parallelism of a group will be equal to the number of partitions of that topic.
For example, if you have N + 1 consumers for a topic with N partitions, then the first N consumers will be assigned a partition, and the remaining consumer will be idle, unless one of the N consumers fails, then the waiting consumer will be assigned its partition. This is a good strategy to implement a hot failover.
The figure below illustrates this.
The key takeaway is that number of consumers don’t govern the degree of parallelism of a topic. It’s the number of partitions.
Event-driven Utopia
Quality content on building event-driven, asynchronous, cloud-native application architectures.
Apache Kafka – мой конспект
Это мой конспект, в котором коротко и по сути затрону такие понятия Kafka как:
— Тема (Topic)
— Подписчики (consumer)
— Издатель (producer)
— Группа (group), раздел (partition)
— Потоки (streams)
Kafka — основное
При изучении Kafka возникали вопросы, ответы на которые мне приходилось эксперементально получать на примерах, вот это и изложено в этом конспекте. Как стартовать и с чего начать я дам одну из ссылок ниже в материалах.
Apache Kafka – диспетчер сообщений на Java платформе. В Kafka есть тема сообщения в которую издатели пишут сообщения и есть подписчики в темах, которые читают эти сообщения, все сообщения в процессе диспетчеризации пишутся на диск и не зависит от потребителей.
В состав Kafka входят набор утилит по созданию тем, разделов, готовые издатели, подписчики для примеров и др. Для работы Kafka необходим координатор «ZooKeeper», поэтому вначале стартуем ZooKeeper (zkServer.cmd) затем сервер Kafka (kafka-server-start.bat), командные файлы находятся в соответствующих папках bin, там же и утилиты.
Создадим тему Kafka утилитой, ходящей в состав
здесь указываем сервер zookeeper, replication-factor это количество реплик журнала сообщений, partitions – количество разделов в теме (об этом ниже) и собственно сама тема – “out-topic”.
Для простого тестирования можно использовать входящие в состав готовые приложения «kafka-console-consumer» и «kafka-console-producer», но я сделаю свои. Подписчики на практике объединяют в группы, это позволит разным приложениям читать сообщения из темы параллельно.
Для каждого приложения будет организованна своя очередь, читая из которой оно выполняет перемещения указателя последнего прочитанного сообщения (offset), это называется фиксацией (commit) чтения. И так если издатель отправит сообщение в тему, то оно будет гарантированно прочитано получателем этой темы если он запущен или, как только он подключится. Причем если есть разные клиенты (client.id), которые читают из одной темы, но в разных группах, то сообщения они получат не зависимо друг от друга и в то время, когда будут готовы.
Так можно представить последователь сообщений и независимое чтение их потребителями из одной темы.
Но есть ситуация, когда сообщения в тему могут начать поступать быстрее чем уходить, т.е. потребители обрабатывают их дольше. Для этого в теме можно предусмотреть разделы (partitions) и запускать потребителей в одной группе для этой темы.
Тогда произойдет распределение нагрузки и не все сообщения в теме и группе пойдут через одного потребителя. И тогда уже будет выбрана стратегия, как распределять сообщения по разделам. Есть несколько стратегий: round-robin – это по кругу, по хэш значению ключа, или явное указание номера раздела куда писать. Подписчики в этом случае распределяются равномерно по разделам. Если, например, подписчиков будет в группе будет больше чем разделов, то кто-то не получит сообщения. Таким образом разделы делаются для улучшения масштабируемости.
Например после создания темы с одним разделом я изменил на два раздела.
my_kafka_run.cmd com.home.SimpleProducer out-topic (издатель)
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01 (первый подписчик)
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client02 (второй подписчик)
Начав вводить в издателе пары ключ: значение можно наблюдать кто их получает. Так, например, по стратегии распределения по хэшу ключа сообщение m:1 попало клиенту client01
а сообщение n:1 клиенту client02
Если начну вводить без указания пар ключ: значение (такую возможность сделал в издателе), будет выбрана стратегия по кругу. Первое сообщение «m» попало client01, а уже втрое client02.
И еще вариант с указанием раздела, например в таком формате key:value:partition
Ранее в стратегии по хэш, m:1 уходил другому клиенту (client01), теперь при явном указании раздела (№1, нумеруются с 0) — к client02.
Если запустить подписчика с другим именем группы testGroup02 и для той же темы, то сообщения будут уходить параллельно и независимо подписчикам, т.е. если первый прочитал, а второй не был активен, то он прочитает, как только станет активен.
Можно посмотреть описания групп, темы соответственно:
Для запуска своих программ я сделал командный файл — my_kafka_run.cmd
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup02 client01
Kafka Streams
Итак, потоки в Kafka это последовательность событий, которые получают из темы, над которой можно выполнять определенные операции, трансформации и затем результат отдать далее, например, в другую тему или сохранить в БД, в общем куда угодно. Операции могут быть как например фильтрации (filter), преобразования (map), так и агрегации (count, sum, avg). Для этого есть соответствующие классы KStream, KTable, где KTable можно представить как таблицу с текущими агрегированными значениями которые постоянно обновляются по мере поступления новых сообщений в тему. Как это происходит?
Например, издатель пишет в тему события (сообщения), Kafka все сообщения сохраняет в журнале сообщений, который имеет политику хранения (Retention Policy), например 7 дней. Например события изменения котировки это поток, далее хотим узнать среднее значение, тогда создадим Stream который возьмет историю из журнала и посчитает среднее, где ключом будет акция, а значением – среднее (это уже таблица с состоянием). Тут есть особенность – операции агрегирования в отличии от операций, например, фильтрации, сохраняют состояние. Поэтому вновь поступающие сообщения (события) в тему, будут подвержены вычислению, а результат будет сохраняться (state store), далее вновь поступающие будут писаться в журнал, Stream их будет обрабатывать, добавлять изменения к уже сохраненному состоянию. Операции фильтрации не требуют сохранения состояния. И тут тоже stream будет делать это не зависимо от издателя. Например, издатель пишет сообщения, а программа — stream в это время не работает, ничего не пропадет, все сообщения будут сохранены в журнале и как только программа-stream станет активной, она сделает вычисления, сохранит состояние, выполнит смещение для прочитанных сообщений (пометит что они прочитаны) и в дальнейшем она уже к ним не вернется, более того эти сообщения уйдут из журнала (kafka-logs). Тут видимо главное, чтобы журнал (kafka-logs) и его политика хранения позволило это. По умолчанию состояние Kafka Stream хранит в RocksDB. Журнал сообщений и все с ним связанное (темы, смещения, потоки, клиенты и др.) располагается по пути указанном в параметре «log.dirs=kafka-logs» файла конфигурации «config\server.properties», там же указывается политика хранения журнала «log.retention.hours=48». Пример лога
А путь к базе с состояниями stream указывается в параметре приложения
Состояния хранятся по ИД приложениям независимо (StreamsConfig.APPLICATION_ID_CONFIG). Пример
Проверим теперь как работает Stream. Подготовим приложение Stream из примера, который есть поставке (с некоторой доработкой для эксперимента), которое считает количество одинаковых слов и приложение издатель и подписчик. Писать будет в тему in-topic
my_kafka_run.cmd com.home.SimpleProducer in-topic
my_kafka_run.cmd com.home.KafkaCountStream in-topic app_01
my_kafka_run.cmd com.home.SimpleProducer in-topic
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01
Начинаем вводить слова и видим их подсчет с указанием какой Stream App-ID их подсчитал
Работа будет идти независимо, можно остановить Stream и продолжать писать в тему, он потом при старте посчитает. А теперь подключим второй Stream c App-ID = app_02 (это тоже приложение, но с другим ИД), он прочитает журнал (последовательность событий, которая сохраняется согласно политике Retention), подсчитает кол-во, сохранит состояние и выдаст результат. Таким образом два потока начав работать в разное время пришли к одному результату.
А теперь представим наш журнал устарел (Retention политика) или мы его удалили (что бывает надо делать) и подключаем третий stream с App-ID = app_03 (я для этого остановил Kafka, удалил kafka-logs и вновь стартовал) и вводим в тему новое сообщение и видим первый (app_01) поток продолжил подсчет а новый третий начал с нуля.
Если затем запустим поток app_02, то он догонит первый и они будут равны в значениях. Из примера стало понятно, как Kafka обрабатывает текущий журнал, добавляет к ранее сохраненному состоянию и так далее.
Тема Kafka очень обширна, я для себя сделал первое общее представление 🙂
Understanding Kafka Topics and Partitions
I am starting to learn Kafka, during my readings, some questions came to my mind:
Does each consumer group have a corresponding partition on the broker or does each consumer have one?
Are the partitions created by the broker, and therefore not a concern for the consumers?
Since this is a queue with an offset for each partition, is it the responsibility of the consumer to specify which messages it wants to read? Does it need to save its state?
4 Answers 4
This post already has answers, but I am adding my view with a few pictures from Kafka Definitive Guide
Before answering the questions, let’s look at an overview of producer components:
The producer will decide target partition to place any message, depending on:
You should always configure group.id unless you are using the simple assignment API and you don’t need to store offsets in Kafka. It will not be a part of any group. source
3. Does each consumer group have a corresponding partition on the broker or does each consumer have one?
In one consumer group, each partition will be processed by one consumer only. These are the possible scenarios
4. As the partitions created by the broker, therefore not a concern for the consumers?
Consumer should be aware of the number of partitions, as was discussed in question 3.
5. Since this is a queue with an offset for each partition, is it the responsibility of the consumer to specify which messages it wants to read? Does it need to save its state?
More about Group Coordinator:
Let’s take those in order 🙂
By default, the producer doesn’t care about partitioning. You have the option to use a customized partitioner to have a better control, but it’s totally optional.
Yes, consumers join (or create if they’re alone) a consumer group to share load. No two consumers in the same group will ever receive the same message.
They’re not, but you can see from 3 that it’s totally useless to have more consumers than existing partitions, so it’s your maximum parallelism level for consuming.
Yes, consumers save an offset per topic per partition. This is totally handled by Kafka, no worries about it.
If a consumer ever request an offset not available for a partition on the brokers (for example, due to deletion), it enters an error mode, and ultimately reset itself for this partition to either the most recent or the oldest message available (depending on the auto.offset.reset configuration value), and continue working.
Kafka uses Topic conception which comes to bring order into message flow.
To balance the load, a topic may be divided into multiple partitions and replicated across brokers.
Partitions are ordered, immutable sequences of messages that’s continually appended i.e. a commit log.
Messages in the partition have a sequential id number that uniquely identifies each message within the partition.
Partitions allow a topic’s log to scale beyond a size that will fit on a single server (a broker) and act as the unit of parallelism.
The partitions of a topic are distributed over the brokers in the Kafka cluster where each broker handles data and requests for a share of the partitions.
Each partition is replicated across a configurable number of brokers to ensure fault tolerance.
Yes, the Producer does specify the topic
The more partitions there are in a Kafka cluster, the higher the throughput one can achieve. A rough formula for picking the number of partitions is based on throughput. You measure the throughout that you can achieve on a single partition for production (call it p) and consumption (call it c).
When the Kafka consumer is constructed and group.id does not exist yet (i.e. there are no existing consumers that are part of the group), the consumer group will be created automatically. If all consumers in a group leave the group, the group is automatically destroyed.
Each consumer group is assigned a partition, multiple consumer groups can access a single partition, but not 2 consumers belonging to a consumer group are assigned the same partition because consumer consumes messages sequentially in a group and if multiple consumers from a single group consume messages from the same partition then sequence might be lost, whereas groups being logically independent can consume from the same partition.
Brokers already have partitions. Each broker to have up to 4,000 partitions and each cluster to have up to 200,000 partitions.
Whenever a consumer enters or leaves a consumer group, the brokers rebalance the partitions across consumers, meaning Kafka handles load balancing with respect to the number of partitions per application instance for you.
Before assigning partitions to a consumer, Kafka would first check if there are any existing consumers with the given group-id. When there are no existing consumers with the given group-id, it would assign all the partitions of that topic to this new consumer. When there are two consumers already with the given group-id and a third consumer wants to consume with the same group-id. It would assign the partitions equally among all three consumers. No two consumers of the same group-id would be assigned to the same partition source
Offset is handled internally by Kafka. The current offset is a pointer to the last record that Kafka has already sent to a consumer in the most recent poll. So, the consumer doesn’t get the same record twice because of the current offset. It doesn’t need to be specified exclusively
It automatically reconfigures themselves according to need. It should give an error.