Monday, October 18, 2021

Reliable Streaming / Messaging with Apache Kafka Integrations






Kafka Ecosystem




Kafka Architecture (Push/Pull Model, Brokers, Topics, Partitions, Data Offsets)



Kafka's architectural benefits over RabbitMQ and ActiveMQ


Apache Kafka | ActiveMQ / RabbitMQ
Good for high-TPS and near-real-time systems | Good for low-TPS background jobs
Reliable messaging lifecycle available: send → retry → in-flight → ack | Delivery lifecycle not as well developed as Kafka's
Old messages can be re-processed | Once consumed, messages cannot be re-processed
High resiliency, fault tolerance, and high availability with ZooKeeper integration | Average
Consumer guarantees message delivery (PULL-based messaging) | Publisher guarantees message delivery (PUSH-based messaging)
Supports high throughput due to sequential disk I/O | Lower throughput
Guarantees message order within a partition based on offsets | No message ordering guarantee
Can work with slow consumers as well | Slow consumers can cause memory issues
Highly scalable with partition replication | Average scalability
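To make the pull-based, replayable consumption model in the table concrete, here is a minimal sketch with the plain Java client. It assumes a local broker on localhost:9092 and a single-partition topic named test_topic (as set up later in this post); the class and group id names are just examples. It seeks back to the beginning of the partition and re-reads records that were already consumed.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReplayExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("test_topic", 0);
            consumer.assign(Collections.singletonList(partition));
            // Pull model: the consumer decides where to read from; seeking back to the
            // beginning re-reads messages that were already consumed earlier.
            consumer.seekToBeginning(Collections.singletonList(partition));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}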



Setting Up Kafka Services Locally

1. Download OpenJDK 11 and the Kafka distribution kafka_2.13-2.8.0.

2. Extract Java and Kafka to your preferred location. In my case both are installed under /data/software.

3. Open a terminal and start the Kafka services (ZooKeeper first, then the broker):

export JAVA_HOME="/data/software/jdk-11.0.2"
cd /data/software/kafka_2.13-2.8.0/bin

./zookeeper-server-start.sh ../config/zookeeper.properties &
./kafka-server-start.sh ../config/server.properties &
# Detach both background jobs so they keep running after the shell exits
disown
disown

4. Publish data to a topic with the console producer. With the broker default auto.create.topics.enable=true, the topic is created automatically on first use; an explicit alternative using the Java AdminClient is sketched after this walkthrough.

export JAVA_HOME="/data/software/jdk-11.0.2"
cd /data/software/kafka_2.13-2.8.0/bin
./kafka-console-producer.sh --topic test_topic  --bootstrap-server localhost:9092

5. The producer shows a prompt for publishing data:
    >

6. Type some data and press Enter:
    > {"name":"John", "age":30, "car":null} 

7. Consume the published data in a separate terminal (--from-beginning replays messages that were published before the consumer started):

export JAVA_HOME="/data/software/jdk-11.0.2"
cd /data/software/kafka_2.13-2.8.0/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

8. The consumer prints the published data:

{"name":"John", "age":30, "car":null}
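As an alternative to relying on topic auto-creation in step 4, the topic can also be created explicitly from Java. Below is a minimal sketch using the Kafka AdminClient against the same local broker; the class name is just an example.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 is enough for a single local broker
            NewTopic topic = new NewTopic("test_topic", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}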





Kafka Consumer in Java


Config Class

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> prop = new HashMap<>();
    prop.put("compression.type", "lz4"); // Note: compression is a producer/broker-side setting; consumers decompress automatically
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // When no committed offset exists, start from the earliest available record
    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka broker IP:PORT; multiple brokers can be listed, comma-separated
    prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50"); // Maximum number of records returned per poll (batch size / rate limit)
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return prop;
}

Use the properties below if you are working with Kerberos authentication:

System.setProperty("java.security.auth.login.config", path to jaas config file);
System.setProperty("java.security.krb5.conf", path to krb config file);
prop.put(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOut);
prop.put(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG, heartBeat);
prop.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
prop.put("sasl.kerberos.service.name", "kafka");


@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true); // Deliver records to listeners as a batch (List<String>)
    factory.setConcurrency(concurrency); // Number of concurrent listener threads (externally configured value)
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); // Commit offsets only when the listener calls acknowledge()
    return factory;
}
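For completeness, these beans are assumed to live in a Spring configuration class along the lines of the sketch below. The class name is just an example; with Spring Boot, @KafkaListener processing is enabled by auto-configuration, otherwise @EnableKafka is required for the listeners to be picked up.

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    // consumerConfigs(), consumerFactory() and kafkaListenerContainerFactory()
    // bean methods shown above go here
}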




Sample krb5 file (provided by the Kafka admin)

krb5.conf

[libdefaults]
default_realm = MYAPP.MYAPP.COM
dns_lookup_kdc = true
dns_lookup_realm = false
ticket_lifetime = 86400
renew_lifetime = 604800
forwardable = true
default_tgs_enctypes = rc4-hmac des3-hmac-sha1 arcfour-hmac des-hmac-sha1 des-cbc-md5 des-cbc-crc
default_tkt_enctypes = rc4-hmac des3-hmac-sha1 arcfour-hmac des-hmac-sha1 des-cbc-md5 des-cbc-crc
permitted_enctypes = rc4-hmac des3-hmac-sha1 arcfour-hmac des-hmac-sha1 des-cbc-md5 des-cbc-crc
udp_preference_limit = 1
kdc_timeout = 9000
ignore_acceptor_hostname = true
[realms]
MYAPP.MYAPP.COM = {
        kdc = hofdmc04.MYAPP.MYAPP.com
        admin_server = hofdmc04.MYAPP.MYAPP.com
        #kdc = MYAPP.MYAPP.com
        #admin_server = MYAPP.MYAPP.com
}


Sample JAAS config file (provided by the Kafka admin) - for IBM JDKs

jaas.conf

KafkaClient {

       com.ibm.security.auth.module.Krb5LoginModule required
       credsType=both
       useKeytab="path to keytab file"
       refreshKrb5Config=true
       principal="myapp@abc.com";
};


Sample JAAS config file (provided by the Kafka admin) - for Linux machines with Oracle/OpenJDK


jaas.conf (add both the Client and KafkaClient sections; Client is used for the ZooKeeper connection, KafkaClient for the brokers)

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="myapp.keytab"
principal="myapp@abc.com";
};

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="myapp.keytab"
principal="myapp@abc.com";
};



Consumer Class for Multiple Topics

@KafkaListener(topics = "topicA", containerFactory = "kafkaListenerContainerFactory", groupId = "groupA", autoStartup="true")
public void consumerTopicA(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) {

       // Consumer for TopicA
       // Step 1. Save consumed data set before doing any pre-processing
       // Step 2. Send Acknowledge to Kafka server regarding last offset that we consumed, if no errors in Step 1
       // Step 3. Iterate over messages and process
        
}
@KafkaListener(topics = "topicB", containerFactory = "kafkaListenerContainerFactory", groupId = "groupB", autoStartup="true")
public void consumerTopicB(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) {

       // Consumer for TopicB
       // Step 1. Save consumed data set before doing any pre-processing
       // Step 2. Send Acknowledge to Kafka server regarding last offset that we consumed, if no errors in Step 1
       // Step 3. Iterate over messages and process

}
@KafkaListener(topics = "topicC", containerFactory = "kafkaListenerContainerFactory", groupId = "groupC" , autoStartup="true")
public void consumerTopicC(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) {

       // Consumer for TopicC
       // Step 1. Save consumed data set before doing any pre-processing
       // Step 2. Send Acknowledge to Kafka server regarding last offset that we consumed, if no errors in Step 1
       // Step 3. Iterate over messages and process

}
@KafkaListener(topics = "topicD", containerFactory = "kafkaListenerContainerFactory", groupId = "groupD", autoStartup = "true" )
public void consumerTopicD(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) {

       // Consumer for TopicD
       // Step 1. Save consumed data set before doing any pre-processing
       // Step 2. Send Acknowledge to Kafka server regarding last offset that we consumed, if no errors in Step 1
       // Step 3. Iterate over messages and process

}
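As a concrete illustration of the three commented steps, here is a minimal sketch of one listener body. messageRepository and processMessage are hypothetical placeholders for your own persistence layer and business logic; only acknowledgment.acknowledge() is part of the Spring Kafka API.

@KafkaListener(topics = "topicA", containerFactory = "kafkaListenerContainerFactory", groupId = "groupA", autoStartup = "true")
public void consumerTopicA(@Payload List<String> messages,
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                           @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                           Acknowledgment acknowledgment) {

    // Step 1: persist the raw batch before any processing (hypothetical repository)
    messageRepository.saveAll(messages);

    // Step 2: acknowledge only after the batch is safely stored, so a failure before
    // this point makes Kafka redeliver the same offsets (MANUAL_IMMEDIATE ack mode)
    acknowledgment.acknowledge();

    // Step 3: process each message; duplicate deliveries are handled by unique
    // constraints in the database (see best practices below)
    for (String message : messages) {
        processMessage(message); // hypothetical business logic
    }
}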


Best practices for Kafka Consumers

1. Track the last Kafka offset you processed and seek to it before starting the consumer. If the admin purges topic data or your committed offsets are lost, auto.offset.reset=earliest resets you to the oldest available record and old data gets read again (see the ConsumerSeekAware sketch after this list).

2. Define rate limits (max.poll.records). This lets you read batches of messages at a controlled rate.

3. Save the batch of messages as-is before doing any processing. Send the acknowledgment only if the batch was saved without errors.

4. Once processed, delete the saved raw data if it is no longer required.

5. Use concurrency. With concurrency set to 5 and max.poll.records set to 50, a consumer instance can read up to 250 messages per poll cycle across its listener threads.

6. Use an asynchronous mechanism to process messages, with properly sized DB connection pools.

7. Make sure your DB performance is good enough to keep up with high-TPS Kafka consumers.

8. To avoid processing duplicate messages, maintain unique key constraints in your DB tables.
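For best practice 1, Spring Kafka offers the ConsumerSeekAware callback. The sketch below assumes a hypothetical OffsetStore that returns the last offset your application finished processing for each partition; the @KafkaListener methods from the section above would live in the same class.

import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerSeekAware;

public class TopicConsumer implements ConsumerSeekAware {

    // Hypothetical lookup, e.g. backed by the table where consumed batches are saved
    private final OffsetStore offsetStore;

    public TopicConsumer(OffsetStore offsetStore) {
        this.offsetStore = offsetStore;
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((tp, currentOffset) -> {
            long lastProcessed = offsetStore.lastProcessedOffset(tp.topic(), tp.partition());
            // Resume right after the last offset that was fully processed
            callback.seek(tp.topic(), tp.partition(), lastProcessed + 1);
        });
    }

    // Hypothetical store interface
    public interface OffsetStore {
        long lastProcessedOffset(String topic, int partition);
    }
}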


Thank you...
