Learning Kafka - Kafka Producers
Hello! I'm Yuvraj, a Computer Science student. I love to learn, create, and explore new things. I am currently pursuing a Bachelor of Computer Science at the University of Delhi.
Now that we understand the basic concepts of Kafka, let's create our first producer. A producer is an application that sends messages to Kafka topics.
Setting Up the Project
First, let's set up our Node.js project:
- Create a package.json file in the 03-producer directory:
{
"name": "kafka-producer-example",
"version": "1.0.0",
"description": "A simple Kafka producer example",
"main": "producer.js",
"type": "module",
"scripts": {
"start": "node producer.js"
},
"dependencies": {
"kafkajs": "^2.2.4",
"dotenv": "^16.0.3"
}
}
Notice the "type": "module" line: it tells Node.js to treat the .js files in this package as ES modules, which lets us use import syntax.
- Install the dependencies:
cd 03-producer
npm install
Creating the Producer
Now, let's create our producer. Create a file named producer.js in the 03-producer directory:
// producer.js
import { Kafka } from 'kafkajs';
// Define Kafka configuration
const kafka = new Kafka({
clientId: 'my-producer',
brokers: ['localhost:9092']
});
// Create a producer instance
const producer = kafka.producer();
// Define the topic to produce messages to
const TOPIC_NAME = 'first-topic';
/**
* Send a message to the Kafka topic
* @param {Object} message - The message to send
*/
const sendMessage = async (message) => {
try {
// Add a timestamp to the message
message.timestamp = new Date().toISOString();
// Send the message to the topic
await producer.send({
topic: TOPIC_NAME,
messages: [
{ value: JSON.stringify(message) }
],
});
console.log(`Message sent: ${JSON.stringify(message)}`);
} catch (error) {
console.error(`Error sending message: ${error.message}`);
}
};
/**
* Main function to run the producer
*/
const main = async () => {
try {
// Connect to the Kafka broker
await producer.connect();
console.log('Producer connected to Kafka');
// Send 5 messages
for (let i = 1; i <= 5; i++) {
const message = {
id: i,
text: `This is message #${i}`,
source: 'nodejs-producer'
};
await sendMessage(message);
// Wait for 1 second between messages
await new Promise(resolve => setTimeout(resolve, 1000));
}
} catch (error) {
console.error(`Producer error: ${error.message}`);
} finally {
// Disconnect the producer
await producer.disconnect();
console.log('Producer disconnected');
}
};
// Run the producer
main().catch(console.error);
Understanding the Code
Let's break down what this code does:
1. Importing the Kafka Client
import { Kafka } from 'kafkajs';
We're using ES6 import syntax to import the Kafka client from the kafkajs package.
2. Configuring Kafka
const kafka = new Kafka({
clientId: 'my-producer',
brokers: ['localhost:9092']
});
We create a Kafka client with:
- clientId: a name for our application
- brokers: the addresses of the Kafka servers to connect to
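The example hard-codes a single local broker, and although package.json lists dotenv, producer.js never uses it. As a minimal sketch, assuming a hypothetical KAFKA_BROKERS environment variable that holds a comma-separated list, the brokers array could be built like this (parseBrokers is an illustrative helper, not part of KafkaJS):

```javascript
// Hypothetical helper: turn a comma-separated string such as
// "kafka1:9092, kafka2:9092" into the array KafkaJS expects for `brokers`.
// Falls back to a single local broker when the value is unset or empty.
function parseBrokers(raw, fallback = ['localhost:9092']) {
  if (!raw) return fallback;
  const brokers = raw
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  return brokers.length > 0 ? brokers : fallback;
}

// KAFKA_BROKERS is an assumed variable name; KafkaJS does not read it itself.
const brokers = parseBrokers(process.env.KAFKA_BROKERS);
console.log(brokers);
```

With dotenv installed, adding `import 'dotenv/config';` at the top of producer.js would load KAFKA_BROKERS from a .env file before the helper reads it, and the result can be passed to `new Kafka({ clientId: 'my-producer', brokers })`.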
3. Creating a Producer
const producer = kafka.producer();
This creates a producer instance that we'll use to send messages.
4. Sending Messages
const sendMessage = async (message) => {
try {
// Add a timestamp to the message
message.timestamp = new Date().toISOString();
// Send the message to the topic
await producer.send({
topic: TOPIC_NAME,
messages: [
{ value: JSON.stringify(message) }
],
});
console.log(`Message sent: ${JSON.stringify(message)}`);
} catch (error) {
console.error(`Error sending message: ${error.message}`);
}
};
This function:
- Adds a timestamp to the message
- Sends the message to our topic
- Logs the result or any errors
Note that we convert our message to a JSON string using JSON.stringify(). Kafka stores message values as raw bytes, and KafkaJS accepts a value as a string, a Buffer, or null, so JavaScript objects have to be serialized before sending.
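To make the serialization step concrete, here is a small sketch with two hypothetical helpers, toKafkaMessage and fromKafkaValue (neither is part of KafkaJS). It builds the `{ value }` object without modifying the caller's object (sendMessage above does modify it) and decodes the value the way a consumer would, since KafkaJS hands consumers the value as a Buffer. It runs in plain Node.js with no broker:

```javascript
// Hypothetical helper: build a KafkaJS-shaped message from a plain object.
// Copies the payload instead of mutating it, then adds the timestamp.
function toKafkaMessage(payload) {
  return {
    value: JSON.stringify({ ...payload, timestamp: new Date().toISOString() }),
  };
}

// Hypothetical helper: decode a value (Buffer or string) back into an object.
function fromKafkaValue(value) {
  return JSON.parse(Buffer.isBuffer(value) ? value.toString('utf8') : value);
}

const original = { id: 1, text: 'This is message #1', source: 'nodejs-producer' };
const kafkaMessage = toKafkaMessage(original);

// Simulate what a consumer receives: the same bytes, wrapped in a Buffer.
const received = fromKafkaValue(Buffer.from(kafkaMessage.value, 'utf8'));
console.log(received.text); // This is message #1
console.log('timestamp' in original); // false: the input object was not modified
```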
5. Main Function
const main = async () => {
try {
// Connect to the Kafka broker
await producer.connect();
console.log('Producer connected to Kafka');
// Send 5 messages
for (let i = 1; i <= 5; i++) {
const message = {
id: i,
text: `This is message #${i}`,
source: 'nodejs-producer'
};
await sendMessage(message);
// Wait for 1 second between messages
await new Promise(resolve => setTimeout(resolve, 1000));
}
} catch (error) {
console.error(`Producer error: ${error.message}`);
} finally {
// Disconnect the producer
await producer.disconnect();
console.log('Producer disconnected');
}
};
This function:
- Connects to Kafka
- Sends 5 messages, waiting 1 second between each
- Disconnects from Kafka when done
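The try/finally in main() is what guarantees that the producer disconnects even if something fails partway through. To watch that happen without a broker, this sketch replaces KafkaJS with a hypothetical in-memory stand-in, createFakeProducer, that has the same connect(), send(), and disconnect() methods used above and can be told to fail on a given send. Unlike sendMessage, run() calls send() directly, so a failure jumps straight to catch and then finally:

```javascript
// Hypothetical in-memory stand-in for a KafkaJS producer. It records every
// call and throws on the Nth send so we can see the finally block run.
function createFakeProducer({ failOnSend = 0 } = {}) {
  const calls = [];
  let sends = 0;
  return {
    calls,
    async connect() { calls.push('connect'); },
    async send({ topic, messages }) {
      sends += 1;
      if (sends === failOnSend) throw new Error(`send #${sends} failed`);
      calls.push(`send:${topic}:${messages.length}`);
    },
    async disconnect() { calls.push('disconnect'); },
  };
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Same shape as main() in producer.js, with the producer and delay injected.
async function run(producer, { count = 5, delayMs = 1000 } = {}) {
  try {
    await producer.connect();
    for (let i = 1; i <= count; i++) {
      await producer.send({
        topic: 'first-topic',
        messages: [{ value: JSON.stringify({ id: i }) }],
      });
      await sleep(delayMs);
    }
  } catch (error) {
    console.error(`Producer error: ${error.message}`);
  } finally {
    // Runs whether the loop finished or threw.
    await producer.disconnect();
  }
}

const fake = createFakeProducer({ failOnSend: 3 });
run(fake, { count: 5, delayMs: 10 }).then(() => {
  // ['connect', 'send:first-topic:1', 'send:first-topic:1', 'disconnect']
  console.log(fake.calls);
});
```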
Running the Producer
Make sure Kafka is running (from the setup in section 1), then run:
cd 03-producer
npm start
You should see output like:
Producer connected to Kafka
Message sent: {"id":1,"text":"This is message #1","source":"nodejs-producer","timestamp":"2023-05-10T12:34:56.789Z"}
Message sent: {"id":2,"text":"This is message #2","source":"nodejs-producer","timestamp":"2023-05-10T12:34:57.789Z"}
Message sent: {"id":3,"text":"This is message #3","source":"nodejs-producer","timestamp":"2023-05-10T12:34:58.789Z"}
Message sent: {"id":4,"text":"This is message #4","source":"nodejs-producer","timestamp":"2023-05-10T12:34:59.789Z"}
Message sent: {"id":5,"text":"This is message #5","source":"nodejs-producer","timestamp":"2023-05-10T12:35:00.789Z"}
Producer disconnected
Advanced Producer Options
Our example is simple, but KafkaJS producers have many options for advanced use cases:
Message Keys
You can add a key to your messages:
await producer.send({
topic: TOPIC_NAME,
messages: [
{
key: 'user-123', // Add a key
value: JSON.stringify(message)
}
],
});
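With KafkaJS's default partitioner, messages with the same key are hashed to the same partition, as long as the topic's partition count does not change. Kafka guarantees ordering only within a partition, so keys are how you keep related messages (for example, all events for one user) in order.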
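Key-based partitioning boils down to a deterministic function from key to partition number. KafkaJS's default partitioner hashes the key with murmur2; the sketch below swaps in FNV-1a purely because it is short, so it will not choose the same partitions KafkaJS would. What it does show is the property that matters: for a fixed partition count, the same key always lands on the same partition. partitionForKey is a hypothetical helper:

```javascript
// FNV-1a 32-bit hash over the key's characters.
// A stand-in for illustration only; KafkaJS uses murmur2 over the key bytes.
function fnv1a(key) {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // multiply modulo 2^32
  }
  return hash >>> 0;
}

// Hypothetical helper: map a key to a partition index in [0, numPartitions).
function partitionForKey(key, numPartitions) {
  return fnv1a(key) % numPartitions;
}

const numPartitions = 3;
for (const key of ['user-123', 'user-456', 'user-123']) {
  console.log(key, '->', partitionForKey(key, numPartitions));
}
```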
Batching Messages
You can send multiple messages to a topic in a single send() call by passing an array:
await producer.send({
topic: TOPIC_NAME,
messages: [
{ value: JSON.stringify(message1) },
{ value: JSON.stringify(message2) },
{ value: JSON.stringify(message3) }
],
});
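When there are many messages to send, one option is to split them across several send() calls. The sketch below, using a hypothetical toBatches helper and an arbitrary maxBatchSize chosen for illustration, turns a list of payloads into arrays of KafkaJS-shaped messages:

```javascript
// Hypothetical helper: split payloads into batches of at most maxBatchSize
// KafkaJS-shaped messages. The batch size is an illustrative knob, not a
// KafkaJS setting.
function toBatches(payloads, maxBatchSize) {
  if (!Number.isInteger(maxBatchSize) || maxBatchSize < 1) {
    throw new RangeError('maxBatchSize must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < payloads.length; i += maxBatchSize) {
    batches.push(
      payloads
        .slice(i, i + maxBatchSize)
        .map((payload) => ({ value: JSON.stringify(payload) })),
    );
  }
  return batches;
}

const payloads = Array.from({ length: 7 }, (_, i) => ({ id: i + 1, text: `This is message #${i + 1}` }));
const batches = toBatches(payloads, 3);
console.log(batches.map((batch) => batch.length)); // [ 3, 3, 1 ]
```

Each inner array could then be sent with `await producer.send({ topic: TOPIC_NAME, messages: batch })`.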
Compression
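You can compress messages to save bandwidth. In KafkaJS, compression is set per send() call rather than on the producer, using the CompressionTypes enum (GZIP is built in; Snappy, LZ4, and ZSTD need extra codec packages):
import { Kafka, CompressionTypes } from 'kafkajs';
await producer.send({
topic: TOPIC_NAME,
compression: CompressionTypes.GZIP, // Use gzip compression
messages: [{ value: JSON.stringify(message) }],
});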
Acknowledgment Levels
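You can control when the producer considers a message "sent". In KafkaJS, acks is an option of send():
await producer.send({
topic: TOPIC_NAME,
acks: -1, // -1 = all in-sync replicas (the default), 1 = leader only, 0 = no acknowledgment
messages: [{ value: JSON.stringify(message) }],
});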
Next Steps
Now that we've created a producer, let's create a consumer to read the messages we've sent.