Learning Kafka - Kafka Consumers
Hello! I'm Yuvraj. I'm a Computer Science Student. I love to learn, create, and explore new things. I am currently doing a Bachelor of Computer Science from the University of Delhi.
Now that we've created a producer that sends messages to Kafka, let's create a consumer that reads those messages. A consumer is an application that subscribes to Kafka topics and processes the messages.
Setting Up the Project
First, let's set up our Node.js project:
- Create a package.json file in the 04-consumer directory:
{
  "name": "kafka-consumer-example",
  "version": "1.0.0",
  "description": "A simple Kafka consumer example",
  "main": "consumer.js",
  "type": "module",
  "scripts": {
    "start": "node consumer.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}
- Install the dependencies:
cd 04-consumer
npm install
Creating the Consumer
Now, let's create our consumer. Create a file named consumer.js in the 04-consumer directory:
// consumer.js
import { Kafka } from 'kafkajs';

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: ['localhost:9092']
});

// Define the topic to consume messages from
const TOPIC_NAME = 'first-topic';

// Define the consumer group ID
const GROUP_ID = 'my-group';

// Create a consumer instance
const consumer = kafka.consumer({ groupId: GROUP_ID });
/**
 * Process a message from Kafka
 * @param {Object} message - The message to process
 * @param {number} partition - The partition the message came from
 */
const processMessage = (message, partition) => {
  try {
    // Parse the message value (KafkaJS delivers keys and values as Buffers)
    const value = JSON.parse(message.value.toString());

    // Print message details
    console.log('Received message:');
    console.log(`  Partition: ${partition}`);
    console.log(`  Offset: ${message.offset}`);
    console.log(`  Key: ${message.key ? message.key.toString() : 'null'}`);
    console.log(`  Value: ${JSON.stringify(value)}`);
    console.log('-'.repeat(50));

    // Here you would typically do something with the message
    // For example, save it to a database, trigger an action, etc.
  } catch (error) {
    console.error(`Error processing message: ${error.message}`);
  }
};

/**
 * Main function to run the consumer
 */
const main = async () => {
  try {
    // Connect to the Kafka broker
    await consumer.connect();
    console.log('Consumer connected to Kafka');

    // Subscribe to the topic (KafkaJS 2.x takes an array of topics)
    await consumer.subscribe({
      topics: [TOPIC_NAME],
      fromBeginning: true // Read from the beginning of the topic
    });
    console.log(`Subscribed to topic: ${TOPIC_NAME}`);

    // Start consuming messages; the partition comes from the eachMessage
    // payload, not from the message itself
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        processMessage(message, partition);
      },
    });
    console.log('Consumer started. Waiting for messages...');
    console.log('Press Ctrl+C to exit');
  } catch (error) {
    console.error(`Consumer error: ${error.message}`);
  }
};
// Handle graceful shutdown
const shutdown = async () => {
  try {
    await consumer.disconnect();
    console.log('Consumer disconnected');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

// Listen for termination signals
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

// Run the consumer
main().catch(console.error);
Understanding the Code
Let's break down what this code does:
1. Importing the Kafka Client
import { Kafka } from 'kafkajs';
Just like with the producer, we import the Kafka client from the kafkajs package.
2. Configuring Kafka
const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: ['localhost:9092']
});
We create a Kafka client with:
- clientId: A name for our consumer application
- brokers: The addresses of the Kafka servers to connect to
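The package.json above also pulls in dotenv, which the tutorial code doesn't use yet. As a sketch (my own addition, assuming you want to avoid hard-coding the broker list; the KAFKA_BROKERS variable name is hypothetical), the brokers could come from the environment instead:

```javascript
// Sketch: parse a comma-separated broker list from an environment variable.
// With dotenv, `import 'dotenv/config'` at the top of consumer.js would
// populate process.env from a .env file first.
const parseBrokers = (raw) => raw.split(',').map((b) => b.trim());

// Fall back to the local broker used throughout this tutorial
const brokers = parseBrokers(process.env.KAFKA_BROKERS || 'localhost:9092');
console.log(brokers);
```

The resulting `brokers` array can then be passed straight into `new Kafka({ clientId: 'my-consumer', brokers })`.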
3. Creating a Consumer
const consumer = kafka.consumer({ groupId: GROUP_ID });
This creates a consumer instance with a specific consumer group ID. The group ID is important because:
- It identifies which consumer group this consumer belongs to
- Kafka uses it to manage which consumers read from which partitions
- If multiple consumers have the same group ID, they'll share the workload
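To make "share the workload" concrete, here is a toy sketch of round-robin partition assignment. This is my own illustration, not KafkaJS internals; Kafka's real assignors (range, round-robin, cooperative sticky) are more sophisticated and are negotiated by the group coordinator:

```javascript
// Toy illustration: distribute partitions round-robin across the
// members of a consumer group.
const assignPartitions = (partitionCount, memberIds) => {
  const assignment = Object.fromEntries(memberIds.map((id) => [id, []]));
  for (let p = 0; p < partitionCount; p++) {
    // Each partition is owned by exactly one consumer in the group
    assignment[memberIds[p % memberIds.length]].push(p);
  }
  return assignment;
};

// Six partitions shared by two consumers: three partitions each
console.log(assignPartitions(6, ['consumer-a', 'consumer-b']));
// → { 'consumer-a': [ 0, 2, 4 ], 'consumer-b': [ 1, 3, 5 ] }
```

The key property to take away: within one group, a partition is read by at most one consumer at a time, which is how Kafka parallelizes consumption without delivering duplicates inside the group.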
4. Subscribing to a Topic
await consumer.subscribe({
  topics: [TOPIC_NAME],
  fromBeginning: true // Read from the beginning of the topic
});
This subscribes our consumer to the topic (KafkaJS 2.x takes a topics array, so you can list several at once). The fromBeginning: true option means we want to read all messages in the topic, even those that were sent before our consumer started. Note that it only applies when the consumer group has no committed offsets yet; once the group has committed offsets, it resumes from them.
5. Processing Messages
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    processMessage(message, partition);
  },
});
This starts the consumer running and defines what happens when a message is received. For each message, we call our processMessage function.
const processMessage = (message, partition) => {
  try {
    // Parse the message value (KafkaJS delivers keys and values as Buffers)
    const value = JSON.parse(message.value.toString());

    // Print message details
    console.log('Received message:');
    console.log(`  Partition: ${partition}`);
    console.log(`  Offset: ${message.offset}`);
    console.log(`  Key: ${message.key ? message.key.toString() : 'null'}`);
    console.log(`  Value: ${JSON.stringify(value)}`);
    console.log('-'.repeat(50));

    // Here you would typically do something with the message
    // For example, save it to a database, trigger an action, etc.
  } catch (error) {
    console.error(`Error processing message: ${error.message}`);
  }
};
This function:
- Parses the message value (converting it from a JSON string back to a JavaScript object)
- Logs details about the message
- Would typically do something useful with the message (in a real application)
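One detail worth calling out: KafkaJS hands over keys and values as Buffers, and either can be null (a null value is a "tombstone" on compacted topics). A defensive decoder could be sketched like this (my own helper, not part of the tutorial's consumer):

```javascript
// Hypothetical helper: safely decode a KafkaJS message's Buffer fields.
const decodeMessage = (message) => {
  const key = message.key ? message.key.toString() : null;
  let value = null;
  if (message.value) {
    try {
      value = JSON.parse(message.value.toString());
    } catch {
      // Non-JSON payloads fall back to the raw string
      value = message.value.toString();
    }
  }
  return { key, value };
};

// Buffers stand in for what eachMessage would receive from Kafka
const decoded = decodeMessage({
  key: Buffer.from('user-1'),
  value: Buffer.from('{"id":1,"text":"This is message #1"}'),
});
console.log(decoded.key);      // user-1
console.log(decoded.value.id); // 1
```

Because it never assumes the payload is valid JSON, a single malformed message can't take down the whole consumer loop.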
6. Graceful Shutdown
const shutdown = async () => {
  try {
    await consumer.disconnect();
    console.log('Consumer disconnected');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

// Listen for termination signals
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
This code ensures that when the application is stopped (e.g., by pressing Ctrl+C), it properly disconnects from Kafka before exiting.
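Since two signals point at the same handler, shutdown can in principle fire twice in quick succession. A small guard, sketched here as my own refinement (not part of the tutorial's code), keeps the disconnect idempotent:

```javascript
// Hypothetical refinement: ensure a shutdown routine runs at most once,
// even if several termination signals arrive.
const once = (fn) => {
  let called = false;
  return (...args) => {
    if (called) return;
    called = true;
    return fn(...args);
  };
};

// Simulate a shutdown routine being triggered twice
let disconnectCalls = 0;
const shutdownOnce = once(() => { disconnectCalls += 1; });
shutdownOnce();
shutdownOnce(); // ignored: the guard already fired
console.log(disconnectCalls); // 1
```

In the consumer this would look like `const safeShutdown = once(shutdown)` registered for both SIGINT and SIGTERM.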
Running the Consumer
Make sure Kafka is running (from the setup in section 1), then run:
cd 04-consumer
npm start
You should see output like:
Consumer connected to Kafka
Subscribed to topic: first-topic
Consumer started. Waiting for messages...
Press Ctrl+C to exit
If you've already run the producer from the previous section, you should see the messages it sent:
Received message:
  Partition: 0
  Offset: 0
  Key: null
  Value: {"id":1,"text":"This is message #1","source":"nodejs-producer","timestamp":"2023-05-10T12:34:56.789Z"}
--------------------------------------------------
Received message:
  Partition: 0
  Offset: 1
  Key: null
  Value: {"id":2,"text":"This is message #2","source":"nodejs-producer","timestamp":"2023-05-10T12:34:57.789Z"}
--------------------------------------------------
...
If you run the producer again while the consumer is running, you'll see new messages appear in real-time.
Advanced Consumer Options
Our example is simple, but KafkaJS consumers have many options for advanced use cases:
Controlling Offsets
You can manually control which messages you've processed:
await consumer.run({
  autoCommit: false, // Disable auto-commit so the manual commits take effect
  eachMessage: async ({ topic, partition, message }) => {
    processMessage(message, partition);
    // Commit the offset of the *next* message to read
    await consumer.commitOffsets([
      { topic, partition, offset: (Number(message.offset) + 1).toString() }
    ]);
  },
});
This is useful if you want to ensure a message is fully processed before marking it as consumed.
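The + 1 is easy to miss: Kafka stores the offset of the next message the group should read, not the last one processed, and KafkaJS represents offsets as strings. A tiny helper (hypothetical, purely for illustration) makes the arithmetic explicit:

```javascript
// Hypothetical helper: build one entry of the array passed to
// consumer.commitOffsets(). Kafka commits the offset of the *next*
// message to read, hence the + 1.
const commitEntry = (topic, partition, messageOffset) => ({
  topic,
  partition,
  offset: (Number(messageOffset) + 1).toString(),
});

console.log(commitEntry('first-topic', 0, '41'));
// → { topic: 'first-topic', partition: 0, offset: '42' }
```

For extremely large offsets, BigInt arithmetic would be safer than Number, but for an example like this Number is fine.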
Batch Processing
Instead of processing one message at a time, you can process messages in batches:
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break;
      processMessage(message, batch.partition);
      resolveOffset(message.offset);
      await heartbeat();
    }
  },
});
This can be more efficient for high-volume processing.
Multiple Topics
You can subscribe to multiple topics:
await consumer.subscribe({ topics: ['topic1', 'topic2', 'topic3'] });
Consumer Groups and Rebalancing
When you have multiple consumers in the same group, Kafka will distribute partitions among them. If a consumer joins or leaves the group, Kafka will "rebalance" the partitions.
You can listen for these events:
consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => {
  console.log(`Consumer joined group: ${payload.groupId}`);
});

consumer.on(consumer.events.REBALANCING, ({ payload }) => {
  console.log(`Group ${payload.groupId} is rebalancing`);
});
Next Steps
Now that we've created both a producer and a consumer, let's build a more realistic example that shows how Kafka can be used in a real-world scenario.