The National Retail Federation reported that more than 189.6 million Americans shopped from Thanksgiving through Cyber Monday 2019. The survey found that 124 million people made purchases in stores while 142.2 million shopped online, and, in a sign of how seamless shopping has become, 75.7 million did both. Given Optimizely's partnership with online retail businesses, the Thanksgiving holiday is one of the most important periods of the year. It was crucial that our internal systems and infrastructure handle the high traffic volume without downtime or performance degradation, so that customers could reliably run experiments, target audiences, roll out features, and measure the metrics that matter to them on our experimentation, targeting, and feature management platform.
On a normal day, Optimizely collects and processes billions of signals (referred to as events) across customer experiences all over the world. On peak days during the Thanksgiving holidays, the platform processes more than 10 billion events, delivering 3.3 billion unique experiences (impressions) to over 693 million visitors.
In this blog post, we present a high-level overview of Optimizely's Data Platform and detail how we planned, prepared, and scaled the ingestion and streaming components of our data platform to meet the high demands of Black Friday and Cyber Monday traffic without compromising stability or availability.
The Data Platform at Optimizely is responsible for the systems and APIs that collect, process, store, and query customers' experiment data. It also supports the product features and user experiences presented in the product UI. The Data Platform can be broken down into two primary areas:
- Data Processing: The infrastructure that collects, processes, and stores data
- Data Analytics: The infrastructure that enables experiment analysis
Preparation and planning for Black Friday and Cyber Monday started several weeks prior to the shopping season. It was a cross-disciplinary effort that involved distributed systems engineers, site reliability engineers, technical support engineers, and customer success managers.
Our goal was to scale the platform in accordance with traffic needs to maintain our internal service level objectives (SLOs) while balancing the increased hosting costs. To do so, we needed to collect several different metrics from our customers and our internal systems. Event data from retail customers makes up nearly half of our typical event volume of 8-10 billion events per day, and most retail customers expected increased visitor interaction on their websites around Black Friday/Cyber Monday sales. Our Customer Success Managers worked with our retail customers to gather expected event volume statistics. Based on those estimates, we determined that the infrastructure needed to handle the following:
- 2-3x increase in current daily event volume
- 2x increase in peak event throughput
- 1.5x increase in unique daily visitors
- 2x increase in daily targeting requests serving personalized experiences
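The multipliers above translate directly into capacity targets. A minimal sketch of that projection (the baseline figures below are illustrative assumptions, not actual Optimizely numbers, apart from the 8-10 billion daily events mentioned earlier):

```python
# Capacity-planning sketch: project holiday targets from baseline metrics.
# Baseline values are assumptions for illustration; the multipliers come
# from the expected-increase estimates above.

BASELINE = {
    "daily_events": 10e9,             # ~8-10 billion events/day (upper bound)
    "peak_events_per_sec": 250e3,     # assumed peak throughput
    "daily_unique_visitors": 400e6,   # assumed
    "daily_targeting_requests": 1e9,  # assumed
}

MULTIPLIERS = {
    "daily_events": 3.0,              # 2-3x: plan for the worst case
    "peak_events_per_sec": 2.0,       # 2x peak event throughput
    "daily_unique_visitors": 1.5,     # 1.5x unique daily visitors
    "daily_targeting_requests": 2.0,  # 2x daily targeting requests
}

def project_targets(baseline, multipliers):
    """Return the capacity target for each metric under the given multipliers."""
    return {name: baseline[name] * multipliers[name] for name in baseline}

targets = project_targets(BASELINE, MULTIPLIERS)
```

Planning against the upper end of each range (3x rather than 2x for daily volume) keeps headroom if several customers peak simultaneously.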
The first stage of data ingestion happens at our event logging endpoint, where events are buffered into Embedded Flume (an in-memory channel). Flume agents co-located with the Event Logging Tier consume the events from E-flume, filter them, and write them to their respective Kafka topics and to S3. This component is referred to as the LogTier (Event-Api + Flume). We use Apache Kafka extensively as a message bus to transport data and power real-time streaming services such as partitioning, enrichment, aggregation, and attribution, ultimately enabling businesses to deliver continuous experimentation and personalization across websites, mobile apps, and connected devices.
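The filter-and-route step performed by the Flume agents can be pictured with a toy sketch. The topic names and validity rule here are hypothetical, and the real agents are Flume interceptors and sinks rather than Python, but the shape of the logic is the same:

```python
# Toy sketch of the Flume agents' filter-and-route step: drop malformed
# events and route the rest to a Kafka topic based on event type.
# Topic names and the validity check are hypothetical examples.

def route_events(events):
    """Yield (topic, event) pairs for valid events; silently drop the rest."""
    for event in events:
        if "account_id" not in event or "type" not in event:
            continue  # filtered out: malformed event
        topic = {
            "decision": "events.decisions",
            "conversion": "events.conversions",
        }.get(event["type"], "events.other")
        yield topic, event

batch = [
    {"account_id": 1, "type": "decision"},
    {"account_id": 2, "type": "conversion"},
    {"type": "decision"},  # missing account_id -> dropped by the filter
]
routed = list(route_events(batch))
```

In production the same events are also written to S3, which provides a durable raw copy independent of Kafka retention.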
As the LogTier and Kafka are the backbone of our infrastructure, we needed to ensure the ingestion pipeline could handle 2-3x the current volume. Our LogTier has a Service Level Objective (SLO) of 99.98% availability. To identify our scaling needs, we examined the following key metrics for the LogTier and Kafka across many months of data:
- ELB request count: Number of requests going through the ELB
- E-flume channel size: Size of the three in-memory channels that buffer events for the co-located Flume agents to consume
- Flume agent channel size: Sizes of the file channels that buffer events to disk before finally producing to Kafka and S3
- Kafka produce timing: Kafka produce timings must stay small to maintain low latency for the event logging endpoint
- Disk usage/IOPS: Disk usage must remain below the provisioned EBS volume, as events are stored in the file channels of the sidecar Flume agents for durability
- Messages received: Number of messages received by each broker
- Message produce request rate: Number of produce requests sent to the broker by clients wanting to write messages to Kafka topics
- Disk utilization: Percentage of disk storage currently in use
- CPU utilization: Amount of work handled by the CPU
- Heap used: Size in bytes of the heap used by the process
- Request produce time (in ms): Time taken by the producer to write a message to Kafka
- Replication lag: Time it takes for the followers of a Kafka partition to catch up with the partition leader
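With these metrics in hand, the scaling decision for each resource reduces to a headroom check: will the projected peak exceed what the current fleet can absorb at a safe utilization? A simplified sketch of that check (the observed values, thresholds, and the 80% utilization target are illustrative assumptions):

```python
# Headroom check: given an observed peak for a resource and the expected
# traffic multiplier, decide whether the resource needs to be scaled out.
# The 0.8 utilization target is an assumed safety margin, not a fixed rule.

def needs_scaling(observed_peak, multiplier, capacity, target_utilization=0.8):
    """True if the projected peak would push the resource past its utilization target."""
    projected = observed_peak * multiplier
    return projected > capacity * target_utilization

# e.g. broker CPU currently peaking at 45% with 2x traffic expected:
# projected 90% exceeds the 80% target, so the cluster needs more brokers.
cpu_needs_more = needs_scaling(observed_peak=45.0, multiplier=2.0, capacity=100.0)
```

Running this check per metric (CPU, disk, IOPS, heap) per cluster is what surfaced the Kafka cluster that needed roughly double the brokers while the primary cluster already had room.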
After looking into historical data for these metrics and forecasting the expected increases, we expanded the LogTier cluster by roughly 30% to a total of 250 m5.xlarge instances. Similarly, we nearly doubled the number of brokers in one of our Kafka clusters to cope with the increased CPU and disk needs; our primary Kafka cluster was already equipped to handle the increased event volume. We have dashboards highlighting all the key metrics, as well as a playbook for reacting and responding quickly to any blockages in our event ingestion pipeline. Below is the LogTier availability metrics dashboard during the Black Friday and Cyber Monday holidays, showing 99.98% availability.
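A 99.98% availability SLO is easier to reason about as an error budget, i.e. the total downtime the SLO permits over a window. A quick sketch of that conversion:

```python
# Convert an availability SLO into a downtime (error) budget for a window.

def downtime_budget_seconds(slo, window_days):
    """Allowed downtime, in seconds, within the window for a given availability SLO."""
    return (1.0 - slo) * window_days * 24 * 3600

# 99.98% over a 30-day window -> roughly 8.6 minutes of allowed downtime
budget = downtime_budget_seconds(0.9998, 30)
```

Framing the SLO this way makes the stakes of a holiday-weekend incident concrete: a single half-hour outage would blow through a month's budget several times over.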
Apache Samza has been a great asset to our event ingestion and streaming pipeline. It enables us to perform large-scale, real-time stream computing such as data enrichment and session aggregations over billions of events per day. More information on how we leverage Samza can be found in the following blogs: From batching to streaming at Optimizely – part 1 and From batching to streaming at Optimizely – part 2.
We identified key metrics to estimate our scaling needs. The key metrics vary across stream processing jobs depending on their functionality, but the common ones were as follows:
- Processing rate: Number of messages processed per second
- Kafka consumer lag: Delta between the consumer offset and the latest offset of the Kafka topic
- RocksDB gets vs. puts: Samza uses RocksDB as a key-value store for state management; our sessionization and attribution tasks rely on it for stateful stream processing
Our Samza deployment relies on YARN (Yet Another Resource Negotiator) to manage resources and containers, each running an application process. Containers are scheduled onto Node Managers (workers), which are m5.xlarge instances in our setup. To estimate the peak processing rate, we stopped all stream consumption, let lag build up in our Kafka consumers, then resumed processing and noted how quickly we could catch back up to real time.
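The stop-and-drain experiment above gives a direct estimate of peak processing rate: during catch-up the job processes the accumulated backlog plus everything newly produced, so dividing that total by the catch-up time yields the maximum sustainable rate. A sketch (the example numbers are illustrative):

```python
# Estimate a stream job's peak processing rate from a lag catch-up experiment.

def peak_processing_rate(lag_start, produce_rate, catchup_seconds):
    """
    Estimate the max messages/sec a job can process, given:
      lag_start       - consumer lag (messages) when processing resumes
      produce_rate    - messages/sec still arriving during catch-up
      catchup_seconds - time for the lag to return to ~zero
    During catch-up the job drained the backlog plus all newly produced messages.
    """
    total_processed = lag_start + produce_rate * catchup_seconds
    return total_processed / catchup_seconds

# e.g. a 60M-message backlog drained in 10 minutes while 50k msg/s kept arriving
rate = peak_processing_rate(lag_start=60_000_000, produce_rate=50_000, catchup_seconds=600)
```

Comparing this measured ceiling against the projected holiday ingest rate is what showed the pre-processing and sessionization jobs had roughly 4x headroom.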
With the historical dataset, our pre-processing and sessionization jobs had enough headroom to handle 4x the traffic. Our stateful streaming job, attribution, enriches each event with layer states (which experiment/variation the event is attributed to) by fetching the information from the attribution database backed by HBase. We wanted to prepare for the following scenarios: 1) if containers are lost, the task should quickly bootstrap its local state from the external store; 2) if containers are not lost, the processing rate should keep up with the event ingestion rate. To prevent containers from being killed, we increased the container memory from 2 GiB to 4 GiB, and to keep the Kafka consumer lag as close to zero as possible, we doubled the container count. Our streaming pipeline handled the event volume without any hiccups, and results freshness was not impacted.
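Doubling the container count follows from a simple rate balance: total capacity (containers × per-container processing rate) must exceed the peak ingestion rate with some headroom. A sketch of that sizing arithmetic (the per-container rate and headroom factor are assumed figures for illustration):

```python
import math

# Size a stream job's container count so that aggregate processing capacity
# exceeds the peak ingestion rate with headroom. Rates are illustrative.

def containers_needed(peak_ingest_rate, per_container_rate, headroom=1.5):
    """Containers required so capacity covers the peak ingest rate times headroom."""
    return math.ceil(peak_ingest_rate * headroom / per_container_rate)

# assumed: 200k msg/s peak, 10k msg/s per container, 1.5x headroom
n = containers_needed(peak_ingest_rate=200_000, per_container_rate=10_000)
```

The headroom factor matters most for stateful jobs like attribution: lost containers must replay state while live traffic keeps arriving, so running exactly at capacity leaves no room to recover.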
Monitoring and Alerting
Most of our application and system metrics are collected via tcollector or the Datadog agent and pushed to OpenTSDB or Datadog, respectively. We have defined monitors and dashboards to track application- and system-level metrics, and we made sure there were playbooks for every component of the Data Platform so that our on-call engineers knew how to respond and resolve incidents without impacting availability.
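At its core, a monitor compares a metric series against a threshold and fires once the breach has persisted long enough to rule out a blip. A toy sketch of that evaluation loop (the threshold, sample interval, and consecutive-breach rule are illustrative; in practice these live in Datadog/OpenTSDB monitor definitions):

```python
# Toy monitor evaluation: fire an alert only when a metric stays above its
# threshold for several consecutive samples, filtering out transient spikes.
# Threshold and window values below are illustrative examples.

def should_alert(samples, threshold, min_consecutive=3):
    """True once the metric breaches the threshold for min_consecutive samples in a row."""
    streak = 0
    for value in samples:
        streak = streak + 1 if value > threshold else 0
        if streak >= min_consecutive:
            return True
    return False

# e.g. Kafka consumer lag sampled once a minute; alert after 3 minutes above 1M
alerting = should_alert([2e5, 1.2e6, 1.5e6, 1.3e6], threshold=1e6)
```

Requiring consecutive breaches trades a few minutes of detection latency for far fewer pages, which keeps on-call engineers responsive to the alerts that do fire.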
Black Friday and Cyber Monday '19 were a huge success for Optimizely, as the platform gracefully handled a 2-3x increase over normal peak traffic. In the next blog post, we will detail the scaling story of our Storage and Reporting Infrastructure. Thanks to everyone whose hard work and preparation throughout this journey helped scale the platform to meet the demands of Black Friday and Cyber Monday.