Learning Kafka - Kafka Consumers


Hello! I'm Yuvraj. I'm a Computer Science student. I love to learn, create, and explore new things. I am currently pursuing a Bachelor of Computer Science at the University of Delhi.

Now that we've created a producer that sends messages to Kafka, let's create a consumer that reads those messages. A consumer is an application that subscribes to Kafka topics and processes the messages.

Setting Up the Project

First, let's set up our Node.js project:

  1. Create a package.json file in the 04-consumer directory:
{
  "name": "kafka-consumer-example",
  "version": "1.0.0",
  "description": "A simple Kafka consumer example",
  "main": "consumer.js",
  "type": "module",
  "scripts": {
    "start": "node consumer.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}
  2. Install the dependencies:
cd 04-consumer
npm install

Creating the Consumer

Now, let's create our consumer. Create a file named consumer.js in the 04-consumer directory:

// consumer.js
import { Kafka } from 'kafkajs';

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: ['localhost:9092']
});

// Define the topic to consume messages from
const TOPIC_NAME = 'first-topic';
// Define the consumer group ID
const GROUP_ID = 'my-group';

// Create a consumer instance
const consumer = kafka.consumer({ groupId: GROUP_ID });

/**
 * Process a message from Kafka
 * @param {Object} message - The message to process
 */
const processMessage = (message) => {
  try {
    // Parse the message value
    const value = JSON.parse(message.value.toString());

    // Print message details
    console.log('Received message:');
    console.log(`  Partition: ${message.partition}`);
    console.log(`  Offset: ${message.offset}`);
    console.log(`  Key: ${message.key ? message.key.toString() : 'null'}`);
    console.log(`  Value: ${JSON.stringify(value)}`);
    console.log('-'.repeat(50));

    // Here you would typically do something with the message
    // For example, save it to a database, trigger an action, etc.
  } catch (error) {
    console.error(`Error processing message: ${error.message}`);
  }
};

/**
 * Main function to run the consumer
 */
const main = async () => {
  try {
    // Connect to the Kafka broker
    await consumer.connect();
    console.log('Consumer connected to Kafka');

    // Subscribe to the topic
    await consumer.subscribe({
      topics: [TOPIC_NAME],
      fromBeginning: true  // Read from the beginning of the topic
    });
    console.log(`Subscribed to topic: ${TOPIC_NAME}`);

    // Start consuming messages
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        processMessage(message);
      },
    });

    console.log('Consumer started. Waiting for messages...');
    console.log('Press Ctrl+C to exit');
  } catch (error) {
    console.error(`Consumer error: ${error.message}`);
  }
};

// Handle graceful shutdown
const shutdown = async () => {
  try {
    await consumer.disconnect();
    console.log('Consumer disconnected');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

// Listen for termination signals
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

// Run the consumer
main().catch(console.error);

Understanding the Code

Let's break down what this code does:

1. Importing the Kafka Client

import { Kafka } from 'kafkajs';

Just like with the producer, we import the Kafka client from the kafkajs package.

2. Configuring Kafka

const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: ['localhost:9092']
});

We create a Kafka client with:

  • clientId: A name for our consumer application

  • brokers: The addresses of the Kafka servers to connect to
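The client accepts more options than the two shown here. As a standalone illustration (the broker addresses and values below are placeholders, not recommendations), this sketch extends the same configuration shape with two commonly used KafkaJS options, connectionTimeout and retry:

```javascript
// Illustrative configuration object (values are placeholders).
// Listing several brokers gives the client fallbacks if one is down;
// KafkaJS only needs one reachable broker to discover the rest.
const kafkaConfig = {
  clientId: 'my-consumer',
  brokers: ['localhost:9092', 'localhost:9093'],
  connectionTimeout: 3000, // ms to wait when establishing a connection
  retry: { retries: 8 },   // retry transient failures with backoff
};

console.log(kafkaConfig.brokers.length); // → 2
```

You would pass this object to new Kafka(kafkaConfig) exactly as in the code above.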

3. Creating a Consumer

const consumer = kafka.consumer({ groupId: GROUP_ID });

This creates a consumer instance with a specific consumer group ID. The group ID is important because:

  • It identifies which consumer group this consumer belongs to

  • Kafka uses it to manage which consumers read from which partitions

  • If multiple consumers have the same group ID, they'll share the workload
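To build intuition for that last point, here is a standalone sketch (no broker required) of how partitions might be divided among group members. assignPartitions is a made-up helper for illustration; it mimics the spirit of Kafka's partition assignors, not their exact algorithm:

```javascript
// Toy model of partition assignment within one consumer group:
// each partition goes to exactly one member, spread evenly.
const assignPartitions = (partitionCount, members) => {
  const assignment = Object.fromEntries(members.map((m) => [m, []]));
  for (let p = 0; p < partitionCount; p++) {
    assignment[members[p % members.length]].push(p);
  }
  return assignment;
};

// Six partitions shared by two consumers in the same group:
const result = assignPartitions(6, ['consumer-a', 'consumer-b']);
console.log(result); // consumer-a gets [0, 2, 4], consumer-b gets [1, 3, 5]
```

Each message is delivered to only one member of the group, which is how adding consumers scales out processing.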

4. Subscribing to a Topic

await consumer.subscribe({
  topics: [TOPIC_NAME],
  fromBeginning: true  // Read from the beginning of the topic
});

This subscribes our consumer to the topic (note that KafkaJS 2.x expects a topics array, even for a single topic). The fromBeginning: true option means we want to read all messages in the topic, even those that were sent before our consumer started. Without it, a new consumer group would only receive messages produced after it first joined.

5. Processing Messages

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    processMessage(message);
  },
});

This starts the consumer running and defines what happens when a message is received. For each message, we call our processMessage function.

const processMessage = (message) => {
  try {
    // Parse the message value
    const value = JSON.parse(message.value.toString());

    // Print message details
    console.log('Received message:');
    console.log(`  Partition: ${message.partition}`);
    console.log(`  Offset: ${message.offset}`);
    console.log(`  Key: ${message.key ? message.key.toString() : 'null'}`);
    console.log(`  Value: ${JSON.stringify(value)}`);
    console.log('-'.repeat(50));

    // Here you would typically do something with the message
    // For example, save it to a database, trigger an action, etc.
  } catch (error) {
    console.error(`Error processing message: ${error.message}`);
  }
};

This function:

  1. Parses the message value (converting it from a JSON string back to a JavaScript object)

  2. Logs details about the message

  3. Would typically do something useful with the message (in a real application)
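The parsing step is worth dwelling on: KafkaJS hands you message.value as a Node.js Buffer, not a string. This standalone snippet shows the round trip a JSON payload makes from producer to consumer (the object here is just sample data):

```javascript
// What the producer did: serialize an object to JSON bytes.
const raw = Buffer.from(JSON.stringify({ id: 1, text: 'hello' }));

// What processMessage does: decode the bytes and parse the JSON.
const value = JSON.parse(raw.toString());

console.log(value.text); // → hello
```

If a producer sent plain text instead of JSON, JSON.parse would throw, which is why processMessage wraps the parsing in try/catch.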

6. Graceful Shutdown

const shutdown = async () => {
  try {
    await consumer.disconnect();
    console.log('Consumer disconnected');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

// Listen for termination signals
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

This code ensures that when the application is stopped (e.g., by pressing Ctrl+C), it properly disconnects from Kafka before exiting.

Running the Consumer

Make sure Kafka is running (from the setup in section 1), then run:

cd 04-consumer
npm start

You should see output like:

Consumer connected to Kafka
Subscribed to topic: first-topic
Consumer started. Waiting for messages...
Press Ctrl+C to exit

If you've already run the producer from the previous section, you should see the messages it sent:

Received message:
  Partition: 0
  Offset: 0
  Key: null
  Value: {"id":1,"text":"This is message #1","source":"nodejs-producer","timestamp":"2023-05-10T12:34:56.789Z"}
--------------------------------------------------
Received message:
  Partition: 0
  Offset: 1
  Key: null
  Value: {"id":2,"text":"This is message #2","source":"nodejs-producer","timestamp":"2023-05-10T12:34:57.789Z"}
--------------------------------------------------
...

If you run the producer again while the consumer is running, you'll see new messages appear in real-time.

Advanced Consumer Options

Our example is simple, but KafkaJS consumers have many options for advanced use cases:

Controlling Offsets

You can manually control which messages you've processed:

await consumer.run({
  autoCommit: false,  // take over offset management from KafkaJS
  eachMessage: async ({ topic, partition, message }) => {
    processMessage(message);

    // Manually commit the offset of the next message to read
    await consumer.commitOffsets([
      { topic, partition, offset: (parseInt(message.offset, 10) + 1).toString() }
    ]);
  },
});

This is useful if you want to ensure a message is fully processed before marking it as consumed.
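One subtle convention to keep in mind: Kafka commits the offset of the next message to read, not the last one processed, which is why the example above adds 1. Since KafkaJS represents offsets as strings, the arithmetic needs a parse/stringify round trip. A tiny standalone helper (nextOffset is our own name for illustration, not a KafkaJS API) makes the rule explicit:

```javascript
// After processing the message at offset N, commit N + 1 so that a
// restarted consumer resumes at the first unprocessed message.
const nextOffset = (offset) => (parseInt(offset, 10) + 1).toString();

console.log(nextOffset('41')); // → '42'
```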

Batch Processing

Instead of processing one message at a time, you can process messages in batches:

await consumer.run({
  eachBatchAutoResolve: false,  // we resolve each offset manually below
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break;

      processMessage(message);
      resolveOffset(message.offset);
      await heartbeat();
    }
  },
});

This can be more efficient for high-volume processing.

Multiple Topics

You can subscribe to multiple topics by listing them all in the topics array of a single subscribe call:

await consumer.subscribe({ topics: ['topic1', 'topic2', 'topic3'] });

Consumer Groups and Rebalancing

When you have multiple consumers in the same group, Kafka will distribute partitions among them. If a consumer joins or leaves the group, Kafka will "rebalance" the partitions.

You can listen for these events:

consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => {
  console.log(`Consumer joined group: ${payload.groupId}`);
});

consumer.on(consumer.events.REBALANCING, ({ payload }) => {
  console.log(`Rebalancing group: ${payload.groupId}`);
});

Next Steps

Now that we've created both a producer and a consumer, let's build a more realistic example that shows how Kafka can be used in a real-world scenario.

Next: Real-world Example
