Learning Kafka - Real World Example
Hello! I'm Yuvraj. I'm a Computer Science Student. I love to learn, create, and explore new things. I am currently doing a Bachelor of Computer Science from the University of Delhi.
Now that we understand the basics of Kafka producers and consumers, let's build a more realistic example that shows how Kafka can be used in a real-world scenario.
We'll create a simple order processing system with these components:
Order Service: Creates new orders and sends them to Kafka
Payment Service: Processes payments for orders
Shipping Service: Handles shipping for paid orders
Notification Service: Sends notifications about order status
This is a common pattern in microservices architectures, where different services communicate through Kafka.
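Before writing any service code, it can help to write the wiring down as plain data. The sketch below is only an illustration: `TOPOLOGY` and `consumersOf` are hypothetical names, and the topic names are the ones the services in this post use.

```javascript
// Which topics each service reads from and writes to.
// TOPOLOGY and consumersOf are illustrative names, not part of kafkajs.
const TOPOLOGY = {
  'order-service': { consumes: [], produces: ['new-orders'] },
  'payment-service': { consumes: ['new-orders'], produces: ['payments'] },
  'shipping-service': { consumes: ['payments'], produces: ['shipping'] },
  'notification-service': {
    consumes: ['new-orders', 'payments', 'shipping'],
    produces: [],
  },
};

// List the services that read a given topic.
const consumersOf = (topic) =>
  Object.entries(TOPOLOGY)
    .filter(([, io]) => io.consumes.includes(topic))
    .map(([service]) => service);

console.log(consumersOf('payments'));
// [ 'shipping-service', 'notification-service' ]
```

No service appears in another service's configuration; the only thing they share is topic names.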
Project Structure
Our project will have the following structure:
05-real-world/
├── order-service/
│   ├── package.json
│   └── index.js
├── payment-service/
│   ├── package.json
│   └── index.js
├── shipping-service/
│   ├── package.json
│   └── index.js
├── notification-service/
│   ├── package.json
│   └── index.js
└── package.json
Setting Up the Project
First, let's create a main package.json file:
{
  "name": "kafka-order-processing",
  "version": "1.0.0",
  "description": "A real-world example of Kafka for order processing",
  "scripts": {
    "order": "cd order-service && npm start",
    "payment": "cd payment-service && npm start",
    "shipping": "cd shipping-service && npm start",
    "notification": "cd notification-service && npm start",
    "install-all": "npm install && cd order-service && npm install && cd ../payment-service && npm install && cd ../shipping-service && npm install && cd ../notification-service && npm install"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}
Creating the Services
1. Order Service
The Order Service creates new orders and sends them to Kafka.
package.json
{
  "name": "order-service",
  "version": "1.0.0",
  "description": "Service for creating orders",
  "main": "index.js",
  "type": "module",
  "scripts": {
    "start": "node index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3",
    "express": "^4.18.2",
    "body-parser": "^1.20.2",
    "uuid": "^9.0.0"
  }
}
index.js
// order-service/index.js
import express from 'express';
import bodyParser from 'body-parser';
import { v4 as uuidv4 } from 'uuid';
import { Kafka } from 'kafkajs';

// Create Express app
const app = express();
app.use(bodyParser.json());

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092']
});

// Create a producer
const producer = kafka.producer();

// Define the topic for new orders
const NEW_ORDER_TOPIC = 'new-orders';

// Connect to Kafka
const connectToKafka = async () => {
  try {
    await producer.connect();
    console.log('Order service connected to Kafka');
  } catch (error) {
    console.error('Failed to connect to Kafka:', error);
    process.exit(1);
  }
};

// Send an order to Kafka
const sendOrderToKafka = async (order) => {
  try {
    await producer.send({
      topic: NEW_ORDER_TOPIC,
      messages: [
        {
          key: order.id,
          value: JSON.stringify(order),
          headers: {
            'event-type': 'order-created',
            'timestamp': Date.now().toString()
          }
        }
      ],
    });
    console.log(`Order sent to Kafka: ${order.id}`);
    return true;
  } catch (error) {
    console.error(`Error sending order to Kafka: ${error.message}`);
    return false;
  }
};

// API endpoint to create a new order
app.post('/orders', async (req, res) => {
  const { customerId, items, totalAmount } = req.body;

  // Validate request
  if (!customerId || !items || !totalAmount) {
    return res.status(400).json({ error: 'Missing required fields' });
  }

  // Create a new order
  const order = {
    id: uuidv4(),
    customerId,
    items,
    totalAmount,
    status: 'created',
    createdAt: new Date().toISOString()
  };

  // Send the order to Kafka
  const success = await sendOrderToKafka(order);
  if (success) {
    res.status(201).json({
      message: 'Order created successfully',
      orderId: order.id
    });
  } else {
    res.status(500).json({
      error: 'Failed to create order'
    });
  }
});

// Start the server
const PORT = 3000;
app.listen(PORT, async () => {
  await connectToKafka();
  console.log(`Order service running on port ${PORT}`);
});

// Handle graceful shutdown
const shutdown = async () => {
  try {
    await producer.disconnect();
    console.log('Order service disconnected from Kafka');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
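The `/orders` endpoint only checks that the three fields are present. A stricter check could look something like the sketch below. `validateOrder` is a hypothetical helper, not part of the service above, and it assumes prices are amounts with at most two decimal places.

```javascript
// Hypothetical helper: returns a list of problems, empty when the payload looks valid.
const validateOrder = ({ customerId, items, totalAmount } = {}) => {
  const errors = [];
  if (typeof customerId !== 'string' || customerId.length === 0) {
    errors.push('customerId must be a non-empty string');
  }
  if (!Array.isArray(items) || items.length === 0) {
    errors.push('items must be a non-empty array');
  }
  if (typeof totalAmount !== 'number' || !Number.isFinite(totalAmount) || totalAmount < 0) {
    errors.push('totalAmount must be a non-negative number');
  }
  if (errors.length > 0) return errors;

  // Compare in integer cents to avoid floating-point drift (assumes two-decimal prices).
  const expectedCents = items.reduce(
    (sum, { price, quantity }) => sum + Math.round(price * 100) * quantity,
    0
  );
  if (expectedCents !== Math.round(totalAmount * 100)) {
    errors.push('totalAmount does not match the sum of the items');
  }
  return errors;
};
```

In the route handler, a non-empty result could be returned to the client with a 400 status instead of the generic "Missing required fields" message.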
2. Payment Service
The Payment Service processes payments for orders.
package.json
{
  "name": "payment-service",
  "version": "1.0.0",
  "description": "Service for processing payments",
  "main": "index.js",
  "type": "module",
  "scripts": {
    "start": "node index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}
index.js
// payment-service/index.js
import { Kafka } from 'kafkajs';

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'payment-service',
  brokers: ['localhost:9092']
});

// Create consumer and producer
const consumer = kafka.consumer({ groupId: 'payment-service-group' });
const producer = kafka.producer();

// Define topics
const NEW_ORDER_TOPIC = 'new-orders';
const PAYMENT_TOPIC = 'payments';

// Connect to Kafka
const connectToKafka = async () => {
  try {
    await consumer.connect();
    await producer.connect();
    console.log('Payment service connected to Kafka');

    // Subscribe to new orders
    await consumer.subscribe({ topic: NEW_ORDER_TOPIC, fromBeginning: true });
    console.log('Subscribed to new orders topic');

    // Start consuming messages
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        await processOrder(message);
      },
    });
  } catch (error) {
    console.error('Failed to connect to Kafka:', error);
    process.exit(1);
  }
};

// Process an order
const processOrder = async (message) => {
  try {
    // Parse the order
    const order = JSON.parse(message.value.toString());
    console.log(`Processing payment for order: ${order.id}`);

    // Simulate payment processing
    const paymentSuccessful = Math.random() > 0.2; // 80% success rate

    // Create payment result
    const paymentResult = {
      orderId: order.id,
      status: paymentSuccessful ? 'paid' : 'failed',
      amount: order.totalAmount,
      processedAt: new Date().toISOString()
    };

    // Send payment result to Kafka
    await producer.send({
      topic: PAYMENT_TOPIC,
      messages: [
        {
          key: order.id,
          value: JSON.stringify(paymentResult),
          headers: {
            'event-type': paymentSuccessful ? 'payment-successful' : 'payment-failed',
            'timestamp': Date.now().toString()
          }
        }
      ],
    });
    console.log(`Payment ${paymentSuccessful ? 'successful' : 'failed'} for order: ${order.id}`);
  } catch (error) {
    console.error(`Error processing payment: ${error.message}`);
  }
};

// Start the service
const start = async () => {
  await connectToKafka();
  console.log('Payment service is running');
};

// Handle graceful shutdown
const shutdown = async () => {
  try {
    await consumer.disconnect();
    await producer.disconnect();
    console.log('Payment service disconnected from Kafka');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

// Start the service
start().catch(console.error);
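Kafka consumers typically get at-least-once delivery: after a crash or a rebalance, a message whose offset was not yet committed can be delivered again. For a payment step that could mean charging the same order twice. One way to guard against repeats is sketched below. `createDeduplicator` is a hypothetical helper, and it keeps its state in memory only, so a real service would more likely use a database or another persistent store.

```javascript
// Remembers the most recent `maxSize` keys. In-memory only: state is lost on restart.
const createDeduplicator = (maxSize = 10000) => {
  const seen = new Set(); // a Set iterates in insertion order, oldest first
  return {
    // Returns true the first time a key is seen, false for repeats.
    firstTime(key) {
      if (seen.has(key)) return false;
      seen.add(key);
      if (seen.size > maxSize) {
        seen.delete(seen.values().next().value); // evict the oldest key
      }
      return true;
    },
  };
};

const dedupe = createDeduplicator(2);
console.log(dedupe.firstTime('order-1')); // true
console.log(dedupe.firstTime('order-1')); // false
```

Inside `processOrder`, the handler could return early when `firstTime(order.id)` is false, so a redelivered order is not paid for again.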
3. Shipping Service
The Shipping Service handles shipping for paid orders.
package.json
{
  "name": "shipping-service",
  "version": "1.0.0",
  "description": "Service for handling shipping",
  "main": "index.js",
  "type": "module",
  "scripts": {
    "start": "node index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}
index.js
// shipping-service/index.js
import { Kafka } from 'kafkajs';

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'shipping-service',
  brokers: ['localhost:9092']
});

// Create consumer and producer
const consumer = kafka.consumer({ groupId: 'shipping-service-group' });
const producer = kafka.producer();

// Define topics
const PAYMENT_TOPIC = 'payments';
const SHIPPING_TOPIC = 'shipping';

// Connect to Kafka
const connectToKafka = async () => {
  try {
    await consumer.connect();
    await producer.connect();
    console.log('Shipping service connected to Kafka');

    // Subscribe to payments
    await consumer.subscribe({ topic: PAYMENT_TOPIC, fromBeginning: true });
    console.log('Subscribed to payments topic');

    // Start consuming messages
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        await processPayment(message);
      },
    });
  } catch (error) {
    console.error('Failed to connect to Kafka:', error);
    process.exit(1);
  }
};

// Process a payment
const processPayment = async (message) => {
  try {
    // Parse the payment
    const payment = JSON.parse(message.value.toString());

    // Only process successful payments
    if (payment.status !== 'paid') {
      console.log(`Skipping shipping for failed payment: ${payment.orderId}`);
      return;
    }
    console.log(`Processing shipping for order: ${payment.orderId}`);

    // Simulate shipping processing
    const shippingSuccessful = Math.random() > 0.1; // 90% success rate

    // Create shipping result
    const shippingResult = {
      orderId: payment.orderId,
      status: shippingSuccessful ? 'shipped' : 'shipping-failed',
      trackingNumber: shippingSuccessful ? `TRK-${Math.floor(Math.random() * 1000000)}` : null,
      processedAt: new Date().toISOString()
    };

    // Send shipping result to Kafka
    await producer.send({
      topic: SHIPPING_TOPIC,
      messages: [
        {
          key: payment.orderId,
          value: JSON.stringify(shippingResult),
          headers: {
            'event-type': shippingSuccessful ? 'order-shipped' : 'shipping-failed',
            'timestamp': Date.now().toString()
          }
        }
      ],
    });
    console.log(`Shipping ${shippingSuccessful ? 'successful' : 'failed'} for order: ${payment.orderId}`);
  } catch (error) {
    console.error(`Error processing shipping: ${error.message}`);
  }
};

// Start the service
const start = async () => {
  await connectToKafka();
  console.log('Shipping service is running');
};

// Handle graceful shutdown
const shutdown = async () => {
  try {
    await consumer.disconnect();
    await producer.disconnect();
    console.log('Shipping service disconnected from Kafka');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

// Start the service
start().catch(console.error);
4. Notification Service
The Notification Service sends notifications about order status.
package.json
{
  "name": "notification-service",
  "version": "1.0.0",
  "description": "Service for sending notifications",
  "main": "index.js",
  "type": "module",
  "scripts": {
    "start": "node index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "dotenv": "^16.0.3"
  }
}
index.js
// notification-service/index.js
import { Kafka } from 'kafkajs';

// Define Kafka configuration
const kafka = new Kafka({
  clientId: 'notification-service',
  brokers: ['localhost:9092']
});

// Create consumer
const consumer = kafka.consumer({ groupId: 'notification-service-group' });

// Define topics to listen to
const TOPICS = ['new-orders', 'payments', 'shipping'];

// Connect to Kafka
const connectToKafka = async () => {
  try {
    await consumer.connect();
    console.log('Notification service connected to Kafka');

    // Subscribe to all relevant topics
    for (const topic of TOPICS) {
      await consumer.subscribe({ topic, fromBeginning: true });
      console.log(`Subscribed to ${topic} topic`);
    }

    // Start consuming messages
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        await processMessage(topic, message);
      },
    });
  } catch (error) {
    console.error('Failed to connect to Kafka:', error);
    process.exit(1);
  }
};

// Process a message
const processMessage = async (topic, message) => {
  try {
    // Parse the message
    const data = JSON.parse(message.value.toString());
    const eventType = message.headers['event-type']?.toString();

    // Determine notification type based on topic and event type
    let notificationType;
    let notificationMessage;
    switch (topic) {
      case 'new-orders':
        notificationType = 'ORDER_CREATED';
        notificationMessage = `New order created with ID: ${data.id}`;
        break;
      case 'payments':
        if (data.status === 'paid') {
          notificationType = 'PAYMENT_SUCCESSFUL';
          notificationMessage = `Payment successful for order: ${data.orderId}`;
        } else {
          notificationType = 'PAYMENT_FAILED';
          notificationMessage = `Payment failed for order: ${data.orderId}`;
        }
        break;
      case 'shipping':
        if (data.status === 'shipped') {
          notificationType = 'ORDER_SHIPPED';
          notificationMessage = `Order shipped: ${data.orderId}. Tracking number: ${data.trackingNumber}`;
        } else {
          notificationType = 'SHIPPING_FAILED';
          notificationMessage = `Shipping failed for order: ${data.orderId}`;
        }
        break;
      default:
        notificationType = 'UNKNOWN';
        notificationMessage = `Unknown event for order: ${data.id || data.orderId}`;
    }

    // Send notification (in a real system, this would send an email, SMS, push notification, etc.)
    console.log('NOTIFICATION SENT:');
    console.log(` Type: ${notificationType}`);
    console.log(` Message: ${notificationMessage}`);
    console.log(` Timestamp: ${new Date().toISOString()}`);
    console.log('-'.repeat(50));
  } catch (error) {
    console.error(`Error processing notification: ${error.message}`);
  }
};

// Start the service
const start = async () => {
  await connectToKafka();
  console.log('Notification service is running');
};

// Handle graceful shutdown
const shutdown = async () => {
  try {
    await consumer.disconnect();
    console.log('Notification service disconnected from Kafka');
    process.exit(0);
  } catch (error) {
    console.error(`Error during shutdown: ${error.message}`);
    process.exit(1);
  }
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

// Start the service
start().catch(console.error);
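The handler above reads the `event-type` header into `eventType` but then decides everything from the topic and the payload. An alternative is to route on the header itself. The sketch below shows one way this might look. `decodeHeaders`, `NOTIFICATION_TYPES`, and `notificationTypeFor` are hypothetical names, and the sample message is hand-built to mimic how kafkajs hands header values to a consumer as Buffers.

```javascript
// Hypothetical helper: turn a kafkajs-style headers object into plain strings.
const decodeHeaders = (headers = {}) =>
  Object.fromEntries(
    Object.entries(headers).map(([name, value]) => [
      name,
      Array.isArray(value) ? value.map((v) => v.toString()) : value.toString(),
    ])
  );

// Map each event type produced in this post to a notification type.
const NOTIFICATION_TYPES = {
  'order-created': 'ORDER_CREATED',
  'payment-successful': 'PAYMENT_SUCCESSFUL',
  'payment-failed': 'PAYMENT_FAILED',
  'order-shipped': 'ORDER_SHIPPED',
  'shipping-failed': 'SHIPPING_FAILED',
};

const notificationTypeFor = (message) => {
  const { 'event-type': eventType } = decodeHeaders(message.headers);
  return NOTIFICATION_TYPES[eventType] ?? 'UNKNOWN';
};

// Simulated message, shaped like the ones the Payment Service produces.
const sample = { headers: { 'event-type': Buffer.from('payment-failed') } };
console.log(notificationTypeFor(sample)); // PAYMENT_FAILED
```

Routing on the header means the payload does not have to be parsed just to classify the event, which matters more once several event types share a topic.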
Running the Example
- First, make sure Kafka is running (from the setup in section 1)
- Create the required topics:
docker compose -f ../01-setup/docker-compose.yml exec kafka kafka-topics --create --topic new-orders --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
docker compose -f ../01-setup/docker-compose.yml exec kafka kafka-topics --create --topic payments --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
docker compose -f ../01-setup/docker-compose.yml exec kafka kafka-topics --create --topic shipping --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
- Install dependencies for all services:
cd 05-real-world
npm run install-all
- Start each service in a separate terminal:
# Terminal 1
npm run notification
# Terminal 2
npm run payment
# Terminal 3
npm run shipping
# Terminal 4
npm run order
- Create a new order using curl or any API client:
curl -X POST http://localhost:3000/orders \
-H "Content-Type: application/json" \
-d '{
"customerId": "customer-123",
"items": [
{"id": "item-1", "name": "Product 1", "quantity": 2, "price": 10.99},
{"id": "item-2", "name": "Product 2", "quantity": 1, "price": 24.99}
],
"totalAmount": 46.97
}'
- Watch the flow of messages through the system:
  - The Order Service creates a new order and sends it to the new-orders topic
  - The Payment Service consumes from new-orders, processes the payment, and sends the result to the payments topic
  - The Shipping Service consumes from payments, processes shipping for paid orders, and sends the result to the shipping topic
  - The Notification Service consumes from all topics and sends notifications for each event
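The same flow can be traced without a broker. The sketch below is a toy model, not a replacement for the services: `createBus` and `runFlow` are hypothetical names, delivery is synchronous, and the random payment and shipping outcomes are replaced by a `paymentOk` flag so the result is predictable. In real Kafka, delivery is asynchronous and there is no ordering guarantee across topics.

```javascript
// Tiny in-memory stand-in for Kafka topics: no broker, no partitions, no persistence.
const createBus = () => {
  const handlers = {};
  return {
    subscribe(topic, handler) {
      handlers[topic] = handlers[topic] || [];
      handlers[topic].push(handler);
    },
    publish(topic, event) {
      (handlers[topic] || []).forEach((handler) => handler(event));
    },
  };
};

const runFlow = (order, { paymentOk = true } = {}) => {
  const bus = createBus();
  const notifications = [];

  // Notification Service: listens to every topic
  for (const topic of ['new-orders', 'payments', 'shipping']) {
    bus.subscribe(topic, (event) => notifications.push(`${topic}:${event.status}`));
  }
  // Payment Service: new-orders -> payments
  bus.subscribe('new-orders', (o) =>
    bus.publish('payments', { orderId: o.id, status: paymentOk ? 'paid' : 'failed' })
  );
  // Shipping Service: payments -> shipping (paid orders only)
  bus.subscribe('payments', (p) => {
    if (p.status === 'paid') {
      bus.publish('shipping', { orderId: p.orderId, status: 'shipped' });
    }
  });

  // Order Service: publishes the initial event
  bus.publish('new-orders', { id: order.id, status: 'created' });
  return notifications;
};

console.log(runFlow({ id: 'order-1' }));
// [ 'new-orders:created', 'payments:paid', 'shipping:shipped' ]
console.log(runFlow({ id: 'order-2' }, { paymentOk: false }));
// [ 'new-orders:created', 'payments:failed' ]
```

No handler calls another service directly; each one only reacts to an event and publishes the next.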
Understanding the Architecture
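This example demonstrates ideas that are common in event-driven microservices:
Event Notification: Each service publishes events about what has happened. Full event sourcing goes a step further and rebuilds state from the event log, which this example does not do
Separation of Responsibilities: Each service owns one step of the workflow. This is related to, but not the same as, Command-Query Responsibility Segregation (CQRS), which splits read and write models
Choreography: Services react to events rather than being directly commanded by a central orchestrator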
The benefits of this architecture include:
Loose Coupling: Services don't need to know about each other directly
Scalability: Each service can scale independently
Resilience: If one service fails, others can continue working
Auditability: Events stay in Kafka for each topic's retention period, providing a history that can be inspected or replayed
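Independent scaling relies on partitions. Each topic above was created with three partitions, and every message is keyed by order ID. Kafka chooses the partition for a keyed message by hashing the key, so all events for one order land on the same partition and are read in order. The sketch below illustrates the idea with a toy hash. It is not the murmur2-based hash used by Kafka's default partitioner, so the partition numbers it prints will not match a real cluster, and `toyPartitionFor` is a hypothetical name.

```javascript
// Illustrative only: a toy hash, not Kafka's murmur2-based default partitioner.
const toyPartitionFor = (key, numPartitions) => {
  let hash = 0;
  for (const char of key) {
    hash = (hash * 31 + char.codePointAt(0)) >>> 0; // keep it an unsigned 32-bit integer
  }
  return hash % numPartitions;
};

// The same key always maps to the same partition.
const first = toyPartitionFor('order-42', 3);
const second = toyPartitionFor('order-42', 3);
console.log(first === second); // true
```

With three partitions, up to three instances in the same consumer group can each take one partition and process orders in parallel.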
Next Steps
Congratulations! You've now learned the basics of Kafka and seen how it can be used in a real-world scenario. Here are some ideas for further exploration:
Add Error Handling: Implement retry logic and dead-letter queues
Add Monitoring: Use Kafka's metrics to monitor performance
Implement Exactly-Once Semantics: Use transactions to ensure messages are processed exactly once
Explore Kafka Streams: For more complex stream processing
Implement Schema Registry: To manage and evolve message schemas
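As a starting point for the first idea, retries and a dead-letter topic could be sketched roughly as follows. Everything here is an assumption for illustration: `withRetry` and `toDeadLetter` are hypothetical helpers, the `.dlq` suffix is just a naming convention, and the header names are made up. The object returned by `toDeadLetter` has the same shape as the argument passed to `producer.send` elsewhere in this post.

```javascript
// Hypothetical helpers for retrying a handler and building a dead-letter record.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Run `handler`, retrying with exponential backoff; never throws.
const withRetry = async (handler, { retries = 3, baseDelayMs = 100 } = {}) => {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt += 1) {
    try {
      return { ok: true, value: await handler(attempt) };
    } catch (error) {
      lastError = error;
      if (attempt < retries) await sleep(baseDelayMs * 2 ** attempt);
    }
  }
  return { ok: false, error: lastError };
};

// Build the record for a dead-letter topic (topic suffix and header names are assumptions).
const toDeadLetter = (sourceTopic, message, error) => ({
  topic: `${sourceTopic}.dlq`,
  messages: [
    {
      key: message.key,
      value: message.value,
      headers: { 'original-topic': sourceTopic, 'error-message': error.message },
    },
  ],
});
```

In a consumer's `eachMessage` handler, the processing call could be wrapped in `withRetry`, and when the result has `ok: false` the service could send `toDeadLetter(topic, message, result.error)` with its producer rather than only logging the error.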
Happy Kafka-ing!