Shubhajit-Ming

Monday, May 02, 2022

Machine Learning: Development and Deployment Workflow

There is significant financial upside and business benefit in understanding how to avoid the potential pitfalls of an AI development initiative so that you can quickly capture positive ROI. To get a sense of the challenges in developing and deploying AI applications at scale, and why the right expertise, partners, and development platform are critical, let’s look at what’s involved in a machine learning development process. In this section, I outline the sequential workflow in developing and deploying a machine learning AI application. This process is well understood by machine learning experts.

1. Data Assembly and Preparation

The first step is to identify the required and relevant data sets, and then assemble the data in a unified image that is useful for machine learning. Because the data come from multiple disparate sources and software systems, there are often issues with data quality such as data duplication, gaps in data, unavailable data, and data out of sequence. The development platform must therefore provide tools to address those issues, including capabilities to automate the process of ingesting, integrating, normalizing, and federating data into a unified image suitable for machine learning.

2. Feature Engineering

The next step is feature engineering. This involves going through the data and crafting individual signals that the data scientist and domain experts think will be relevant to the problem being solved. In the case of AI-based predictive maintenance, signals could include the count of specific fault alarms over the trailing 7 days, 14 days, and 21 days; the sum of the specific alarms over the same trailing periods; and the maximum value of certain sensor signals over those trailing periods.
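As a rough illustration of the trailing-window signals described above, here is a minimal Python sketch. The record layout (one tuple per day with an alarm count and a sensor reading), the feature names, and the sample data are all assumptions made for this example. "Count" is interpreted here as the number of days with at least one alarm, which is only one possible reading.

```python
from datetime import date, timedelta


def trailing_features(records, as_of, windows=(7, 14, 21)):
    """Compute trailing-window features for one asset as of a given day.

    records: list of (day, alarm_count, sensor_value) tuples, one per day.
    This layout is hypothetical and chosen only for illustration.
    """
    features = {}
    for days in windows:
        start = as_of - timedelta(days=days)
        # Keep records that fall in the window (as_of - days, as_of].
        window = [r for r in records if start < r[0] <= as_of]
        # Count: number of days in the window with at least one alarm.
        features[f"alarm_days_{days}d"] = sum(1 for r in window if r[1] > 0)
        # Sum: total number of alarms raised in the window.
        features[f"alarm_sum_{days}d"] = sum(r[1] for r in window)
        # Max: highest sensor reading in the window (None if there is no data).
        features[f"sensor_max_{days}d"] = max((r[2] for r in window), default=None)
    return features


# Made-up data: 30 daily records ending on as_of.
as_of = date(2022, 5, 2)
records = [(as_of - timedelta(days=i), i % 3, 50.0 + i) for i in range(30)]
features = trailing_features(records, as_of)
print(features)
```

In practice these features would be computed per asset and per prediction date, usually with a dataframe library rather than plain lists.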

3. Labeling the Outcomes
This step involves labeling the outcomes the model tries to predict (e.g., “engine failure”). Often the
specific outcomes are not clearly defined in the data since the original source data sets and business
processes were not originally defined with AI in mind. For example, in AI-based predictive
maintenance applications, source data sets rarely identify actual failure labels. Instead, practitioners
have to infer failure points based on combinations of factors such as fault codes and technician work
orders.
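One way such an inference rule could look is sketched below in Python. The fault codes, the "corrective" work order kind, and the three-day gap are hypothetical choices for illustration; real rules would come from domain experts.

```python
from datetime import date, timedelta

# Hypothetical fault codes treated as critical in this example.
CRITICAL_FAULT_CODES = {"F101", "F205"}


def infer_failure_dates(fault_events, work_orders, max_gap_days=3):
    """Label a day as a failure when a critical fault code is followed by a
    corrective work order within max_gap_days.

    fault_events: list of (day, fault_code)
    work_orders:  list of (day, kind)
    """
    corrective_days = [day for day, kind in work_orders if kind == "corrective"]
    max_gap = timedelta(days=max_gap_days)
    labeled = set()
    for day, code in fault_events:
        if code not in CRITICAL_FAULT_CODES:
            continue
        # Look for a corrective work order on the same day or shortly after.
        if any(timedelta(0) <= wo_day - day <= max_gap for wo_day in corrective_days):
            labeled.add(day)
    return sorted(labeled)


faults = [
    (date(2022, 1, 10), "F101"),
    (date(2022, 1, 20), "F999"),
    (date(2022, 2, 1), "F205"),
]
orders = [
    (date(2022, 1, 12), "corrective"),
    (date(2022, 1, 21), "corrective"),
    (date(2022, 2, 2), "inspection"),
    (date(2022, 2, 10), "corrective"),
]
failure_dates = infer_failure_dates(faults, orders)
print(failure_dates)
```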
4. Setting Up the Training Data
Now comes the process of setting up the data set for training the algorithm. There are a number of
nuances to this process that may require outside expertise. For classification tasks, data scientists
need to ensure that labels are appropriately balanced with positive and negative examples to provide
the classifier algorithm enough balanced data. Data scientists also need to ensure the classifier is not
biased by artificial patterns in the data. For example, in a recent fraud detection deployment for a
utility, a classifier trained on historical cases on a large country-wide data set incorrectly identified a
number of suspected fraud cases on a remote island. Further examination revealed that because the
island is so remote and hard to access, investigators traveled there only if they were certain of fraud.
All historical cases investigated on the island were therefore true positive labels. Consequently, the
classifier always correlated the island location with incidence of fraud, so the algorithm had to be
adjusted.
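A simple check that might have flagged the island problem before training is to compare the positive rate of the labels across groups. The Python sketch below assumes rows of (group, label) pairs with labels 0 or 1; the group names, counts, and the min_cases threshold are invented for illustration.

```python
from collections import defaultdict


def positive_rate_by_group(rows):
    """rows: iterable of (group, label) with label 0 or 1.
    Returns {group: (number_of_cases, positive_rate)}."""
    counts = defaultdict(lambda: [0, 0])
    for group, label in rows:
        counts[group][0] += 1
        counts[group][1] += label
    return {g: (n, pos / n) for g, (n, pos) in counts.items()}


def suspicious_groups(rows, min_cases=5):
    """Groups whose labels are all positive or all negative. A classifier
    can learn to predict the label from group membership alone."""
    rates = positive_rate_by_group(rows)
    return sorted(
        g for g, (n, rate) in rates.items()
        if n >= min_cases and rate in (0.0, 1.0)
    )


# Made-up example: every investigated case on the island was confirmed fraud.
rows = [("mainland", 1)] * 20 + [("mainland", 0)] * 80 + [("island", 1)] * 6
print(positive_rate_by_group(rows))
print(suspicious_groups(rows))
```

A group flagged this way is not necessarily a problem, but it is worth asking how its labels were collected.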
5. Choosing and Training the Algorithm
The next step is to choose the actual algorithm and then train it with the training data set.
Numerous algorithm libraries are available to data scientists today, created by companies,
universities, research organizations, government agencies, and individual contributors. Many are
available as open source software from sources like GitHub and the Apache Software Foundation.
AI practitioners typically run specialized searches across these libraries to identify the right
algorithm and build the best-trained model. Experienced data scientists know how to narrow their
searches to focus on the right classes of algorithms to test for a specific use case.
6. Deploying the Algorithm into Production
The machine learning algorithm then must be deployed to operate in a production environment: It
needs to receive new data, generate outputs, and have some action or decision be made based on
those outputs. This may mean embedding the algorithm within an enterprise application used by
humans to make decisions—for example, a predictive maintenance application that identifies and
prioritizes equipment requiring maintenance to provide guidance for maintenance crews. This is
where the real value is created—by reducing equipment downtime and servicing costs through more
accurate failure prediction that enables proactive maintenance before the equipment actually fails.
In order for the machine learning algorithm to operate in production, the underlying compute
infrastructure needs to be set up and managed. This includes elastic scale-out and big data
management abilities (e.g., ingestion, integration, etc.) necessary for large data sets.
7. Closed-Loop Continuous Improvement
Once in production, the performance of the AI algorithm needs to be tracked and managed.
Algorithms typically require frequent retraining by data science teams as market conditions change,
business objectives and processes evolve, and new data sources are identified. Organizations need to
maintain technical agility so they can rapidly develop, retrain, and deploy new models as
circumstances change.
The science of AI has evolved and matured over the last several decades. We are now at a point
where not only are the underlying technologies available, but also organizations now have access to
domain experts, data scientists, and professional services providers that can help them harness the
power of AI for competitive advantage.

Business Benefits of AI
AI technologies deliver real business benefits today. In particular, technology companies like
Google, LinkedIn, Netflix, and Amazon use AI at large scale. McKinsey Global Institute (MGI)
estimates that technology companies spent $20 billion to $30 billion on AI in 2016. Some of the
most established applications for AI delivering concrete business benefits are in online search,
advertising placement, and product or service recommendations.
In addition to technology companies, sophisticated industries advanced in digitization, such as
financial services and telecom, are starting to use AI technologies in meaningful ways. For example,
banks use AI to detect and intercept credit card fraud; to reduce customer churn by predicting when
customers are likely to switch; and to streamline new customer acquisition.
The health care industry is just starting to unlock value from AI. Significant opportunities exist for
health care companies to use machine learning to improve patient outcomes, predict chronic
diseases, prevent addiction to opioids and other drugs, and improve disease coding accuracy.
Industrial and manufacturing companies have also started to unlock value from AI applications,
including using AI for predictive maintenance and advanced optimization across entire supply
chains.
Energy companies have transformed operations using AI. Utility companies use advanced AI
applications to identify and reduce fraud, forecast electricity consumption, and maintain their
generation, transmission, and distribution assets.
There are several emerging applications of AI in defense. Already, the U.S. military uses AI-based
predictive maintenance to improve military readiness and streamline operations. Other use cases
include logistics optimization, inventory optimization, and recruiting and personnel management
(e.g., matching new recruits to jobs).



Digital Transformation and Build your Organization

 Cloud Features

Five core features of cloud computing make it essential to digital transformation:

1. Infinite capacity: Storage and compute resources are essentially unlimited.
2. On-demand self-service: Users can unilaterally provision computing resources without requiring human interaction from the cloud provider.
3. Broad network access: Users can operate on the cloud through traditional telecommunication services like Wi-Fi, internet, and mobile (e.g., 3G, 4G, or LTE) on nearly any device, which means cloud computing is accessible anywhere.
4. Resource pooling: Cloud providers serve multiple users through a multitenant model, enabling a pool of physical and virtual resources to be dynamically assigned and reassigned according to each user’s demand, thereby reducing resource costs for all users.
5. Rapid elasticity: Resources can be automatically, seamlessly, and rapidly provisioned and deprovisioned as a user’s demand increases or decreases.


Cloud Deployment Models

Beyond its core technical features, two aspects of cloud computing have significant impacts on business operations: the deployment model (who owns the infrastructure) and the service model (what type of services are provided).

There are three different deployment models, determined by ownership:

• Public cloud is infrastructure available for use by anyone. It is owned, managed, and operated by a business (e.g., AWS, Azure, IBM, or Google Cloud) or a government. The public cloud has gained significant traction with corporations due to its infinite capacity, near-real-time elasticity, strong security, and high reliability.

• Private cloud is infrastructure owned by and operated for the benefit of a single organization. It is effectively a data center, or collection of data centers, operated on a cloud model by an organization for its exclusive use. An organization’s private cloud often has limited elasticity and finite capacity because it is gated by hardware.

• Hybrid cloud combines private and public cloud infrastructures. Hybrid cloud infrastructure is a dynamic space, where public cloud providers are offering up dynamic, extensible private cloud environments (e.g., AWS GovCloud) within a public cloud, thereby offering the best of both worlds.

Leadership Style: A Framework

 




Leadership Style: Fundamental Tensions

•Authenticity (being true to yourself) vs. Adaptability (adjusting your self-presentation to fit the context)
•Conformity (fitting in with others) vs. Differentiation (standing out from the pack)

Takeaways

•Leadership styles emerge as a function of individual personality traits and dispositions, as well as situational demands and constraints.

•There is no one most effective leadership style; each entails tradeoffs.

•Effective leaders typically have a broad repertoire of styles and know which style to adopt in different situations.

•Leadership styles can be learned and developed through practice.

•An effective way to begin changing one’s overall leadership style is to “dial up” or “dial down” specific communication styles: assertiveness, credibility, likeability, trustworthiness, and openness.


Tuesday, August 04, 2020

Kafka Consumer Lag Monitoring

What is Consumer Lag?

When people talk about Kafka or about a Kafka cluster, they are typically referring to Kafka Brokers. You can think of a Kafka Broker as a Kafka server. A Broker is what actually stores and serves Kafka messages. Kafka Producers are applications that write messages into Kafka (Brokers). Kafka Consumers are applications that read messages from Kafka (Brokers).
Inside Kafka Brokers, data is stored in one or more Topics, and each Topic consists of one or more Partitions. When writing data, a Broker actually writes it into a specific Partition. As it writes data, it keeps track of the last “write position” in each Partition. This is called the Latest Offset, also known as the Log End Offset. Each Partition has its own independent Latest Offset.
Just like Brokers keep track of their write position in each Partition, each Consumer keeps track of its “read position” in each Partition whose data it is consuming. That is, it keeps track of which data it has read. This is known as the Consumer Offset. The Consumer Offset is periodically persisted (to ZooKeeper or a special Topic in Kafka itself) so it can survive Consumer crashes or unclean shutdowns and avoid re-consuming too much old data.

Kafka Consumer Lag and Read/Write Rates

In our diagram above we can see yellow bars, which represent the rate at which Brokers are writing messages created by Producers. The orange bars represent the rate at which Consumers are consuming messages from Brokers. The rates look roughly equal, and they need to be, otherwise the Consumers will fall behind. However, there is always going to be some delay between the moment a message is written and the moment it is consumed. Reads are always going to be lagging behind writes, and that is what we call Consumer Lag. The Consumer Lag is simply the delta between the Latest Offset and the Consumer Offset.
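The definition can be written down directly. The following Python sketch computes lag per partition from two offset maps. The dictionaries and their values are made up; in a real setup the offsets would come from the brokers and from the consumer group's committed offsets.

```python
def consumer_lag(latest_offsets, consumer_offsets):
    """Return per-partition lag: Latest Offset minus Consumer Offset.

    Both arguments map partition number -> offset. Partitions without a
    committed consumer offset are skipped rather than guessed.
    """
    return {
        partition: latest_offsets[partition] - consumer_offsets[partition]
        for partition in latest_offsets
        if partition in consumer_offsets
    }


# Made-up offsets for a three-partition topic.
latest = {0: 1500, 1: 980, 2: 2100}
committed = {0: 1450, 1: 980, 2: 1800}

lag = consumer_lag(latest, committed)
total_lag = sum(lag.values())
print(lag, total_lag)
```

Lag is tracked per partition because each Partition has its own independent Latest Offset. The total is a convenient summary, but it can hide a single partition that is far behind.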

Why is Consumer Lag Important?

Many applications today are based on being able to process (near) real-time data. Think about a performance monitoring system like SPM or a log management service like Logsene. They continuously process infinite streams of near real-time data. If they were to show you metrics or logs with too much delay (if the Consumer Lag were too big) they’d be nearly useless. This Consumer Lag tells us how far behind each Consumer (Group) is in each Partition. The smaller the lag, the more real-time the data consumption.

Monitoring Read and Write Rates

Kafka Consumer Lag and Broker Offset Changes
As we just learned, the delta between the Latest Offset and the Consumer Offset is what gives us the Consumer Lag. In the above chart from SPM you may have noticed a few other metrics:
  • Broker Write Rate
  • Consume Rate
  • Broker Earliest Offset Changes
The rate metrics are derived metrics. If you look at Kafka’s metrics you won’t find them there. Under the hood SPM collects a few metrics with various offsets from which these rates are computed. In addition, it charts Broker Earliest Offset Changes, which tracks changes in the earliest known offset in each Broker’s Partition. Put another way, this offset is the offset of the oldest message in a Partition. While this offset alone may not be super useful, knowing how it’s changing could be handy when things go awry. Data in Kafka has a certain TTL (Time To Live) to allow for easy purging of old data. This purging is performed by Kafka itself. Every time such purging kicks in, the offset of the oldest data changes. SPM’s Broker Earliest Offset Change surfaces this information for your monitoring pleasure. This metric gives you an idea of how often purges are happening and how many messages they removed each time they ran.
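To make the idea of derived metrics concrete, here is a small Python sketch that computes rates from two offset samples taken some seconds apart. It is not SPM's implementation, which the text does not describe. The OffsetSample type, its field names, and the numbers are assumptions for illustration.

```python
from dataclasses import dataclass


@dataclass
class OffsetSample:
    """Offsets observed for one partition at one point in time (hypothetical)."""
    timestamp: float        # seconds
    earliest_offset: int    # offset of the oldest message still retained
    latest_offset: int      # broker write position (Log End Offset)
    consumer_offset: int    # consumer read position


def derived_metrics(prev, curr):
    """Derive rates and changes from two consecutive samples."""
    elapsed = curr.timestamp - prev.timestamp
    if elapsed <= 0:
        raise ValueError("samples must be in increasing time order")
    return {
        # Messages written per second by the broker.
        "write_rate": (curr.latest_offset - prev.latest_offset) / elapsed,
        # Messages consumed per second by the consumer.
        "consume_rate": (curr.consumer_offset - prev.consumer_offset) / elapsed,
        # Messages removed by purging between the two samples.
        "earliest_offset_change": curr.earliest_offset - prev.earliest_offset,
        # Consumer Lag at the time of the second sample.
        "lag": curr.latest_offset - curr.consumer_offset,
    }


prev = OffsetSample(timestamp=0.0, earliest_offset=100, latest_offset=1000, consumer_offset=900)
curr = OffsetSample(timestamp=10.0, earliest_offset=250, latest_offset=1500, consumer_offset=1300)
metrics = derived_metrics(prev, curr)
print(metrics)
```

When the write rate stays above the consume rate, as in this made-up sample, the lag grows from one sample to the next.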
There are several Kafka monitoring tools out there, such as LinkedIn’s Burrow, whose Offset and Consumer Lag monitoring approach is used in SPM. If you need a good Kafka monitoring solution, give SPM a go. Ship your Kafka and other logs into Logsene and you’ve got yourself a DevOps solution that will make troubleshooting easy instead of dreadful.

Kafka Architecture: Core Kafka

Kafka Architecture - Core Kafka Diagram

Kafka Needs ZooKeeper

Kafka uses ZooKeeper to do leadership election of Kafka broker and topic partition pairs. Kafka uses ZooKeeper to manage service discovery for Kafka brokers that form the cluster. ZooKeeper sends changes of the topology to Kafka, so each node in the cluster knows when a new broker joins, a broker dies, a topic is removed, or a topic is added. ZooKeeper provides an in-sync view of Kafka cluster configuration.

Kafka Producer, Consumer, Topic Details

Kafka producers write to Topics. Kafka consumers read from Topics. A topic is associated with a log, which is a data structure on disk. Kafka appends records from one or more producers to the end of a topic log. A topic log consists of many partitions that are spread over multiple files, which can be spread across multiple Kafka cluster nodes. Consumers read from Kafka topics at their own cadence and can pick where they are (the offset) in the topic log. Each consumer group tracks the offset from where it left off reading. Kafka distributes topic log partitions on different nodes in a cluster for high performance with horizontal scalability. Spreading partitions aids in writing data quickly. Topic log partitions are Kafka’s way to shard reads and writes to the topic log. Also, partitions are needed to have multiple consumers in a consumer group work at the same time. Kafka replicates partitions to many nodes to provide failover.
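The sharding idea can be illustrated with a few lines of Python: hash a record key and take it modulo the number of partitions, so that records with the same key always land in the same partition. This is only a sketch. Kafka's default partitioner hashes keys with murmur2; CRC32 is used here as a stand-in because it is in the standard library. The key names are made up.

```python
import zlib
from collections import Counter


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Map a record key to a partition number.

    CRC32 is a stand-in for Kafka's real key hash; the point is only that
    the mapping is deterministic and spreads keys across partitions.
    """
    return zlib.crc32(key) % num_partitions


# Made-up keys spread over a 13-partition topic.
keys = [f"sensor-{i}".encode() for i in range(100)]
spread = Counter(partition_for_key(k, 13) for k in keys)
print(sorted(spread.items()))
```

Because the mapping is deterministic, all records for one key go to one partition, which is what lets producers and consumers work on different partitions in parallel.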

Kafka Architecture: Topic Partition, Consumer Group, Offset, and Producers

Kafka Architecture: Topic Partition, Consumer group, Offset and Producers Diagram

Kafka Scale and Speed

How can Kafka scale if multiple producers and consumers read and write to the same Kafka topic log at the same time? First, Kafka is fast: it writes to the filesystem sequentially, which is fast. On a modern fast drive, Kafka can easily write 700 MB or more of data a second. Kafka scales writes and reads by sharding topic logs into partitions. Recall that topic logs can be split into multiple partitions, which can be stored on multiple different servers, and those servers can use multiple disks. Multiple producers can write to different partitions of the same topic. Multiple consumers from multiple consumer groups can read from different partitions efficiently.

Kafka Brokers

A Kafka cluster is made up of multiple Kafka Brokers. Each Kafka Broker has a unique ID (number). Kafka Brokers contain topic log partitions. Connecting to one broker bootstraps a client to the entire Kafka cluster. For failover, you want to start with at least three to five brokers. A Kafka cluster can have 10, 100, or 1,000 brokers if needed.

Kafka Cluster, Failover, ISRs

Kafka uses replication to support failover. Recall that Kafka uses ZooKeeper to form Kafka Brokers into a cluster, and each node in a Kafka cluster is called a Kafka Broker. Topic partitions can be replicated across multiple nodes for failover. The topic should have a replication factor greater than 1 (2 or 3). For example, if you are running in AWS, you would want to be able to survive a single availability zone outage. If one Kafka Broker goes down, then a Kafka Broker that holds an ISR (in-sync replica) can serve the data.

Kafka Failover vs. Kafka Disaster Recovery

Kafka uses replication for failover. Replication of Kafka topic log partitions allows for failure of a rack or AWS availability zone (AZ). You need a replication factor of at least 3 to survive a single AZ failure. You need to use Mirror Maker, a Kafka utility that ships with Kafka core, for disaster recovery. Mirror Maker replicates a Kafka cluster to another datacenter or AWS region. What Mirror Maker does is called mirroring, so as not to be confused with replication.
Note that there is no hard and fast rule on how you have to set up the Kafka cluster per se. You could, for example, set up the whole cluster in a single AZ so you can use AWS enhanced networking and placement groups for higher throughput, and then use Mirror Maker to mirror the cluster to another AZ in the same region as a hot-standby.

Kafka Architecture: Kafka Zookeeper Coordination

Kafka Architecture - Kafka Zookeeper Coordination Diagram

Kafka Topics Architecture

Please continue reading about Kafka Architecture. The next article covers Kafka Topics Architecture with a discussion of how partitions are used for fail-over and parallel processing.

Understanding Kafka Failover

Understanding Kafka Failover for Brokers and Consumers

This Kafka tutorial picks up right where the first Kafka tutorial from the command line left off. The first tutorial has instructions on how to run ZooKeeper and use Kafka utils.
In this tutorial, we are going to run many Kafka nodes on our development laptop, so you will need at least 16 GB of RAM on your local dev machine. You can run just two servers if you have less than 16 GB of memory. We are going to create a replicated topic. We then demonstrate consumer failover and broker failover. We also demonstrate load balancing Kafka consumers. We show how, with many groups, Kafka acts like a Publish/Subscribe system. But when we put all of our consumers in the same group, Kafka will load share the messages to the consumers in that group (more like a queue than a topic in a traditional MOM sense).
If not already running, then start up ZooKeeper (./run-zookeeper.sh from the first tutorial). Also, shut down Kafka from the first tutorial.
Next, you need to copy server properties for three brokers (detailed instructions to follow). Then we will modify these Kafka server properties to add unique Kafka ports, Kafka log locations, and unique Broker ids. Then we will create three scripts to start these servers up using these properties, and then start the servers. Lastly, we create a replicated topic and use it to demonstrate Kafka consumer failover and Kafka broker failover.

Create Three New Kafka server-n.properties Files

In this section, we will copy the existing Kafka server.properties to server-0.properties, server-1.properties, and server-2.properties. Then we change server-0.properties to set log.dirs to “./logs/kafka-0”. Then we modify server-1.properties to set port to 9093, broker id to 1, and log.dirs to “./logs/kafka-1”. Lastly, modify server-2.properties to use port 9094, broker id 2, and log.dirs “./logs/kafka-2”.

Copy Server Properties File:

$ cd ~/kafka-training
$ mkdir -p lab2/config
$ cp kafka/config/server.properties lab2/config/server-0.properties
$ cp kafka/config/server.properties lab2/config/server-1.properties
$ cp kafka/config/server.properties lab2/config/server-2.properties
With your favorite text editor, change server-0.properties so that log.dirs is set to ./logs/kafka-0. Leave the rest of the file the same. Make sure log.dirs is only defined once.

~/kafka-training/lab2/config/server-0.properties

broker.id=0
port=9092
log.dirs=./logs/kafka-0
...
Change broker.id, port, and log.dirs of server-1.properties as follows.

~/kafka-training/lab2/config/server-1.properties

broker.id=1
port=9093
log.dirs=./logs/kafka-1
...
Change broker.id, port, and log.dirs of server-2.properties as follows.

~/kafka-training/lab2/config/server-2.properties

broker.id=2
port=9094
log.dirs=./logs/kafka-2
...
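If you would rather not edit the three files by hand, the same changes could be scripted. The Python sketch below rewrites the text of a properties file so that each overridden key is defined exactly once. The helper names override_properties and broker_overrides are hypothetical, and the base text is a shortened, made-up stand-in for the real server.properties.

```python
def override_properties(base_text, overrides):
    """Return base_text with every key in overrides set exactly once.

    Commented-out lines are left untouched. Keys that the base text does
    not define are appended at the end.
    """
    remaining = dict(overrides)
    out = []
    for line in base_text.splitlines():
        key = line.split("=", 1)[0].strip()
        is_setting = "=" in line and not line.lstrip().startswith("#")
        if is_setting and key in overrides:
            if key in remaining:
                out.append(f"{key}={remaining.pop(key)}")
            # Later duplicates are dropped so the key is defined only once.
            continue
        out.append(line)
    out.extend(f"{k}={v}" for k, v in remaining.items())
    return "\n".join(out) + "\n"


def broker_overrides(n):
    """Settings for broker n, following the values used in this tutorial."""
    return {
        "broker.id": str(n),
        "port": str(9092 + n),
        "log.dirs": f"./logs/kafka-{n}",
    }


# Made-up, shortened base file for demonstration.
base = "broker.id=0\n#port=9092\nlog.dirs=/tmp/kafka-logs\nnum.partitions=1\n"
for n in range(3):
    print(f"--- server-{n}.properties ---")
    print(override_properties(base, broker_overrides(n)), end="")
```

To use it for real, read kafka/config/server.properties, call override_properties once per broker, and write each result to lab2/config/server-n.properties.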

Create Startup Scripts for Three Kafka Servers

The startup scripts will just run kafka-server-start.sh with the corresponding properties file.

~/kafka-training/lab2/start-1st-server.sh

#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-0.properties"

~/kafka-training/lab2/start-2nd-server.sh

#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-1.properties"

~/kafka-training/lab2/start-3rd-server.sh

#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-2.properties"
Notice that we are passing the Kafka server properties files that we created in the last step.
Now run all three in separate terminals/shells.

Run Kafka servers each in own terminal from ~/kafka-training/lab2

~/kafka-training/lab2
$ ./start-1st-server.sh
...
$ ./start-2nd-server.sh
...
$ ./start-3rd-server.sh
Give the servers a minute to start up and connect to ZooKeeper.

Create Kafka Replicated Topic my-failsafe-topic

Now we will create a replicated topic that the console producers and console consumers can use.

~/kafka-training/lab2/create-replicated-topic.sh

#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 3 \
    --partitions 13 \
    --topic my-failsafe-topic
Notice that the replication factor gets set to 3, and the topic name is my-failsafe-topic, and like before, it has 13 partitions.
Then we just have to run the script to create the topic.

Run create-replicated-topic.sh

~/kafka-training/lab2
$ ./create-replicated-topic.sh

Start Kafka Consumer That Uses Replicated Topic

Next, create a script that starts the consumer and then start the consumer with the script.

~/kafka-training/lab2/start-consumer-console-replicated.sh

#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9094,localhost:9092 \
    --topic my-failsafe-topic \
    --from-beginning
Notice that a list of Kafka servers is passed to the --bootstrap-server parameter. Only two of the three servers that we started earlier are listed. Only one broker is strictly needed, because the consumer client learns about the other brokers from whichever broker it connects to first. Usually, you list multiple brokers so that the client can still connect if one of them is down.
Now we just run this script to start the consumer.

Run start-consumer-console-replicated.sh

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh

Start Kafka Producer That Uses Replicated Topic

Next, we create a script that starts the producer. Then launch the producer with the script you created.

~/kafka-training/lab2/start-consumer-producer-replicated.sh

#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-console-producer.sh \
    --broker-list localhost:9092,localhost:9093 \
    --topic my-failsafe-topic
Notice that we start the Kafka producer and pass it a list of Kafka Brokers to use via the parameter --broker-list.
Now use the start producer script to launch the producer as follows.

Run start-consumer-producer-replicated.sh

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh

Send Messages

Now send some messages from the producer to Kafka and see those messages consumed by the consumer.

Producer Console

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
Hi Mom
How are you?
How are things going?
Good!

Consumer Console

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!

Start Two More Consumers and Send More Messages

Now start two more consumers, each in its own terminal window, and send more messages from the producer.

Producer Console

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

Consumer Console 1st

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

Consumer Console 2nd in new Terminal

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

Consumer Console 3rd in new Terminal

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3
Notice that the messages are sent to all of the consumers because each consumer is in a different consumer group.

Change Consumers to Be in the Same Consumer Group

Stop the producers and the consumers from before, but leave Kafka and ZooKeeper running.
Now let’s modify the start-consumer-console-replicated.sh script to add a Kafka consumer group. We want to put all of the consumers in the same consumer group. This way the consumers will share the messages as each consumer in the consumer group will get its share of partitions.

~/kafka-training/lab2/start-consumer-console-replicated.sh

#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9094,localhost:9092 \
    --topic my-failsafe-topic \
    --consumer-property group.id=mygroup
Notice that the script is the same as before except we added --consumer-property group.id=mygroup which will put every consumer that runs with this script into the mygroup consumer group.
Now we just run the producer and three consumers.

Run this three times - start-consumer-console-replicated.sh

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh

Run Producer Console

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
Now send seven messages from the Kafka producer console.

Producer Console

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
m1
m2
m3
m4
m5
m6
m7
Notice that the messages are spread evenly among the consumers.

1st Kafka Consumer gets m3, m5

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m3
m5
Notice the first consumer gets messages m3 and m5.

2nd Kafka Consumer gets m2, m6

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m2
m6
Notice the second consumer gets messages m2 and m6.

3rd Kafka Consumer gets m1, m4, m7

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m1
m4
m7
Notice the third consumer gets messages m1, m4 and m7.
Notice that each consumer in the group got a share of the messages.

Kafka Consumer Failover

Next, let’s demonstrate consumer failover by killing one of the consumers and sending seven more messages. Kafka should divide up the work to the consumers that are running.
First, kill the third consumer (CTRL-C in the consumer terminal does the trick).
Now send seven more messages with the Kafka console-producer.

Producer Console - send seven more messages m8 through m14

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
m1
...
m8
m9
m10
m11
m12
m13
m14
Notice that the messages are spread evenly among the remaining consumers.

1st Kafka Consumer gets m8, m9, m11, m14

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m3
m5
m8
m9
m11
m14
The first consumer got m8, m9, m11 and m14.

2nd Kafka Consumer gets m10, m12, m13

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m2
m6
m10
m12
m13
The second consumer got m10, m12, and m13.
We killed one consumer, sent seven more messages, and saw Kafka spread the load to remaining consumers. Kafka consumer failover works!
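Why does the load shift the way it does? Within a group, each partition is assigned to exactly one consumer, and the partitions are reassigned when a consumer leaves. The Python sketch below mimics a range-style assignment for a single topic. It is an illustration of the idea rather than Kafka's actual code, and which assignment strategy a real client uses depends on its version and configuration. The consumer names are made up.

```python
def range_assign(num_partitions, consumers):
    """Assign contiguous partition ranges to consumers in sorted order.

    When the partitions do not divide evenly, the first consumers in the
    sorted list each receive one extra partition.
    """
    members = sorted(consumers)
    per_member, extra = divmod(num_partitions, len(members))
    assignment = {}
    start = 0
    for index, member in enumerate(members):
        count = per_member + (1 if index < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


# Three consumers share the 13 partitions of my-failsafe-topic.
before = range_assign(13, ["consumer-1", "consumer-2", "consumer-3"])
# After the third consumer is killed, the two survivors take over everything.
after = range_assign(13, ["consumer-1", "consumer-2"])
print(before)
print(after)
```

Messages are shared by partition, not one by one, so a consumer that owns more partitions (or partitions that happen to receive more messages) sees more traffic. That is why the split in the consoles above is only roughly even.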

Create Kafka Describe Topic Script

You can use kafka-topics.sh to see how the Kafka topic is laid out among the Kafka brokers. The --describe option shows partitions, ISRs, and broker partition leadership.

~/kafka-training/lab2/describe-topics.sh

#!/usr/bin/env bash
cd ~/kafka-training
# Describe the topic: partitions, leaders, replicas, and ISRs
kafka/bin/kafka-topics.sh --describe \
    --topic my-failsafe-topic \
    --zookeeper localhost:2181
Let’s run kafka-topics.sh --describe and see the topology of our my-failsafe-topic.

Run describe-topics

We are going to list which broker owns (is the leader of) each partition, and list the replicas and ISRs of each partition. ISRs are replicas that are up to date. Remember there are 13 partitions.

Topology of Kafka Topic Partition Ownership

~/kafka-training/lab2
$ ./describe-topics.sh
Topic: my-failsafe-topic    PartitionCount: 13    ReplicationFactor: 3    Configs:
    Topic: my-failsafe-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-failsafe-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-failsafe-topic    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: my-failsafe-topic    Partition: 3    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: my-failsafe-topic    Partition: 4    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-failsafe-topic    Partition: 5    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: my-failsafe-topic    Partition: 6    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-failsafe-topic    Partition: 7    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-failsafe-topic    Partition: 8    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: my-failsafe-topic    Partition: 9    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: my-failsafe-topic    Partition: 10    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-failsafe-topic    Partition: 11    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: my-failsafe-topic    Partition: 12    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
Notice how each broker gets a share of the partitions as leaders and followers. Also, see how Kafka replicates the partitions on each broker.

Test Broker Failover by Killing 1st Server

Let’s kill the first broker, and then test the failover.

Kill the first broker

 $ kill `ps aux | grep java | grep server-0.properties | tr -s " " | cut -d " " -f2`
You can stop the first broker by hitting CTRL-C in the broker terminal or by running the above command.
Now that the first Kafka broker has stopped, let’s use Kafka topics describe to see that new leaders were elected!

Run describe-topics again to see leadership change

~/kafka-training/lab2/solution
$ ./describe-topics.sh
Topic:my-failsafe-topic    PartitionCount:13    ReplicationFactor:3    Configs:
    Topic: my-failsafe-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 1    Leader: 1    Replicas: 0,1,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 3    Leader: 2    Replicas: 2,1,0    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 4    Leader: 2    Replicas: 0,2,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 5    Leader: 1    Replicas: 1,0,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 6    Leader: 2    Replicas: 2,0,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 7    Leader: 1    Replicas: 0,1,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 8    Leader: 1    Replicas: 1,2,0    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 9    Leader: 2    Replicas: 2,1,0    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 10    Leader: 2    Replicas: 0,2,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 11    Leader: 1    Replicas: 1,0,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 12    Leader: 2    Replicas: 2,0,1    Isr: 2,1

Notice how Kafka spreads the leadership over the 2nd and 3rd Kafka brokers.
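Reading 13 lines by eye works, but the same check can be scripted. The Python sketch below parses text in the format shown above and summarizes leaders per broker and partitions whose ISR list is shorter than the replica list. The sample text is the first three partition lines from the second listing. The regular expression is an assumption based on that output and may need adjusting for other Kafka versions.

```python
import re
from collections import Counter

# Matches one partition line of `kafka-topics.sh --describe` output as
# shown in this tutorial. The summary line (PartitionCount) does not match.
PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)"
)


def parse_describe(text):
    """Parse describe output into a list of per-partition dicts."""
    rows = []
    for match in PARTITION_LINE.finditer(text):
        rows.append({
            "partition": int(match.group(1)),
            "leader": int(match.group(2)),
            "replicas": [int(x) for x in match.group(3).split(",")],
            "isr": [int(x) for x in match.group(4).split(",")],
        })
    return rows


def leaders_per_broker(rows):
    """Count how many partitions each broker leads."""
    return Counter(row["leader"] for row in rows)


def under_replicated(rows):
    """Partitions with fewer in-sync replicas than assigned replicas."""
    return [row["partition"] for row in rows if len(row["isr"]) < len(row["replicas"])]


sample = """\
Topic:my-failsafe-topic    PartitionCount:13    ReplicationFactor:3    Configs:
    Topic: my-failsafe-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 1    Leader: 1    Replicas: 0,1,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2
"""
rows = parse_describe(sample)
print(leaders_per_broker(rows))
print(under_replicated(rows))
```

With broker 0 stopped, no partition is led by broker 0 and every partition shows up as under-replicated, which matches the listing.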

Show Broker Failover Worked

Let’s prove that failover worked by sending two more messages from the producer console.
Then notice if the consumers still get the messages.
Send the messages m15 and m16.

Producer Console - send m15 and m16

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
m1
...
m15
m16
Notice that the messages are spread evenly among the remaining live consumers.

1st Kafka Consumer gets m16

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m3
m5
m8
m9
m11
m14
...
m16
The first Kafka consumer gets m16.

2nd Kafka Consumer gets m15

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m2
m6
m10
m12
m13
...
m15
The second Kafka consumer gets m15.
Kafka broker Failover WORKS!

Kafka Cluster Failover Review

Why did the three consumers not load share the messages at first?

They did not load share at first because they were each in a different consumer group. Consumer groups each subscribe to a topic and maintain their own offsets per partition in that topic.

How did we demonstrate failover for consumers?

We shut a consumer down. Then we sent more messages. We observed Kafka spreading messages to the remaining consumers.

How did we show failover for producers?

We didn’t. We showed failover for Kafka brokers by shutting one down, then using the producer console to send two more messages. Then we saw that the producer used the remaining Kafka brokers. Those Kafka brokers then delivered the messages to the live consumers.

What tool and option did we use to show ownership of partitions and the ISRs?

We used kafka-topics.sh with the --describe option.