Ecosystem of Kafka
Kafka Architecture (Push/Pull Model, Brokers, Topics, Partitions, Data Offsets)
Kafka Architecture benefits over RabbitMQ and ActiveMQ
| Apache Kafka | ActiveMQ / RabbitMQ |
| --- | --- |
| Good for high-TPS and near real-time systems | Good for low-TPS background jobs |
| Reliable messaging life cycle available: send → retry → in-flight → ack | No comparable delivery life cycle |
| Old messages can be re-processed | Once consumed, messages cannot be re-processed |
| High resiliency, fault tolerance, and high availability with ZooKeeper integration | Average |
| Consumer guarantees message delivery (pull-based messaging) | Publisher guarantees message delivery (push-based messaging) |
| High throughput thanks to sequential disk I/O | Lower throughput |
| Guarantees message order within a partition, based on partition offsets | No message ordering guarantee |
| Can work with slow consumers as well | Slow consumers can cause memory issues |
| Highly scalable with partition replication | Average |
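To make the pull-based delivery and partition-offset rows concrete, here is a minimal sketch of a plain Java consumer that polls (pulls) records and commits its own offsets. The broker address, topic name, and group id are placeholders, not values from this document.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PullModelDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // broker address (placeholder)
        props.put("group.id", "demo-group");                 // consumer group (placeholder)
        props.put("enable.auto.commit", "false");            // we commit offsets ourselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test_topic"));
            while (true) {
                // The consumer PULLS records from the broker; nothing is pushed to it
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Offsets are tracked per partition, which is what gives the ordering guarantee
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                consumer.commitSync(); // acknowledge what has been read
            }
        }
    }
}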
Setting up Kafka Services Locally
1. Download OpenJDK 11 and Kafka kafka_2.13-2.8.0.
2. Extract Java and Kafka to your preferred location. In my case, both are installed under /data/software.
3. Open a terminal and start the ZooKeeper and Kafka services:
export JAVA_HOME="/data/software/jdk-11.0.2"
cd /data/software/kafka_2.13-2.8.0/bin
./zookeeper-server-start.sh ../config/zookeeper.properties &   # start ZooKeeper in the background
./kafka-server-start.sh ../config/server.properties &          # start the Kafka broker in the background
disown   # detach the background jobs from the terminal so they keep running after it closes
disown
4. Create the topic and start a console producer:
export JAVA_HOME="/data/software/jdk-11.0.2"
cd /data/software/kafka_2.13-2.8.0/bin
./kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092
./kafka-console-producer.sh --topic test_topic --bootstrap-server localhost:9092
5. The producer shows a prompt for publishing data:
>
6. Type some data and press Enter:
> {"name":"John", "age":30, "car":null}
7. Consume the published data in a separate terminal:
export JAVA_HOME="/data/software/jdk-11.0.2"
cd /data/software/kafka_2.13-2.8.0/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning
8. The consumer prints the published data:
> {"name":"John", "age":30, "car":null}
Kafka Consumer in Java
Config Class
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> prop = new HashMap<>();
    // Note: "compression.type" is a producer/broker setting; consumers decompress batches transparently, so it is not set here
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // If no committed offset exists, start from the earliest available offset
    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka broker host:port; multiple brokers can be listed, comma-separated
    prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50"); // Maximum records returned per poll (the consumer-side batch size)
    prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Offsets are committed manually (see the MANUAL_IMMEDIATE ack mode below)
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return prop;
}
Use the properties below if you are working with Kerberos authentication:
System.setProperty("java.security.auth.login.config", path to jaas config file);
System.setProperty("java.security.krb5.conf", path to krb config file);
prop.put(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOut);
prop.put(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG, heartBeat);
prop.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
prop.put("sasl.kerberos.service.name", "kafka");
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true); // Listeners receive a List of records per poll
    factory.setConcurrency(concurrency); // Number of concurrent listener threads
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); // Offsets are committed only when the listener acknowledges
    return factory;
}
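The concurrency value used above is not defined in the snippet; a common way to supply it (an assumption on my part, not something stated in the original) is a field injected from application properties via Spring's @Value:
// Hypothetical field backing the concurrency setting used in kafkaListenerContainerFactory();
// the property name "kafka.consumer.concurrency" and its default of 3 are assumptions.
@Value("${kafka.consumer.concurrency:3}")
private int concurrency;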
Sample krb5.conf file (provided by the Kafka admin):
[libdefaults]
default_realm = MYAPP.MYAPP.COM
dns_lookup_kdc = true
dns_lookup_realm = false
ticket_lifetime = 86400
renew_lifetime = 604800
forwardable = true
default_tgs_enctypes = rc4-hmac des3-hmac-sha1 arcfour-hmac des-hmac-sha1 des-cbc-md5 des-cbc-crc
default_tkt_enctypes = rc4-hmac des3-hmac-sha1 arcfour-hmac des-hmac-sha1 des-cbc-md5 des-cbc-crc
permitted_enctypes = rc4-hmac des3-hmac-sha1 arcfour-hmac des-hmac-sha1 des-cbc-md5 des-cbc-crc
udp_preference_limit = 1
kdc_timeout = 9000
ignore_acceptor_hostname = true
[realms]
MYAPP.MYAPP.COM = {
kdc = hofdmc04.MYAPP.MYAPP.com
admin_server = hofdmc04.MYAPP.MYAPP.com
#kdc = MYAPP.MYAPP.com
#admin_server = MYAPP.MYAPP.com
}
Sample JAAS config file (provided by the Kafka admin) - for machines running the IBM JDK
jaas.conf
KafkaClient {
com.ibm.security.auth.module.Krb5LoginModule required
credsType=both
useKeytab="path to keytab file"
refreshKrb5Config=true
principal="myapp@abc.com";
};
Sample JAAS config file (provided by the Kafka admin) - for Linux machines
jaas.conf (add both the Client and KafkaClient sections)
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="myapp.keytab"
principal="myapp@abc.com";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="myapp.keytab"
principal="myapp@abc.com";
};
Consumer Class for Multiple Topics
@KafkaListener(topics = "topicA", containerFactory = "kafkaListenerContainerFactory", groupId = "groupA", autoStartup="true")
public void consumerTopicA(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) {
// Consumer for TopicA
// Step 1. Save consumed data set before doing any pre-processing
// Step 2. Send Acknowledge to Kafka server regarding last offset that we consumed, if no errors in Step 1
// Step 3. Iterate over messages and process
}
@KafkaListener(topics = "topicB", containerFactory = "kafkaListenerContainerFactory", groupId = "groupB", autoStartup="true")
public void consumerTopicB(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) {
// Consumer for TopicB
// Step 1. Save consumed data set before doing any pre-processing
// Step 2. Send Acknowledge to Kafka server regarding last offset that we consumed, if no errors in Step 1
// Step 3. Iterate over messages and process
}
@KafkaListener(topics = "topicC", containerFactory = "kafkaListenerContainerFactory", groupId = "groupC" , autoStartup="true")
public void consumerTopicC(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) {
// Consumer for TopicC
// Step 1. Save consumed data set before doing any pre-processing
// Step 2. Send Acknowledge to Kafka server regarding last offset that we consumed, if no errors in Step 1
// Step 3. Iterate over messages and process
}
@KafkaListener(topics = "topicD", containerFactory = "kafkaListenerContainerFactory", groupId = "groupD", autoStartup = "true" )
public void consumerTopicD(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) {
// Consumer for TopicD
// Step 1. Save consumed data set before doing any pre-processing
// Step 2. Send Acknowledge to Kafka server regarding last offset that we consumed, if no errors in Step 1
// Step 3. Iterate over messages and process
}
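A minimal sketch of what the three commented steps could look like inside one of these listeners. rawMessageRepository and messageProcessor are hypothetical beans introduced only for illustration; they are not part of the original configuration.
@KafkaListener(topics = "topicA", containerFactory = "kafkaListenerContainerFactory", groupId = "groupA", autoStartup = "true")
public void consumerTopicA(@Payload List<String> messages,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        Acknowledgment acknowledgment) {
    // Step 1. Persist the raw batch before any pre-processing (hypothetical repository)
    rawMessageRepository.saveBatch(messages, partitions, offsets);

    // Step 2. Acknowledge only after the batch is safely stored;
    //         with MANUAL_IMMEDIATE ack mode this commits the offsets right away
    acknowledgment.acknowledge();

    // Step 3. Process the stored messages, ideally asynchronously (hypothetical processor)
    messages.forEach(messageProcessor::processAsync);
}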
Best practices for Kafka Consumers
1. Persist the last-read Kafka offset and set it before starting the consumer. If an admin deletes data on the broker, committed offsets can be reset to the oldest available index, and old data will be read again and again.
2. Define rate limits (for example via max.poll.records) so the consumer reads batches of messages at the given rate.
3. Save each batch of messages as-is, before any pre-processing. Send the acknowledgement only if the batch was saved without errors.
4. Once processed, the saved data can be deleted if required.
5. Use concurrency. With concurrency set to 5 and a rate limit of 50, the consumer can read up to 250 messages per poll cycle.
6. Use an asynchronous mechanism to process messages, with a properly sized DB connection pool.
7. Make sure DB performance is good enough to keep up with high-TPS Kafka consumers.
8. To avoid processing duplicate messages, enforce unique key constraints on the DB tables (see the sketch below).
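To make practice 8 concrete, here is a hedged sketch of an idempotent save that relies on a unique key constraint. The table name consumed_messages, its columns, and the use of Spring's JdbcTemplate are assumptions for illustration, not part of the original.
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;

public class MessageDeduplicator {
    private final JdbcTemplate jdbcTemplate;

    public MessageDeduplicator(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    // A UNIQUE constraint on message_key (hypothetical table/column) makes re-deliveries harmless:
    // a second insert with the same key fails and is treated as "already processed".
    public boolean saveIfAbsent(String messageKey, String payload) {
        try {
            jdbcTemplate.update(
                    "INSERT INTO consumed_messages (message_key, payload) VALUES (?, ?)",
                    messageKey, payload);
            return true;  // first delivery of this message
        } catch (DuplicateKeyException e) {
            return false; // duplicate delivery; skip processing
        }
    }
}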
Thank you...