kafka group id что такое

understanding consumer group id

I did fresh installation of Apache Kafka 0.10.1.0.

I was able to send / receive messages on command prompt.

While using Producer / Consumer Java Example, I am not able to know group.id parameter on Consumer Example.

Let me know on how to fix this issue.

Below is Consumer Example I had used:

After running the command for consumer, I can see the messages (on the console) posted by producer. But unable to see the messages from java program

6 Answers 6

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

The group.id is a string that uniquely identifies the group of consumer processes to which this consumer belongs.

Here are some test results on partition and consumer property group.id

consumer.group id is to load balance the produced data (if the group.id is different for each consumer, each consumer will get the copy of data)

if partition=1 and total consumers count = 2, only one out of two active consumer will get data

if partition=2 and total consumers count = 2, each of the two active consumers evenly get data

if partition=3 and total consumers count = 2, each of the two active consumers will get data. one consumer gets data from 2 partitions and other gets data from 1 partition.

if partition=3 and total consumers count = 3, each of the three active consumers evenly gets data.

Источник

Apache Kafka для чайников

Данная статья будет полезной тем, кто только начал знакомиться с микросервисной архитектурой и с сервисом Apache Kafka. Материал не претендует на подробный туториал, но поможет быстро начать работу с данной технологией. Я расскажу о том, как установить и настроить Kafka на Windows 10. Также мы создадим проект, используя Intellij IDEA и Spring Boot.

Зачем?

Трудности в понимании тех или иных инструментов часто связаны с тем, что разработчик никогда не сталкивался с ситуациями, в которых эти инструменты могут понадобиться. С Kafka всё обстоит точно также. Опишем ситуацию, в которой данная технология будет полезной. Если у вас монолитная архитектура приложения, то разумеется, никакая Kafka вам не нужна. Всё меняется с переходом на микросервисы. По сути, каждый микросервис – это отдельная программа, выполняющая ту или иную функцию, и которая может быть запущена независимо от других микросервисов. Микросервисы можно сравнить с сотрудниками в офисе, которые сидят за отдельными столами и независимо от коллег решают свою задачу. Работа такого распределённого коллектива немыслима без централизованной координации. Сотрудники должны иметь возможность обмениваться сообщениями и результатами своей работы между собой. Именно эту проблему и призвана решить Apache Kafka для микросервисов.

Apache Kafka является брокером сообщений. С его помощью микросервисы могут взаимодействовать друг с другом, посылая и получая важную информацию. Возникает вопрос, почему не использовать для этих целей обычный POST – reqest, в теле которого можно передать нужные данные и таким же образом получить ответ? У такого подхода есть ряд очевидных минусов. Например, продюсер (сервис, отправляющий сообщение) может отправить данные только в виде response’а в ответ на запрос консьюмера (сервиса, получающего данные). Допустим, консьюмер отправляет POST – запрос, и продюсер отвечает на него. В это время консьюмер по каким-то причинам не может принять полученный ответ. Что будет с данными? Они будут потеряны. Консьюмеру снова придётся отправлять запрос и надеяться, что данные, которые он хотел получить, за это время не изменились, и продюсер всё ещё готов принять request.

Apache Kafka решает эту и многие другие проблемы, возникающие при обмене сообщениями между микросервисами. Не лишним будет напомнить, что бесперебойный и удобный обмен данными – одна из ключевых проблем, которую необходимо решить для обеспечения устойчивой работы микросервисной архитектуры.

Установка и настройка ZooKeeper и Apache Kafka на Windows 10

Первое, что надо знать для начала работы — это то, что Apache Kafka работает поверх сервиса ZooKeeper. ZooKeeper — это распределенный сервис конфигурирования и синхронизации, и это всё, что нам нужно знать о нём в данном контексте. Мы должны скачать, настроить и запустить его перед тем, как начать работу с Kafka. Прежде чем начать работу с ZooKeeper, убедитесь, что у вас установлен и настроен JRE.

Извлекаем из скаченного архива ZooKeeper`а файлы в какую-нибудь папку на диске.
В папке zookeeper с номером версии, находим папку conf и в ней файл “zoo_sample.cfg”.

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

Копируем его и меняем название копии на “zoo.cfg”. Открываем файл-копию и находим в нём строчку dataDir=/tmp/zookeeper. Прописываем в данной строчке полный путь к нашей папке zookeeper-х.х.х. У меня это выглядит так: dataDir=C:\\ZooKeeper\\zookeeper-3.6.0
kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое
Теперь добавим системную переменную среды: ZOOKEEPER_HOME = C:\ ZooKeeper \zookeeper-3.4.9 и в конце системной переменной Path добавим запись: ;%ZOOKEEPER_HOME%\bin;

Запускаем командную строку и пишем команду:

Если всё сделано правильно, вы увидите примерно следующее.

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

Это означает, что ZooKeeper стартанул нормально. Переходим непосредственно к установке и настройке сервера Apache Kafka. Скачиваем свежую версию с официального сайта и извлекаем содержимое архива: kafka.apache.org/downloads

В папке с Kafka находим папку config, в ней находим файл server.properties и открываем его.

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

Находим строку log.dirs= /tmp/kafka-logs и указываем в ней путь, куда Kafka будет сохранять логи: log.dirs=c:/kafka/kafka-logs.

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

В этой же папке редактируем файл zookeeper.properties. Строчку dataDir=/tmp/zookeeper меняем на dataDir=c:/kafka/zookeeper-data, не забывая при этом, после имени диска указывать путь к своей папке с Kafka. Если вы всё сделали правильно, можно запускать ZooKeeper и Kafka.

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

Для кого-то может оказаться неприятной неожиданностью, что никакого GUI для управления Kafka нет. Возможно, это потому, что сервис рассчитан на суровых нёрдов, работающих исключительно с консолью. Так или иначе, для запуска кафки нам потребуется командная строка.

Сначала надо запустить ZooKeeper. В папке с кафкой находим папку bin/windows, в ней находим файл для запуска сервиса zookeeper-server-start.bat, кликаем по нему. Ничего не происходит? Так и должно быть. Открываем в этой папке консоль и пишем:

Опять не работает? Это норма. Всё потому что zookeeper-server-start.bat для своей работы требует параметры, прописанные в файле zookeeper.properties, который, как мы помним, лежит в папке config. Пишем в консоль:

Теперь всё должно стартануть нормально.

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

Ещё раз открываем консоль в этой папке (ZooKeeper не закрывать!) и запускаем kafka:

Для того, чтобы не писать каждый раз команды в командной строке, можно воспользоваться старым проверенным способом и создать батник со следующим содержимым:

Строка timeout 10 нужна для того, чтобы задать паузу между запуском zookeeper и kafka. Если вы всё сделали правильно, при клике на батник должны открыться две консоли с запущенным zookeeper и kafka.Теперь мы можем прямо из командной строки создать продюсера сообщений и консьюмера с нужными параметрами. Но, на практике это может понадобиться разве что для тестирования сервиса. Гораздо больше нас будет интересовать, как работать с kafka из IDEA.

Работа с kafka из IDEA

Мы напишем максимально простое приложение, которое одновременно будет и продюсером и консьюмером сообщения, а затем добавим в него полезные фичи. Создадим новый спринг-проект. Удобнее всего делать это с помощью спринг-инициалайзера. Добавляем зависимости org.springframework.kafka и spring-boot-starter-web

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

В итоге файл pom.xml должен выглядеть так:

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

В принципе, наш продюсер готов. Всё что осталось сделать – это вызвать у него метод send(). Имеется несколько перегруженных вариантов данного метода. Мы используем в нашем проекте вариант с 3 параметрами — send(String topic, K key, V data). Так как KafkaTemplate типизирован String-ом, то ключ и данные в методе send будут являться строкой. Первым параметром указывается топик, то есть тема, в которую будут отправляться сообщения, и на которую могут подписываться консьюмеры, чтобы их получать. Если топик, указанный в методе send не существует, он будет создан автоматически. Полный текст класса выглядит так.

Контроллер мапится на localhost:8080/msg, в теле запроса передаётся ключ и само сообщений.

Отправитель сообщений готов, теперь создадим слушателя. Spring так же позволяет cделать это без особых усилий. Достаточно создать метод и пометить его аннотацией @KafkaListener, в параметрах которой можно указать только топик, который будет слушаться. В нашем случае это выглядит так.

У самого метода, помеченного аннотацией, можно указать один принимаемый параметр, имеющий тип сообщения, передаваемого продюсером.

Класс, в котором будет создаваться консьюмер необходимо пометить аннотацией @EnableKafka.

Так же в файле настроек application.property необходимо указать параметр консьюмера groupe-id. Если этого не сделать, приложение не запустится. Параметр имеет тип String и может быть любым.

Наш простейший кафка-проект готов. У нас есть отправитель и получатель сообщений. Осталось только запустить. Для начала запускаем ZooKeeper и Kafka с помощью батника, который мы написали ранее, затем запускаем наше приложение. Отправлять запрос удобнее всего с помощью Postman. В теле запроса не забываем указывать параметры msgId и msg.
kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое
Если мы видим в IDEA такую картину, значит всё работает: продюсер отправил сообщение, консьюмер получил его и вывел в консоль.
kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

Усложняем проект

Реальные проекты с использованием Kafka конечно же сложнее, чем тот, который мы создали. Теперь, когда мы разобрались с базовыми функциями сервиса, рассмотрим, какие дополнительные возможности он предоставляет. Для начала усовершенствуем продюсера.

Если вы открывали метод send(), то могли заметить, что у всех его вариантов есть возвращаемое значение ListenableFuture >. Сейчас мы не будем подробно рассматривать возможности данного интерфейса. Здесь будет достаточно сказать, что он нужен для просмотра результата отправки сообщения.

Метод addCallback() принимает два параметра – SuccessCallback и FailureCallback. Оба они являются функциональными интерфейсами. Из названия можно понять, что метод первого будет вызван в результате успешной отправки сообщения, второго – в результате ошибки.Теперь, если мы запустим проект, то увидим на консоли примерно следующее:

Посмотрим ещё раз внимательно на нашего продюсера. Интересно, что будет если в качестве ключа будет не String, а, допустим, Long, а в качестве передаваемого сообщения и того хуже – какая-нибудь сложная DTO? Попробуем для начала изменить ключ на числовое значение…

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

Если мы укажем в продюсере в качестве ключа Long, то приложение нормально запуститься, но при попытке отправить сообщение будет выброшен ClassCastException и будет сообщено, что класс Long не может быть приведён к классу String.

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

В методе producerConfigs() создаём мапу с конфигурациями и в качестве сериализатора для ключа указываем LongSerializer.class. Запускаем, отправляем запрос из Postman и видим, что теперь всё работает, как надо: продюсер отправляет сообщение, а консьюмер принимает его.

Теперь изменим тип передаваемого значения. Что если у нас не стандартный класс из библиотеки Java, а какой-нибудь кастомный DTO. Допустим такой.

Для отправки DTO в качестве сообщения, нужно внести некоторые изменения в конфигурацию продюсера. В качестве сериализатора значения сообщения укажем JsonSerializer.class и не забудем везде изменить тип String на UserDto.

Отправим сообщение. В консоль будет выведена следующая строка:

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

Теперь займёмся усложнением консьюмера. До этого наш метод public void msgListener(String msg), помеченный аннотацией @KafkaListener(topics=«msg») в качестве параметра принимал String и выводил его на консоль. Как быть, если мы хотим получить другие параметры передаваемого сообщения, например, ключ или партицию? В этом случае тип передаваемого значения необходимо изменить.

Из объекта ConsumerRecord мы можем получить все интересующие нас параметры.

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

Мы видим, что вместо ключа на консоль выводятся какие-то кракозябры. Это потому, что для десериализации ключа по умолчанию используется StringDeserializer, и если мы хотим, чтобы ключ в целочисленном формате корректно отображался, мы должны изменить его на LongDeserializer. Для настройки консьюмера в пакете config создадим класс KafkaConsumerConfig.

kafka group id что такое. Смотреть фото kafka group id что такое. Смотреть картинку kafka group id что такое. Картинка про kafka group id что такое. Фото kafka group id что такое

Видим, что теперь ключ отображается как надо, а это значит, что всё работает. Конечно, возможности Apache Kafka далеко выходят за пределы тех, что описаны в данной статье, однако, надеюсь, прочитав её, вы составите представление о данном сервисе и, самое главное, сможете начать работу с ним.

Мойте руки чаще, носите маски, не выходите без необходимости на улицу, и будьте здоровы.

Источник

Kafka ConsumerВ¶

Confluent Platform includes the Java consumer shipped with Apache KafkaВ®.

This section gives a high-level overview of how the consumer works and an introduction to the configuration settings for tuning.

To learn more about the consumer API, see this short video provided as part of the free Apache Kafka 101 course.

ConceptsВ¶

Consumer groupsВ¶

A consumer group is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group.

The main difference between the older “high-level” consumer and the new consumer is that the former depended on ZooKeeper for group management, while the latter uses a group protocol built into Kafka itself. In this protocol, one of the brokers is designated as the group’s coordinator and is responsible for managing the members of the group as well as their partition assignments.

When the consumer starts up, it finds the coordinator for its group and sends a request to join the group. The coordinator then begins a group rebalance so that the new member is assigned its fair share of the group’s partitions. Every rebalance results in a new generation of the group.

Each member in the group must send heartbeats to the coordinator in order to remain a member of the group. If no hearbeat is received before expiration of the configured session timeout, then the coordinator will kick the member out of the group and reassign its partitions to another member.

Offset ManagementВ¶

After the consumer receives its assignment from the coordinator, it must determine the initial position for each assigned partition. When the group is first created, before any messages have been consumed, the position is set according to a configurable offset reset policy ( auto.offset.reset ). Typically, consumption starts either at the earliest offset or the latest offset.

As a consumer in the group reads messages from the partitions assigned by the coordinator, it must commit the offsets corresponding to the messages it has read. If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.

By default, the consumer is configured to auto-commit offsets. Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset. When this happens, the last committed position may be as old as the auto-commit interval itself. Any messages which have arrived since the last commit will have to be read again.

Each call to the commit API results in an offset commit request being sent to the broker. Using the synchronous API, the consumer is blocked until that request returns successfully. This may reduce overall throughput since the consumer might otherwise be able to process records while that commit is pending.

One way to deal with this is to increase the amount of data that is returned when polling. The consumer has a configuration setting fetch.min.bytes which controls how much data is returned in each fetch. The broker will hold on to the fetch until enough data is available (or fetch.max.wait.ms expires). The tradeoff, however, is that this also increases the amount of duplicates that have to be dealt with in a worst-case failure.

A second option is to use asynchronous commits. Instead of waiting for the request to complete, the consumer can send the request and return immediately by using asynchronous commits.

So if it helps performance, why not always use async commits? The main reason is that the consumer does not retry the request if the commit fails. This is something that committing synchronously gives you for free; it will retry indefinitely until the commit succeeds or an unrecoverable error is encountered. The problem with asynchronous commits is dealing with commit ordering. By the time the consumer finds out that a commit has failed, you may already have processed the next batch of messages and even sent the next commit. In this case, a retry of the old commit could cause duplicate consumption.

Instead of complicating the consumer internals to try and handle this problem in a sane way, the API gives you a callback which is invoked when the commit either succeeds or fails. If you like, you can use this callback to retry the commit, but you will have to deal with the same reordering problem.

Offset commit failures are merely annoying if the following commits succeed since they won’t actually result in duplicate reads. However, if the last commit fails before a rebalance occurs or before the consumer is shut down, then offsets will be reset to the last commit and you will likely see duplicates. A common pattern is therefore to combine async commits in the poll loop with sync commits on rebalances or shut down. Committing on close is straightforward, but you need a way to hook into rebalances.

Each rebalance has two phases: partition revocation and partition assignment. The revocation method is always called before a rebalance and is the last chance to commit offsets before the partitions are re-asssigned. The assignment method is always called after the rebalance and can be used to set the initial position of the assigned partitions. In this case, the revocation hook is used to commit the current offsets synchronously.

In general, asynchronous commits should be considered less safe than synchronous commits. Consecutive commit failures before a crash will result in increased duplicate processing. You can mitigate this danger by adding logic to handle commit failures in the callback or by mixing occasional synchronous commits, but you shouldn’t add too much complexity unless testing shows it is necessary. If you need more reliability, synchronous commits are there for you, and you can still scale up by increasing the number of topic partitions and the number of consumers in the group. But if you just want to maximize throughput and you’re willing to accept some increase in the number of duplicates, then asynchronous commits may be a good option.

A somewhat obvious point, but one that’s worth making is that asynchronous commits only make sense for “at least once” message delivery. To get “at most once,” you need to know if the commit succeeded before consuming the message. This implies a synchronous commit unless you have the ability to “unread” a message after you find that the commit failed.

In the examples, we show several detailed examples of the commit API and discuss the tradeoffs in terms of performance and reliability.

When writing to an external system, the consumer’s position must be coordinated with what is stored as output. That is why the consumer stores its offset in the same place as its output. For example, a Kafka Connect connector populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and offsets are both updated, or neither is. A similar pattern is followed for many other data systems that require these stronger semantics, and for which the messages do not have a primary key to allow for deduplication.

This is how Kafka supports exactly-once processing in Kafka Streams, and the transactional producer or consumer can be used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. Otherwise, Kafka guarantees at-least-once delivery by default, and you can implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages.

Consumers can fetch/consume from out-of-sync follower replicas if using a fetch-from-follower configuration. See Multi-Region Clusters to learn more.

Kafka Consumer ConfigurationВ¶

Core configurationВ¶

Group ConfigurationВ¶

You should always configure group.id unless you are using the simple assignment API and you don’t need to store offsets in Kafka.

You can control the session timeout by overriding the session.timeout.ms value. The default is 10 seconds in the C/C++ and Java clients, but you can increase the time to avoid excessive rebalancing, for example due to poor network connectivity or long GC pauses.

The main drawback to using a larger session timeout is that it will take longer for the coordinator to detect when a consumer instance has crashed, which means it will also take longer for another consumer in the group to take over its partitions. For normal shutdowns, however, the consumer sends an explicit request to the coordinator to leave the group which triggers an immediate rebalance.

Offset ManagementВ¶

Second, use auto.offset.reset to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range. You can choose either to reset the position to the “earliest” offset or the “latest” offset (the default). You can also select “none” if you would rather set the initial offset yourself and you are willing to handle out of range errors manually.

Message handlingВ¶

While the Java consumer does all IO and processing in the foreground thread, librdkafka-based clients (C/C++, Python, Go and C#) use a background thread. The main consequence of this is that polling is totally safe when used from multiple threads. You can use this to parallelize message handling in multiple threads. From a high level, poll is taking messages off of a queue which is filled in the background.

Another consequence of using a background thread is that all heartbeats and rebalancing are executed in the background. The benefit of this is that you don’t need to worry about message handling causing the consumer to “miss” a rebalance. The drawback, however, is that the background thread will continue heartbeating even if your message processor dies. If this happens, then the consumer will continue to hold on to its partitions and the read lag will continue to build until the process is shut down.

Although the clients have taken different approaches internally, they are not as far apart as they seem. To provide the same abstraction in the Java client, you could place a queue in between the poll loop and the message processors. The poll loop would fill the queue and the processors would pull messages off of it.

Kafka Consumer Group Command ToolВ¶

Kafka includes an admin utility for viewing the status of consumer groups.

List GroupsВ¶

To get a list of the active groups in the cluster, you can use the kafka-consumer-groups utility included in the Kafka distribution. On a large cluster, this may take a while since it collects the list by inspecting each broker in the cluster.

Describe GroupВ¶

The utility kafka-consumer-groups can also be used to collect information on a current group. For example, to see the current assignments for the foo group, use the following command:

If you happen to invoke this while a rebalance is in progress, the command will report an error. Retry again and you should see the assignments for all the members in the current generation.

Example codeВ¶

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *