Создадим потребителя, получающего сообщения из темы animals, генерируемые производителем случайных названий животных. Создайте файл consumer.js в корне проекта.
Код потребителя
Воспользуемся кодом из producer.js, удалив функцию produceMessage, переменную chance и её инстанс. Заменим clientId на myConsumer. Настройки брокеров остаются прежними (три брокера в кластере).
Создадим потребителя методом consumer объекта kafka. Тема остаётся animals. Для подключения и подписки на тему используем метод subscribe и запустим потребление методом run (пример из документации kafkajs). Выведем ID раздела, ID сообщения и значение (преобразованное в строку) в консоль. Флаг fromBeginning опустим; он обрабатывает сообщения, отправленные до запуска потребителя.
consumer.subscribe({ topic: topic });
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition: partition,
offset: message.offset,
value: message.value.toString(),
});
},
});
Настройка groupId
Для корректной работы необходимо указать groupId. Добавим параметр groupId со значением consumerGroup в конфигурацию потребителя:
const consumer = kafka.consumer({ groupId: 'consumerGroup' });
Запуск
Запустите производитель (node producer.js) и в другом терминальном окне — потребитель (node consumer.js). Потребитель будет получать сообщения пакетно, выводя в консоль ID раздела, смещение (offset) и значения сообщений (случайные названия животных).
Результат
Теперь создан производитель, отправляющий сообщения в тему animals кластера Kafka, и потребитель, регулярно получающий эти сообщения. Мы рассмотрели создание производителей и потребителей Kafka с использованием Node.js.