Computing Aggregation with Apache Spark and APIs
At Apigee, we provide our customers with analytics that range from basic measurements, such as API traffic over time, to more sophisticated capabilities, including custom dashboards and deep end-to-end analysis of a company's operations and business.
When we chose the current architecture, we were confident that a relational database management system such as Postgres would fit the bill, especially given the "structured" nature of the queries we needed to run.
However, with thousands of "orgs" (our unit of separation or, in general terms, a tenant), we could not afford to have one database per tenant, mainly for manageability reasons. At the other extreme, we couldn't intermingle all tenants in one or a few database tables either (the exception being our Apigee Edge free offering). Tenants share a few common attributes of interest, but almost every tenant also tracks things that are unique to its own APIs. We settled on a good compromise: separation by table (each tenant has its own tables), with tens of tenants sharing a database.
This has worked well. Yet computing aggregates has put CPU and I/O pressure on the system. SQL is great for many things, including computing aggregates, but scheduling these jobs and ensuring that they completed on time was becoming a burden.
So we switched to the large-scale data processing engine Apache Spark. This post describes how we made that switch and how we use Spark to connect to multi-tenant databases, compute aggregates, and store the data in them.
The current analytics architecture is multi-tenant, with each tenant's data kept separate within the database. The database is Postgres. To serve dashboards faster, we compute a set of predefined aggregations on raw facts at fixed intervals (every five minutes, by default).
Our multi-tenant deployments span a wide spectrum. Some databases are heavily multi-tenant, with relatively little traffic per tenant (for example, the infrastructure for our Apigee Edge Free offering hosts many thousands of tenants). Other databases host only a few tenants, each with very high traffic (upwards of 2,000 TPS).
The number of tenants is not the only variable: server sizes range from four cores and 8 GB of RAM up to 32 cores and 64 GB. The analytics stack also isn't limited to cloud deployments; it is deployed in customer data centers as well. The challenge is to build a framework that computes aggregates efficiently across all of these deployments, both for high-traffic tenants and for deployments that host large numbers of tenants.
Each tenant's data is separated at the table level, which keeps data cleanly isolated and makes schemas easier to manage. Each fact table is partitioned by day for efficient processing and purging. Aggregations are pre-computed from the raw facts so that dashboards can be served faster.
Today, aggregates are computed by running "INSERT INTO ... SELECT ... GROUP BY" queries in parallel in Postgres for each tenant at regular intervals (every five minutes by default). This approach is quite inefficient because it can require multiple passes over the same data set. It can also put heavy load on the databases when many tenants compute aggregates at the same time.
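The day partitioning and the five-minute cycle above come down to two small calculations: which daily partition a fact lands in, and which aggregation window it falls into. The sketch below is illustrative only. The `YYYYMMDD` partition key and the function names are hypothetical and are not Apigee's actual naming, and it assumes the interval divides an hour evenly.

```python
from datetime import datetime, timezone

# Default aggregation interval from the post, in minutes. This sketch assumes
# the interval divides an hour evenly (5, 10, 15, 30, ...).
INTERVAL_MINUTES = 5


def day_partition(ts: datetime) -> str:
    """Key of the daily partition a fact belongs to (hypothetical YYYYMMDD naming)."""
    return ts.strftime("%Y%m%d")


def window_start(ts: datetime, minutes: int = INTERVAL_MINUTES) -> datetime:
    """Floor a timestamp to the start of its aggregation window."""
    if 60 % minutes != 0:
        raise ValueError("interval must divide 60 minutes evenly")
    return ts.replace(minute=ts.minute - ts.minute % minutes, second=0, microsecond=0)


def partitions_to_purge(partitions, oldest_kept: str):
    """Daily partitions older than the retention cutoff can be dropped whole.

    YYYYMMDD keys sort chronologically as plain strings, so a string
    comparison is enough here.
    """
    return sorted(p for p in partitions if p < oldest_kept)


ts = datetime(2016, 3, 14, 10, 7, 42, tzinfo=timezone.utc)
print(day_partition(ts), window_start(ts).isoformat())
```

Purging by day means that dropping old data only requires discarding whole partitions, with no row-by-row deletes.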
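To make the repeated-pass cost concrete, here is a minimal sketch of the per-tenant SQL pattern. SQLite (from Python's standard library) stands in for Postgres. The table names (`fact_<tenant>`, `agg_<tenant>_<name>`) and the two example aggregates are hypothetical. Each aggregate is its own INSERT INTO ... SELECT ... GROUP BY statement, so the tenant's fact table is scanned once per aggregate.

```python
import sqlite3

# Hypothetical aggregates: name -> dimensions to group by.
AGGREGATES = {
    "by_api": ["api"],
    "by_api_developer": ["api", "developer"],
}


def create_tenant_tables(conn, tenant):
    """One fact table plus one table per aggregate, all private to the tenant."""
    conn.execute(f"CREATE TABLE fact_{tenant} (api TEXT, developer TEXT, response_ms REAL)")
    for name, dims in AGGREGATES.items():
        cols = ", ".join(f"{d} TEXT" for d in dims)
        conn.execute(f"CREATE TABLE agg_{tenant}_{name} ({cols}, requests INTEGER, total_ms REAL)")


def aggregate_tenant(conn, tenant, aggregates):
    """Run one INSERT INTO ... SELECT ... GROUP BY per aggregate for one tenant.

    Every statement re-reads fact_<tenant>, so N aggregates cost N passes
    over the same data. Identifiers are interpolated directly, which assumes
    tenant and aggregate names come from trusted metadata.
    """
    for name, dims in aggregates.items():
        cols = ", ".join(dims)
        conn.execute(
            f"INSERT INTO agg_{tenant}_{name} ({cols}, requests, total_ms) "
            f"SELECT {cols}, COUNT(*), SUM(response_ms) FROM fact_{tenant} GROUP BY {cols}"
        )
    conn.commit()


conn = sqlite3.connect(":memory:")
create_tenant_tables(conn, "acme")
conn.executemany(
    "INSERT INTO fact_acme VALUES (?, ?, ?)",
    [("orders", "dev1", 100.0), ("orders", "dev2", 300.0), ("users", "dev1", 50.0)],
)
aggregate_tenant(conn, "acme", AGGREGATES)
```

Multiply this loop by every tenant in a shared database, all firing on the same five-minute schedule, and the CPU and I/O pressure described above follows.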
Aggregation involves combining different dimensions and deriving an "aggregate" for each combination. The aggregate can be the sum, minimum, or maximum of the various metrics in that combination. Other functions, including average, standard deviation, and percentile, can then be computed on the aggregated data set.
The aggregations in the current analytics stack cover dimension combinations such as API, developer, and target, among others. They also include percentiles (50th, 90th, and 95th) of API response time. The cardinality of the aggregates varies widely, from very low to very high (close to 13,000 for some of the aggregated data).
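As a rough illustration, the following plain-Python sketch groups rows by a dimension combination and keeps count, sum, min, max, and sum of squares for one metric in a single pass. This is a generic technique, not Apigee's code. Storing count, sum, and sum of squares is what allows the average and (population) standard deviation to be derived later from the aggregate alone. The field names are hypothetical, and the sum-of-squares formula trades some numerical stability for simplicity.

```python
from math import sqrt


def aggregate(rows, dims, metric):
    """Single pass: group rows by the dimension combination `dims` and keep
    count, sum, min, max, and sum of squares of `metric` per group."""
    acc = {}
    for row in rows:
        key = tuple(row[d] for d in dims)
        v = row[metric]
        a = acc.get(key)
        if a is None:
            acc[key] = {"count": 1, "sum": v, "min": v, "max": v, "sumsq": v * v}
        else:
            a["count"] += 1
            a["sum"] += v
            a["min"] = min(a["min"], v)
            a["max"] = max(a["max"], v)
            a["sumsq"] += v * v
    return acc


def mean_and_stddev(a):
    """Derive average and population standard deviation from one group's aggregate."""
    mean = a["sum"] / a["count"]
    # Clamp tiny negative values caused by floating-point rounding.
    variance = max(a["sumsq"] / a["count"] - mean * mean, 0.0)
    return mean, sqrt(variance)


rows = [
    {"api": "orders", "developer": "d1", "response_ms": 100.0},
    {"api": "orders", "developer": "d2", "response_ms": 300.0},
    {"api": "users", "developer": "d1", "response_ms": 50.0},
]
by_api = aggregate(rows, ["api"], "response_ms")
print(mean_and_stddev(by_api[("orders",)]))
```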
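Percentiles differ from sums and maxima: an exact percentile needs every value in the group, not a running total, which makes it one of the more expensive aggregates. The sketch below assumes the nearest-rank definition and computes the 50th, 90th, and 95th percentiles of a list of response times. The post does not specify which percentile definition Apigee uses. For large data sets, Spark also offers approximate alternatives, such as `DataFrame.stat.approxQuantile`.

```python
import math


def percentile(values, p):
    """Nearest-rank percentile: the smallest value such that at least p% of
    the values are less than or equal to it."""
    if not values:
        raise ValueError("percentile of empty sequence")
    if not 0 < p <= 100:
        raise ValueError("p must be in (0, 100]")
    ordered = sorted(values)
    # p * n is computed before dividing to avoid float surprises like 0.95 * 100.
    rank = math.ceil(p * len(ordered) / 100)
    return ordered[rank - 1]


def response_time_percentiles(values, ps=(50, 90, 95)):
    """The three response-time percentiles mentioned in the post."""
    return {p: percentile(values, p) for p in ps}


print(response_time_percentiles([120, 80, 200, 95, 150]))
```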
The problems—and the solution
Given the wide spectrum of deployment schemes, computing aggregates in Postgres doesn't scale well, and neither does scheduling the parallel tasks.
A new approach to computing aggregates should be:
- efficient and timely, so that each round of aggregation completes within a single cycle
- easy to deploy and manage
- extensible, because new aggregation schemes will be added as required
The new approach introduces an "aggregator service," which computes the aggregates by reading raw facts from a file. The aggregator service is a metadata-driven computing framework built on Apache Spark: it embeds the Spark framework and provides APIs to compute aggregations. Every aggregation is defined by a schema that specifies the fields to group by, the metrics, and the functions to apply (sum, minimum, maximum, percentile, and a few custom functions).
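To illustrate what "metadata-driven" can mean in practice, here is a sketch in which each aggregation is described as data (group-by fields, metrics, and function names) and a single generic engine executes any such description. Everything in it is hypothetical: `AggregationSpec`, the function registry, and `run` are not the aggregator service's actual API. The sketch also uses plain Python rather than Spark; on Spark, a spec like this would more likely be translated into a DataFrame `groupBy(...).agg(...)` call.

```python
import math
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple


def _nearest_rank(p: int) -> Callable[[List[float]], float]:
    """Build a nearest-rank percentile function for percentile p."""
    def f(values: List[float]) -> float:
        ordered = sorted(values)
        return ordered[math.ceil(p * len(ordered) / 100) - 1]
    return f


# Hypothetical function registry. New or custom functions are registered here
# without changing the engine below.
FUNCTIONS: Dict[str, Callable[[List[float]], float]] = {
    "count": len,
    "sum": sum,
    "min": min,
    "max": max,
    "p50": _nearest_rank(50),
    "p90": _nearest_rank(90),
    "p95": _nearest_rank(95),
}


@dataclass(frozen=True)
class AggregationSpec:
    """Hypothetical metadata describing one aggregation."""
    name: str
    group_by: Tuple[str, ...]
    metrics: Tuple[str, ...]
    functions: Tuple[str, ...]


def run(spec: AggregationSpec, rows) -> Dict[tuple, Dict[str, float]]:
    """Generic engine: group rows as the spec says, then apply every function to every metric."""
    unknown = [fn for fn in spec.functions if fn not in FUNCTIONS]
    if unknown:
        raise KeyError(f"unknown functions in {spec.name}: {unknown}")
    groups: Dict[tuple, Dict[str, List[float]]] = {}
    for row in rows:
        key = tuple(row[f] for f in spec.group_by)
        bucket = groups.setdefault(key, {m: [] for m in spec.metrics})
        for m in spec.metrics:
            bucket[m].append(row[m])
    return {
        key: {f"{fn}({m})": FUNCTIONS[fn](vals) for m, vals in bucket.items() for fn in spec.functions}
        for key, bucket in groups.items()
    }


spec = AggregationSpec("api_by_developer", ("api", "developer"), ("response_ms",), ("count", "sum", "max", "p50"))
rows = [
    {"api": "orders", "developer": "d1", "response_ms": 100.0},
    {"api": "orders", "developer": "d1", "response_ms": 300.0},
    {"api": "users", "developer": "d1", "response_ms": 50.0},
]
print(run(spec, rows))
```

The advantage of this design is extensibility. Adding an aggregation means adding a spec, and adding a custom function means registering one entry, with no new per-tenant SQL to write or schedule.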
In the next post in this two-part series, we’ll detail the new method of computing aggregation, the results of employing it, and the lessons we learned from this process.
Image: Mister Pixel/The Noun Project