Node.js Kafka потребитель: пример кода

Создадим потребителя, получающего сообщения из темы animals, генерируемые производителем случайных названий животных. Создайте файл consumer.js в корне проекта.

Код потребителя

Воспользуемся кодом из producer.js, удалив функцию produceMessage, переменную chance и её инстанс. Заменим clientId на myConsumer. Настройки брокеров остаются прежними (три брокера в кластере).

Создадим потребителя методом consumer объекта kafka. Тема остаётся animals. Для подключения и подписки на тему используем метод subscribe и запустим потребление методом run (пример из документации kafkajs). Выведем ID раздела, ID сообщения и значение (преобразованное в строку) в консоль. Флаг fromBeginning опустим; он обрабатывает сообщения, отправленные до запуска потребителя.

consumer.subscribe({ topic: topic });
consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      partition: partition,
      offset: message.offset,
      value: message.value.toString(),
    });
  },
});

Настройка groupId

Для корректной работы необходимо указать groupId. Добавим параметр groupId со значением consumerGroup в конфигурацию потребителя:

const consumer = kafka.consumer({ groupId: 'consumerGroup' });

Запуск

Запустите производитель (node producer.js) и в другом терминальном окне — потребитель (node consumer.js). Потребитель будет получать сообщения пакетно, выводя в консоль ID раздела, смещение (offset) и значения сообщений (случайные названия животных).

Результат

Теперь создан производитель, отправляющий сообщения в тему animals кластера Kafka, и потребитель, регулярно получающий эти сообщения. Мы рассмотрели создание производителей и потребителей Kafka с использованием Node.js.

Что будем искать? Например,программа