Skip to main content

Command Palette

Search for a command to run...

Learning Kafka - Real World Example

Updated
10 min read
Y

Hello! I'm Yuvraj. I'm a Computer Science Student. I love to learn, create, and explore new things. I am currently doing a Bachelor of Computer Science from the University of Delhi.

Now that we understand the basics of Kafka producers and consumers, let's build a more realistic example that shows how Kafka can be used in a real-world scenario.

We'll create a simple order processing system with these components:

  1. Order Service: Creates new orders and sends them to Kafka

  2. Payment Service: Processes payments for orders

  3. Shipping Service: Handles shipping for paid orders

  4. Notification Service: Sends notifications about order status

This is a common pattern in microservices architectures, where different services communicate through Kafka.

Project Structure

Our project will have the following structure:

05-real-world/
├── order-service/
│   ├── package.json
│   └── index.js
├── payment-service/
│   ├── package.json
│   └── index.js
├── shipping-service/
│   ├── package.json
│   └── index.js
├── notification-service/
│   ├── package.json
│   └── index.js
└── package.json

Setting Up the Project

First, let's create a main package.json file:

{
  "name": "kafka-order-processing",
  "version": "1.0.0",
  "description": "A real-world example of Kafka for order processing",
  "scripts": {
    "order": "cd order-service && npm start",
    "payment": "cd payment-service && npm start",
    "shipping": "cd shipping-service && npm start",
    "notification": "cd notification-service && npm start",
    "install-all": "npm install && cd order-service && npm install && cd ../payment-service && npm install && cd ../shipping-service && npm install && cd ../notification-service && npm install"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}

Creating the Services

1. Order Service

The Order Service creates new orders and sends them to Kafka.

package.json

{
  "name": "order-service",
  "version": "1.0.0",
  "description": "Service for creating orders",
  "main": "index.js",
  "type": "module",
  "scripts": {
    "start": "node index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3",
    "express": "^4.18.2",
    "body-parser": "^1.20.2",
    "uuid": "^9.0.0"
  }
}

index.js

// order-service/index.js
import express from 'express';
import bodyParser from 'body-parser';
import { v4 as uuidv4 } from 'uuid';
import { Kafka } from 'kafkajs';

// Create Express app
const app = express();
app.use(bodyParser.json());

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092']
});

// Create a producer
const producer = kafka.producer();

// Define the topic for new orders
const NEW_ORDER_TOPIC = 'new-orders';

// Connect to Kafka
const connectToKafka = async () => {
  try {
    await producer.connect();
    console.log('Order service connected to Kafka');
  } catch (error) {
    console.error('Failed to connect to Kafka:', error);
    process.exit(1);
  }
};

// Send an order to Kafka
const sendOrderToKafka = async (order) => {
  try {
    await producer.send({
      topic: NEW_ORDER_TOPIC,
      messages: [
        { 
          key: order.id, 
          value: JSON.stringify(order),
          headers: {
            'event-type': 'order-created',
            'timestamp': Date.now().toString()
          }
        }
      ],
    });
    console.log(`Order sent to Kafka: ${order.id}`);
    return true;
  } catch (error) {
    console.error(`Error sending order to Kafka: ${error.message}`);
    return false;
  }
};

// API endpoint to create a new order
app.post('/orders', async (req, res) => {
  const { customerId, items, totalAmount } = req.body;

  // Validate request
  if (!customerId || !items || !totalAmount) {
    return res.status(400).json({ error: 'Missing required fields' });
  }

  // Create a new order
  const order = {
    id: uuidv4(),
    customerId,
    items,
    totalAmount,
    status: 'created',
    createdAt: new Date().toISOString()
  };

  // Send the order to Kafka
  const success = await sendOrderToKafka(order);

  if (success) {
    res.status(201).json({
      message: 'Order created successfully',
      orderId: order.id
    });
  } else {
    res.status(500).json({
      error: 'Failed to create order'
    });
  }
});

// Start the server
const PORT = 3000;
app.listen(PORT, async () => {
  await connectToKafka();
  console.log(`Order service running on port ${PORT}`);
});

// Handle graceful shutdown
const shutdown = async () => {
  try {
    await producer.disconnect();
    console.log('Order service disconnected from Kafka');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

2. Payment Service

The Payment Service processes payments for orders.

package.json

{
  "name": "payment-service",
  "version": "1.0.0",
  "description": "Service for processing payments",
  "main": "index.js",
  "type": "module",
  "scripts": {
    "start": "node index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}

index.js

// payment-service/index.js
import { Kafka } from 'kafkajs';

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'payment-service',
  brokers: ['localhost:9092']
});

// Create consumer and producer
const consumer = kafka.consumer({ groupId: 'payment-service-group' });
const producer = kafka.producer();

// Define topics
const NEW_ORDER_TOPIC = 'new-orders';
const PAYMENT_TOPIC = 'payments';

// Connect to Kafka
const connectToKafka = async () => {
  try {
    await consumer.connect();
    await producer.connect();
    console.log('Payment service connected to Kafka');

    // Subscribe to new orders
    await consumer.subscribe({ topic: NEW_ORDER_TOPIC, fromBeginning: true });
    console.log('Subscribed to new orders topic');

    // Start consuming messages
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        await processOrder(message);
      },
    });
  } catch (error) {
    console.error('Failed to connect to Kafka:', error);
    process.exit(1);
  }
};

// Process an order
const processOrder = async (message) => {
  try {
    // Parse the order
    const order = JSON.parse(message.value.toString());
    console.log(`Processing payment for order: ${order.id}`);

    // Simulate payment processing
    const paymentSuccessful = Math.random() > 0.2; // 80% success rate

    // Create payment result
    const paymentResult = {
      orderId: order.id,
      status: paymentSuccessful ? 'paid' : 'failed',
      amount: order.totalAmount,
      processedAt: new Date().toISOString()
    };

    // Send payment result to Kafka
    await producer.send({
      topic: PAYMENT_TOPIC,
      messages: [
        { 
          key: order.id, 
          value: JSON.stringify(paymentResult),
          headers: {
            'event-type': paymentSuccessful ? 'payment-successful' : 'payment-failed',
            'timestamp': Date.now().toString()
          }
        }
      ],
    });

    console.log(`Payment ${paymentSuccessful ? 'successful' : 'failed'} for order: ${order.id}`);
  } catch (error) {
    console.error(`Error processing payment: ${error.message}`);
  }
};

// Start the service
const start = async () => {
  await connectToKafka();
  console.log('Payment service is running');
};

// Handle graceful shutdown
const shutdown = async () => {
  try {
    await consumer.disconnect();
    await producer.disconnect();
    console.log('Payment service disconnected from Kafka');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

// Start the service
start().catch(console.error);

3. Shipping Service

The Shipping Service handles shipping for paid orders.

package.json

{
  "name": "shipping-service",
  "version": "1.0.0",
  "description": "Service for handling shipping",
  "main": "index.js",
  "type": "module",
  "scripts": {
    "start": "node index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}

index.js

// shipping-service/index.js
import { Kafka } from 'kafkajs';

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'shipping-service',
  brokers: ['localhost:9092']
});

// Create consumer and producer
const consumer = kafka.consumer({ groupId: 'shipping-service-group' });
const producer = kafka.producer();

// Define topics
const PAYMENT_TOPIC = 'payments';
const SHIPPING_TOPIC = 'shipping';

// Connect to Kafka
const connectToKafka = async () => {
  try {
    await consumer.connect();
    await producer.connect();
    console.log('Shipping service connected to Kafka');

    // Subscribe to payments
    await consumer.subscribe({ topic: PAYMENT_TOPIC, fromBeginning: true });
    console.log('Subscribed to payments topic');

    // Start consuming messages
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        await processPayment(message);
      },
    });
  } catch (error) {
    console.error('Failed to connect to Kafka:', error);
    process.exit(1);
  }
};

// Process a payment
const processPayment = async (message) => {
  try {
    // Parse the payment
    const payment = JSON.parse(message.value.toString());

    // Only process successful payments
    if (payment.status !== 'paid') {
      console.log(`Skipping shipping for failed payment: ${payment.orderId}`);
      return;
    }

    console.log(`Processing shipping for order: ${payment.orderId}`);

    // Simulate shipping processing
    const shippingSuccessful = Math.random() > 0.1; // 90% success rate

    // Create shipping result
    const shippingResult = {
      orderId: payment.orderId,
      status: shippingSuccessful ? 'shipped' : 'shipping-failed',
      trackingNumber: shippingSuccessful ? `TRK-${Math.floor(Math.random() * 1000000)}` : null,
      processedAt: new Date().toISOString()
    };

    // Send shipping result to Kafka
    await producer.send({
      topic: SHIPPING_TOPIC,
      messages: [
        { 
          key: payment.orderId, 
          value: JSON.stringify(shippingResult),
          headers: {
            'event-type': shippingSuccessful ? 'order-shipped' : 'shipping-failed',
            'timestamp': Date.now().toString()
          }
        }
      ],
    });

    console.log(`Shipping ${shippingSuccessful ? 'successful' : 'failed'} for order: ${payment.orderId}`);
  } catch (error) {
    console.error(`Error processing shipping: ${error.message}`);
  }
};

// Start the service
const start = async () => {
  await connectToKafka();
  console.log('Shipping service is running');
};

// Handle graceful shutdown
const shutdown = async () => {
  try {
    await consumer.disconnect();
    await producer.disconnect();
    console.log('Shipping service disconnected from Kafka');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

// Start the service
start().catch(console.error);

4. Notification Service

The Notification Service sends notifications about order status.

package.json

{
  "name": "notification-service",
  "version": "1.0.0",
  "description": "Service for sending notifications",
  "main": "index.js",
  "type": "module",
  "scripts": {
    "start": "node index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}

index.js

// notification-service/index.js
import { Kafka } from 'kafkajs';

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'notification-service',
  brokers: ['localhost:9092']
});

// Create consumer
const consumer = kafka.consumer({ groupId: 'notification-service-group' });

// Define topics to listen to
const TOPICS = ['new-orders', 'payments', 'shipping'];

// Connect to Kafka
const connectToKafka = async () => {
  try {
    await consumer.connect();
    console.log('Notification service connected to Kafka');

    // Subscribe to all relevant topics
    for (const topic of TOPICS) {
      await consumer.subscribe({ topic, fromBeginning: true });
      console.log(`Subscribed to ${topic} topic`);
    }

    // Start consuming messages
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        await processMessage(topic, message);
      },
    });
  } catch (error) {
    console.error('Failed to connect to Kafka:', error);
    process.exit(1);
  }
};

// Process a message
const processMessage = async (topic, message) => {
  try {
    // Parse the message
    const data = JSON.parse(message.value.toString());
    const eventType = message.headers['event-type']?.toString();

    // Determine notification type based on topic and event type
    let notificationType;
    let notificationMessage;

    switch (topic) {
      case 'new-orders':
        notificationType = 'ORDER_CREATED';
        notificationMessage = `New order created with ID: ${data.id}`;
        break;
      case 'payments':
        if (data.status === 'paid') {
          notificationType = 'PAYMENT_SUCCESSFUL';
          notificationMessage = `Payment successful for order: ${data.orderId}`;
        } else {
          notificationType = 'PAYMENT_FAILED';
          notificationMessage = `Payment failed for order: ${data.orderId}`;
        }
        break;
      case 'shipping':
        if (data.status === 'shipped') {
          notificationType = 'ORDER_SHIPPED';
          notificationMessage = `Order shipped: ${data.orderId}. Tracking number: ${data.trackingNumber}`;
        } else {
          notificationType = 'SHIPPING_FAILED';
          notificationMessage = `Shipping failed for order: ${data.orderId}`;
        }
        break;
      default:
        notificationType = 'UNKNOWN';
        notificationMessage = `Unknown event for order: ${data.id || data.orderId}`;
    }

    // Send notification (in a real system, this would send an email, SMS, push notification, etc.)
    console.log('NOTIFICATION SENT:');
    console.log(`  Type: ${notificationType}`);
    console.log(`  Message: ${notificationMessage}`);
    console.log(`  Timestamp: ${new Date().toISOString()}`);
    console.log('-'.repeat(50));
  } catch (error) {
    console.error(`Error processing notification: ${error.message}`);
  }
};

// Start the service
const start = async () => {
  await connectToKafka();
  console.log('Notification service is running');
};

// Handle graceful shutdown
const shutdown = async () => {
  try {
    await consumer.disconnect();
    console.log('Notification service disconnected from Kafka');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

// Start the service
start().catch(console.error);

Running the Example

  1. First, make sure Kafka is running (from the setup in section 1)

  2. Create the required topics:

docker compose -f ../01-setup/docker-compose.yml exec kafka kafka-topics --create --topic new-orders --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
dockercompose -f ../01-setup/docker-compose.yml exec kafka kafka-topics --create --topic payments --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
docker-compose -f ../01-setup/docker-compose.yml exec kafka kafka-topics --create --topic shipping --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  1. Install dependencies for all services:
cd 05-real-world
npm run install-all
  1. Start each service in a separate terminal:
# Terminal 1
npm run notification

# Terminal 2
npm run payment

# Terminal 3
npm run shipping

# Terminal 4
npm run order
  1. Create a new order using curl or any API client:
curl -X POST http://localhost:3000/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": "customer-123",
    "items": [
      {"id": "item-1", "name": "Product 1", "quantity": 2, "price": 10.99},
      {"id": "item-2", "name": "Product 2", "quantity": 1, "price": 24.99}
    ],
    "totalAmount": 46.97
  }'
  1. Watch the flow of messages through the system:
  • The Order Service creates a new order and sends it to the new-orders topic

  • The Payment Service consumes from new-orders, processes the payment, and sends the result to the payments topic

  • The Shipping Service consumes from payments, processes shipping for paid orders, and sends the result to the shipping topic

  • The Notification Service consumes from all topics and sends notifications for each event

Understanding the Architecture

This example demonstrates a common pattern in event-driven microservices:

  1. Event Sourcing: Each service publishes events about what has happened

  2. Command-Query Responsibility Segregation (CQRS): Services are separated by their responsibilities

  3. Choreography: Services react to events rather than being directly commanded

The benefits of this architecture include:

  • Loose Coupling: Services don't need to know about each other directly

  • Scalability: Each service can scale independently

  • Resilience: If one service fails, others can continue working

  • Auditability: All events are recorded in Kafka, providing a complete history

Next Steps

Congratulations! You've now learned the basics of Kafka and seen how it can be used in a real-world scenario. Here are some ideas for further exploration:

  1. Add Error Handling: Implement retry logic and dead-letter queues

  2. Add Monitoring: Use Kafka's metrics to monitor performance

  3. Implement Exactly-Once Semantics: Use transactions to ensure messages are processed exactly once

  4. Explore Kafka Streams: For more complex stream processing

  5. Implement Schema Registry: To manage and evolve message schemas

Happy Kafka-ing!

35 views