Learning Kafka - Kafka Producers


Hello! I'm Yuvraj. I'm a Computer Science Student. I love to learn, create, and explore new things. I am currently doing a Bachelor of Computer Science from the University of Delhi.

Now that we understand the basic concepts of Kafka, let's create our first producer. A producer is an application that sends messages to Kafka topics.

Setting Up the Project

First, let's set up our Node.js project:

  1. Create a package.json file in the 03-producer directory:
{
  "name": "kafka-producer-example",
  "version": "1.0.0",
  "description": "A simple Kafka producer example",
  "main": "producer.js",
  "type": "module",
  "scripts": {
    "start": "node producer.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}

Notice the "type": "module" line. It tells Node.js to treat the .js files in this project as ES modules, which is what lets us use import syntax in producer.js.

  2. Install the dependencies:
cd 03-producer
npm install

Creating the Producer

Now, let's create our producer. Create a file named producer.js in the 03-producer directory:

// producer.js
import { Kafka } from 'kafkajs';

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'my-producer',
  brokers: ['localhost:9092']
});

// Create a producer instance
const producer = kafka.producer();

// Define the topic to produce messages to
const TOPIC_NAME = 'first-topic';

/**
 * Send a message to the Kafka topic
 * @param {Object} message - The message to send
 */
const sendMessage = async (message) => {
  try {
    // Add a timestamp to the message
    message.timestamp = new Date().toISOString();

    // Send the message to the topic
    await producer.send({
      topic: TOPIC_NAME,
      messages: [
        { value: JSON.stringify(message) }
      ],
    });

    console.log(`Message sent: ${JSON.stringify(message)}`);
  } catch (error) {
    console.error(`Error sending message: ${error.message}`);
  }
};

/**
 * Main function to run the producer
 */
const main = async () => {
  try {
    // Connect to the Kafka broker
    await producer.connect();
    console.log('Producer connected to Kafka');

    // Send 5 messages
    for (let i = 1; i <= 5; i++) {
      const message = {
        id: i,
        text: `This is message #${i}`,
        source: 'nodejs-producer'
      };

      await sendMessage(message);

      // Wait for 1 second between messages
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  } catch (error) {
    console.error(`Producer error: ${error.message}`);
  } finally {
    // Disconnect the producer
    await producer.disconnect();
    console.log('Producer disconnected');
  }
};

// Run the producer
main().catch(console.error);

Understanding the Code

Let's break down what this code does:

1. Importing the Kafka Client

import { Kafka } from 'kafkajs';

We're using ES6 import syntax to import the Kafka client from the kafkajs package.

2. Configuring Kafka

const kafka = new Kafka({
  clientId: 'my-producer',
  brokers: ['localhost:9092']
});

We create a Kafka client with:

  • clientId: A name for our application

  • brokers: The addresses of the Kafka servers to connect to
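
In practice the broker list often comes from the environment instead of being hard-coded. Here is a minimal sketch of that idea; the KAFKA_BROKERS variable name and the parseBrokers helper are assumptions made for illustration, not part of KafkaJS or of this project:

```javascript
// Hypothetical helper: turn a comma-separated string such as
// "host1:9092, host2:9092" into the array expected by the brokers option.
const parseBrokers = (raw, fallback = 'localhost:9092') =>
  (raw || fallback)
    .split(',')
    .map((address) => address.trim())
    .filter((address) => address.length > 0);

// KAFKA_BROKERS is an assumed variable name; when it is unset,
// the local default used in this tutorial is returned.
const brokers = parseBrokers(process.env.KAFKA_BROKERS);

console.log(brokers);
```

The resulting array could be passed as the brokers value when creating the Kafka client. The dotenv package listed in package.json can load such a variable from a .env file.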

3. Creating a Producer

const producer = kafka.producer();

This creates a producer instance that we'll use to send messages.

4. Sending Messages

const sendMessage = async (message) => {
  try {
    // Add a timestamp to the message
    message.timestamp = new Date().toISOString();

    // Send the message to the topic
    await producer.send({
      topic: TOPIC_NAME,
      messages: [
        { value: JSON.stringify(message) }
      ],
    });

    console.log(`Message sent: ${JSON.stringify(message)}`);
  } catch (error) {
    console.error(`Error sending message: ${error.message}`);
  }
};

This function:

  1. Adds a timestamp to the message

  2. Sends the message to our topic

  3. Logs the result or any errors

Note that we convert the message to a JSON string with JSON.stringify(). Kafka stores message values as raw bytes, and KafkaJS accepts a string or a Buffer as the value, so JavaScript objects have to be serialized before sending.
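
The serialization step can be tried without a broker. The sketch below round-trips an object through bytes the way a producer and a consumer would; serialize and deserialize are hypothetical helper names, not KafkaJS functions:

```javascript
// Serialize a plain object into the bytes that would be stored in Kafka.
const serialize = (message) => Buffer.from(JSON.stringify(message), 'utf8');

// Reverse step, as a consumer would do with the value it receives.
const deserialize = (buffer) => JSON.parse(buffer.toString('utf8'));

const original = { id: 1, text: 'This is message #1', source: 'nodejs-producer' };
const bytes = serialize(original);
const restored = deserialize(bytes);

console.log(bytes.length);   // size of the serialized value in bytes
console.log(restored.text);  // This is message #1
```

Values that JSON cannot represent, such as Date objects, do not come back unchanged from this round trip. The ISO string timestamp used in sendMessage round-trips cleanly.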

5. Main Function

const main = async () => {
  try {
    // Connect to the Kafka broker
    await producer.connect();
    console.log('Producer connected to Kafka');

    // Send 5 messages
    for (let i = 1; i <= 5; i++) {
      const message = {
        id: i,
        text: `This is message #${i}`,
        source: 'nodejs-producer'
      };

      await sendMessage(message);

      // Wait for 1 second between messages
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  } catch (error) {
    console.error(`Producer error: ${error.message}`);
  } finally {
    // Disconnect the producer
    await producer.disconnect();
    console.log('Producer disconnected');
  }
};

This function:

  1. Connects to Kafka

  2. Sends 5 messages, waiting 1 second between each

  3. Disconnects from Kafka when done

Running the Producer

Make sure Kafka is running (from the setup in section 1), then run:

cd 03-producer
npm start

You should see output like:

Producer connected to Kafka
Message sent: {"id":1,"text":"This is message #1","source":"nodejs-producer","timestamp":"2023-05-10T12:34:56.789Z"}
Message sent: {"id":2,"text":"This is message #2","source":"nodejs-producer","timestamp":"2023-05-10T12:34:57.789Z"}
Message sent: {"id":3,"text":"This is message #3","source":"nodejs-producer","timestamp":"2023-05-10T12:34:58.789Z"}
Message sent: {"id":4,"text":"This is message #4","source":"nodejs-producer","timestamp":"2023-05-10T12:34:59.789Z"}
Message sent: {"id":5,"text":"This is message #5","source":"nodejs-producer","timestamp":"2023-05-10T12:35:00.789Z"}
Producer disconnected

Advanced Producer Options

Our example is simple, but KafkaJS producers have many options for advanced use cases:

Message Keys

You can add a key to your messages:

await producer.send({
  topic: TOPIC_NAME,
  messages: [
    { 
      key: 'user-123',  // Add a key
      value: JSON.stringify(message) 
    }
  ],
});

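With the default partitioner, messages with the same key go to the same partition as long as the topic's partition count does not change. Because Kafka preserves order within a partition, this keeps related messages in order.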
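
To make the key-to-partition idea concrete, here is a simplified sketch. It uses an FNV-1a hash purely for illustration; KafkaJS's default partitioner uses a different, murmur2-based hash, so the partition numbers printed here will not match what a real cluster picks. partitionFor and the partition count of 3 are assumptions:

```javascript
// 32-bit FNV-1a hash over the UTF-8 bytes of the key (illustrative only).
const fnv1a = (text) => {
  let hash = 0x811c9dc5;
  for (const byte of Buffer.from(text, 'utf8')) {
    hash ^= byte;
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep the value unsigned
  }
  return hash;
};

// Hypothetical partition chooser: the same key and the same partition
// count always produce the same partition index.
const partitionFor = (key, numPartitions) => fnv1a(key) % numPartitions;

const NUM_PARTITIONS = 3; // assumed partition count for 'first-topic'

for (const key of ['user-123', 'user-456', 'user-123']) {
  console.log(`${key} -> partition ${partitionFor(key, NUM_PARTITIONS)}`);
}
```

The first and third lines print the same partition because the key is identical. Changing NUM_PARTITIONS can change the result, which is why adding partitions to a topic can move a key to a different partition.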

Batching Messages

You can send multiple messages at once:

await producer.send({
  topic: TOPIC_NAME,
  messages: [
    { value: JSON.stringify(message1) },
    { value: JSON.stringify(message2) },
    { value: JSON.stringify(message3) }
  ],
});
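
When the payloads already sit in an array, the messages list can be built with a small mapping step. A sketch, assuming each event has an id that doubles as the message key (toKafkaMessages is a hypothetical helper, not a KafkaJS function):

```javascript
// Hypothetical helper: map plain objects to the { key, value } shape
// that producer.send() expects in its messages array.
const toKafkaMessages = (events) =>
  events.map((event) => ({
    key: String(event.id),
    value: JSON.stringify(event),
  }));

const events = [
  { id: 1, text: 'This is message #1' },
  { id: 2, text: 'This is message #2' },
  { id: 3, text: 'This is message #3' },
];

// The object that would be handed to producer.send().
const batch = { topic: 'first-topic', messages: toKafkaMessages(events) };

console.log(batch.messages.length); // 3
console.log(batch.messages[0]);
```

With a connected producer, await producer.send(batch) would hand all three messages to Kafka in a single send() call.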

Compression

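You can compress messages to save bandwidth. In KafkaJS, compression is set on each send() call, not on the producer, and GZIP is available out of the box:

import { CompressionTypes } from 'kafkajs';

const producer = kafka.producer({
  allowAutoTopicCreation: true,
  transactionTimeout: 30000
});

await producer.send({
  topic: TOPIC_NAME,
  compression: CompressionTypes.GZIP,  // Use gzip compression
  messages: [{ value: JSON.stringify(message) }],
});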

Acknowledgment Levels

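You can control when the producer considers a message "sent". In KafkaJS, acks is passed to send() rather than to kafka.producer(), and it defaults to -1:

await producer.send({
  topic: TOPIC_NAME,
  acks: 1,  // 0 = no ack, 1 = leader ack, -1 = all in-sync replicas (default)
  messages: [{ value: JSON.stringify(message) }],
});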
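
In KafkaJS the acks value travels with each send() call. The sketch below builds such a payload and rejects anything other than the three values listed above; ACKS and buildSendPayload are hypothetical names used only for illustration:

```javascript
// Named constants for the three acknowledgment levels.
const ACKS = Object.freeze({ NONE: 0, LEADER: 1, ALL: -1 });

// Hypothetical helper: build the argument for producer.send(),
// validating the acks value first. Defaults to waiting for all replicas.
const buildSendPayload = (topic, messages, acks = ACKS.ALL) => {
  if (!Object.values(ACKS).includes(acks)) {
    throw new RangeError(`Unsupported acks value: ${acks}`);
  }
  return { topic, acks, messages };
};

const payload = buildSendPayload(
  'first-topic',
  [{ value: JSON.stringify({ id: 1 }) }],
  ACKS.LEADER
);

console.log(payload.acks); // 1
```

Lower acks values trade durability for latency: with 0 the producer does not wait for any broker confirmation, so a lost message goes unnoticed.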

Next Steps

Now that we've created a producer, let's create a consumer to read the messages we've sent.

Next: Your First Consumer
