Inventory adjusted (v1.0.1)
Indicates a change in inventory level
Overview
The Inventory Adjusted event is triggered whenever the inventory level of a product changes. This can happen for several reasons, such as receiving new stock, sales, returns, or manual adjustments by the inventory management team. The event ensures that all parts of the system that rely on inventory data stay up to date with the latest levels.
Architecture diagram
Payload example
An example of the event you may see being published:
{ "Name": "John Doe", "Age": 30, "Department": "Engineering", "Position": "Software Engineer", "Salary": 85000.50, "JoinDate": "2024-01-15"}Schema (avro)
{ "type" : "record", "namespace" : "Tutorialspoint", "name" : "Employee", "fields" : [ { "name" : "Name", "type" : "string" }, { "name" : "Age", "type" : "int" }, { "name" : "Department", "type" : "string" }, { "name" : "Position", "type" : "string" }, { "name" : "Salary", "type" : "double" }, { "name" : "JoinDate", "type" : "string", "logicalType": "date" } ]}Producing the Event
Producing the Event

Below are examples of producing the event in Python, TypeScript, and Java.
Python

```python
from kafka import KafkaProducer
import json
from datetime import datetime

# Kafka configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Event data
event_data = {
    "event_id": "abc123",
    "timestamp": datetime.utcnow().isoformat() + 'Z',
    "product_id": "prod987",
    "adjusted_quantity": 10,
    "new_quantity": 150,
    "adjustment_reason": "restock",
    "adjusted_by": "user123"
}

# Send event to Kafka topic
producer.send('inventory.adjusted', event_data)
producer.flush()
```

TypeScript

```typescript
import { Kafka } from 'kafkajs';

// Kafka configuration
const kafka = new Kafka({
  clientId: 'inventory-producer',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();

// Event data
const eventData = {
  event_id: "abc123",
  timestamp: new Date().toISOString(),
  product_id: "prod987",
  adjusted_quantity: 10,
  new_quantity: 150,
  adjustment_reason: "restock",
  adjusted_by: "user123"
};

// Send event to Kafka topic
async function produceEvent() {
  await producer.connect();
  await producer.send({
    topic: 'inventory.adjusted',
    messages: [
      { value: JSON.stringify(eventData) }
    ],
  });
  await producer.disconnect();
}

produceEvent().catch(console.error);
```

Java

```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
import java.util.HashMap;
import java.util.Map;
import java.time.Instant;

public class InventoryProducer {
    public static void main(String[] args) {
        // Kafka configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "inventory-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);
        ObjectMapper mapper = new ObjectMapper();

        try {
            // Event data
            Map<String, Object> eventData = new HashMap<>();
            eventData.put("event_id", "abc123");
            eventData.put("timestamp", Instant.now().toString());
            eventData.put("product_id", "prod987");
            eventData.put("adjusted_quantity", 10);
            eventData.put("new_quantity", 150);
            eventData.put("adjustment_reason", "restock");
            eventData.put("adjusted_by", "user123");

            // Create producer record
            ProducerRecord<String, String> record = new ProducerRecord<>(
                "inventory.adjusted",
                mapper.writeValueAsString(eventData)
            );

            // Send event to Kafka topic
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Error producing message: " + exception);
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.flush();
            producer.close();
        }
    }
}
```
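The examples above use the clients' default delivery settings. If lost or duplicated adjustments are a concern, most clients expose acknowledgement and retry options; a minimal sketch with the Python producer (the specific values are illustrative, not part of this event's contract):

```python
from kafka import KafkaProducer
import json

# Stricter delivery settings: wait for all in-sync replicas to acknowledge
# each record and retry on transient broker failures.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',
    retries=5,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
```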
Consuming the Event

To consume an Inventory Adjusted event, use the following example Kafka consumer configuration in Python:
```python
from kafka import KafkaConsumer
import json

# Kafka configuration
consumer = KafkaConsumer(
    'inventory.adjusted',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='inventory_group',
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

# Consume events
for message in consumer:
    event_data = message.value
    print(f"Received Inventory Adjusted event: {event_data}")
```
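Because Kafka consumers can see the same event more than once (for example after a rebalance), handlers should be idempotent. A minimal sketch that deduplicates on event_id before applying the adjustment; the in-memory set and the apply_adjustment helper are hypothetical placeholders, not part of this event's API:

```python
processed_ids = set()  # demo only; a real service would use durable storage

def apply_adjustment(product_id: str, quantity: int) -> None:
    # Hypothetical stand-in for updating the local inventory store
    print(f"Adjusting {product_id} by {quantity}")

def handle_event(event_data: dict) -> None:
    # Skip duplicate deliveries by event_id so redelivery is harmless
    if event_data["event_id"] in processed_ids:
        return
    processed_ids.add(event_data["event_id"])
    apply_adjustment(event_data["product_id"], event_data["adjusted_quantity"])
```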