Computing Aggregation with Apache Spark & APIs, Pt. 2
In our previous post, we described Apigee’s multi-tenant analytics architecture. Here, we detail the new method of computing aggregations and describe our switch to Apache Spark, a large-scale data processing engine. We also discuss the results of employing it and what we learned in the process.
The new way of computing aggregation involves three steps:
- Data preparation: raw facts are dumped for aggregation onto a solid-state drive for efficient I/O
- Aggregate computation: the aggregator service runs locally on the Postgres server (where raw facts are ingested); it is invoked through an API with the raw facts as input and a context that details the aggregates to be computed
- Results loading: results from a successful aggregate computation are loaded back into Postgres
To manage these tasks, we developed an orchestrator that runs them for each scope at every aggregation interval. The aggregation keeps track of metadata on aggregation states, so it can recover and continue after any failure.
The input to the aggregator service is a file of raw facts (delimited text files stored on local disk, S3, or HDFS) along with a context that contains the aggregates to be computed. On launch, the service initializes a Spark context, which is used to launch multiple map, transform, and reduce functions on the dataset to compute the aggregates.
The dataset is loaded into memory and, in a single pass, all the aggregations are computed. This enables efficient data processing. Spark abstracts out the dataset as resilient distributed datasets (RDDs), which allows data and intermediary computations to be cached and multiple computation tasks to be run on the same loaded dataset.
Core aggregation is composed of three tasks:
- a transform task to derive new dimensions and metrics
- a map task that generates a combination
- a reduce task that combines the mapped data and computes the metrics
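The three tasks can be sketched in plain Python, without Spark, to show the shape of the computation. This is only an illustration: the field names (`api`, `status`, `latency_ms`), the derived `status_class` dimension, and all function names are hypothetical and are not taken from the aggregator service.

```python
# Hypothetical raw facts; the field names are illustrative only.
RAW_FACTS = [
    {"api": "orders", "status": 200, "latency_ms": 120},
    {"api": "orders", "status": 500, "latency_ms": 340},
    {"api": "users", "status": 200, "latency_ms": 80},
    {"api": "orders", "status": 201, "latency_ms": 100},
]


def transform(fact):
    """Transform task: derive a new dimension from an existing field."""
    derived = dict(fact)
    derived["status_class"] = f"{fact['status'] // 100}xx"
    return derived


def map_to_key(fact, dimensions):
    """Map task: emit (dimension-value key, partial metrics) for one fact."""
    key = tuple(fact[d] for d in dimensions)
    partial = {
        "count": 1,
        "sum_latency": fact["latency_ms"],
        "max_latency": fact["latency_ms"],
    }
    return key, partial


def reduce_partials(a, b):
    """Reduce task: combine two partial metric records that share a key."""
    return {
        "count": a["count"] + b["count"],
        "sum_latency": a["sum_latency"] + b["sum_latency"],
        "max_latency": max(a["max_latency"], b["max_latency"]),
    }


def aggregate(facts, dimensions):
    """Run transform, map, and reduce over the facts in a single pass."""
    results = {}
    for fact in facts:
        key, partial = map_to_key(transform(fact), dimensions)
        if key in results:
            results[key] = reduce_partials(results[key], partial)
        else:
            results[key] = partial
    return results


result = aggregate(RAW_FACTS, ["api", "status_class"])
```

In Spark, the same three functions would be passed to the framework's map and reduce-by-key style operations, so that the per-key reduction can run in parallel across cores.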
The functions sum(), min(), max(), and count() are simple to compute because they are commutative and associative, so partial results can be combined in any order; avg() follows directly from the sum and the count. Computing percentiles, by contrast, is not straightforward.
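A minimal sketch of why these functions distribute so easily: each one needs only a small partial state that can be merged in any order. Here the average is carried as a sum and a count and derived at the end. The `Partial` class and `summarize` helper are hypothetical names used for illustration.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class Partial:
    """Partial aggregate state that can be merged in any order."""
    total: float
    minimum: float
    maximum: float
    count: int

    @staticmethod
    def of(value):
        """Build the partial state for a single data point."""
        return Partial(value, value, value, 1)

    def merge(self, other):
        """Combine two partial states; the operation is commutative and associative."""
        return Partial(
            self.total + other.total,
            min(self.minimum, other.minimum),
            max(self.maximum, other.maximum),
            self.count + other.count,
        )

    @property
    def average(self):
        # The average is not merged directly; it is derived from sum and count.
        return self.total / self.count


def summarize(values):
    """Reduce a non-empty sequence of values to one partial state."""
    return reduce(Partial.merge, (Partial.of(v) for v in values))


# Two partitions summarized independently, then merged.
left = summarize([4, 8])
right = summarize([1, 7])
merged = left.merge(right)
```

Merging `left` into `right` or `right` into `left` gives the same state as summarizing all four values at once. A percentile has no such small mergeable state, which is what makes it harder.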
Three different approaches were used to compute the percentile:
- use all the data
- use sketch structures (TDigest & QDigest) to allow approximate computation with very little memory
- use histograms and merge in final reduce
The second and third approaches are space-efficient but less accurate, and they are also slower (a lot of time is spent merging sketches). Because Spark manages memory efficiently, using all the data points turned out to be faster than approaches two and three.
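To make the trade-off concrete, here is a small sketch that contrasts the first approach (all data) with the third (mergeable histograms). It assumes a nearest-rank percentile definition and fixed-width buckets, both chosen for simplicity. It is not the TDigest or QDigest algorithm, and the function names and sample values are hypothetical.

```python
import math
from collections import Counter


def exact_percentile(values, pct):
    """Approach one: sort all data points and take the nearest-rank percentile."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def to_histogram(values, bucket_width):
    """Approach three: summarize a partition as counts per fixed-width bucket."""
    return Counter((v // bucket_width) * bucket_width for v in values)


def histogram_percentile(histogram, pct, bucket_width):
    """Estimate a percentile from a merged histogram (returns the upper bucket edge)."""
    total = sum(histogram.values())
    rank = max(1, math.ceil(pct * total / 100))
    seen = 0
    for bucket_start in sorted(histogram):
        seen += histogram[bucket_start]
        if seen >= rank:
            return bucket_start + bucket_width
    raise ValueError("empty histogram")


# Hypothetical latencies from two partitions of the same dataset.
partition_a = [12, 48, 95, 110, 230]
partition_b = [15, 60, 75, 180, 400]

exact_p90 = exact_percentile(partition_a + partition_b, 90)

# Histograms from each partition merge by adding bucket counts.
merged = to_histogram(partition_a, 50) + to_histogram(partition_b, 50)
approx_p90 = histogram_percentile(merged, 90, 50)
```

With these sample values the exact 90th percentile is 230, while the histogram estimate is 250: the histogram needs far less memory per partition, but its answer is only as precise as the bucket width.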
The computation scales because:
- data movement is limited to a single server, so it’s very fast; SSDs speed up all data operations
- computing everything in a single pass over the dataset is also fast and efficient
- the aggregator service is based on Spark, which uses the cores and memory efficiently; Spark also has low overhead in launching tasks, so it handles smaller files well
- when Spark is run in local mode, most of the data is in memory (PROCESS_LOCAL) or is fetched from disk
Spark also simplifies deployment because all components are deployed locally. The aggregator service is a jar launched as a service (with the aid of Spring Boot), so it’s easy to manage and deploy. Because there are no clusters to manage, it’s a boon to on-premises setups. The aggregator service is also self-sizing: it picks the number of cores to use and limits its memory use based on the server it is deployed on.
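One way such self-sizing could work is sketched below. The policy (leave one core for the database, use half of the memory) is purely an assumption for illustration, as is the `size_local_spark` function; the post does not describe the actual rule. The only Spark-specific parts are the standard `spark.master` value `local[N]` and the `spark.driver.memory` setting.

```python
import os


def size_local_spark(total_memory_gb, cores=None, memory_fraction=0.5, reserve_cores=1):
    """Derive local-mode Spark settings from the size of the host.

    The reserve_cores and memory_fraction defaults are assumed policies,
    not values taken from the aggregator service.
    """
    available = cores if cores is not None else (os.cpu_count() or 1)
    use_cores = max(1, available - reserve_cores)
    use_memory_gb = max(1, int(total_memory_gb * memory_fraction))
    return {
        "spark.master": f"local[{use_cores}]",
        "spark.driver.memory": f"{use_memory_gb}g",
    }


# Sizing for the smallest and largest test deployments mentioned in this post.
small = size_local_spark(total_memory_gb=4, cores=2)
large = size_local_spark(total_memory_gb=64, cores=32)
```

In local mode the driver and the executor share one JVM, so the driver memory setting is the one that bounds the whole computation.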
The framework is extensible because it can be metadata-driven, with many of the tasks abstracted out. This same framework can be used in other applications, like rollups of aggregated data and computations of other kinds of aggregates.
The aggregator service is not limited to reading data from a local disk. It can also read from S3 or HDFS, and it can load the results back into different sinks. The deployment described above is one where the source and the sink are the same and data is processed locally.
In a case where the aggregation service is down, there will be “holes” in the aggregated data. These holes are filled when the aggregation services are restarted (using metadata from aggregation states). The design lets aggregations keep working on fresh data while the backlog is processed in the background, so customers have access to the latest data.
In one case we experienced, the aggregation service was down for two hours due to database maintenance; after restarting, the aggregation was completed in 48 minutes (for datasets of 4,000 TPS), while computation on fresh data continued.
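The hole-filling logic can be sketched as follows. This assumes the aggregation-state metadata boils down to a set of completed interval start times, and that the newest missing interval is served first with the backlog following oldest-first. Both are assumptions for illustration, and the function names are hypothetical.

```python
def missing_intervals(completed, start, end, step=300):
    """Return interval start times in [start, end) that have no recorded aggregate."""
    return [t for t in range(start, end, step) if t not in completed]


def plan_work(completed, start, end, step=300):
    """Order the work: newest missing interval first, then the backlog oldest-first."""
    holes = missing_intervals(completed, start, end, step)
    if not holes:
        return []
    return [holes[-1]] + holes[:-1]


# Hypothetical state: two intervals were aggregated, then the service
# was down for two hours (24 five-minute intervals).
completed = {0, 300}
outage_end = 600 + 2 * 60 * 60
plan = plan_work(completed, 0, outage_end)
```

Here `plan` holds 24 intervals, starting with the most recent one (7500) and then walking the backlog from 600 upward, so fresh data does not wait behind the backlog.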
The new aggregation computations were tested in various deployments (two cores to 32 cores, 4GB to 64GB) and on small to large datasets (9,000 or more TPS in the test environment and 4,000 TPS in production). The aggregation cycle:
- takes about two minutes to process a file of about 1.5GB at 4,000 TPS, with a file generated every five minutes
- takes three minutes to compute on the 3.4GB generated at 9,000 TPS (about 2.7 million records per five-minute interval)
- completes within a cycle of five minutes, in various deployments, small to large tenants, and low- to high-traffic tenants
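The figures above can be cross-checked with a little arithmetic. The sketch below assumes decimal gigabytes (10^9 bytes) and a steady request rate over the interval; the function names are hypothetical.

```python
INTERVAL_SECONDS = 5 * 60  # one aggregation cycle


def records_per_interval(tps):
    """Number of raw facts produced in one five-minute interval."""
    return tps * INTERVAL_SECONDS


def bytes_per_record(file_gb, tps):
    """Average record size, assuming decimal gigabytes (10**9 bytes)."""
    return file_gb * 1e9 / records_per_interval(tps)


def cycle_utilization(compute_minutes):
    """Fraction of the five-minute cycle spent computing aggregates."""
    return compute_minutes * 60 / INTERVAL_SECONDS


production_records = records_per_interval(4000)  # 1.2 million records
test_records = records_per_interval(9000)        # 2.7 million records
```

Both workloads come out at roughly 1.25KB per record, and even the larger one uses about 60% of the five-minute cycle, which leaves headroom for backlog processing.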
[Figure: aggregation time in seconds (median of 10 iterations)]
The results of the aggregations are validated in a test setup by comparing the counts and aggregated rows produced by this solution with those from a Postgres server where aggregation is run on the same datasets. In production, a lag detector was developed to keep track of lags, along with a count auditor that matches the aggregated counts against the counts obtained by running aggregate SQL queries on the raw facts.
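A count auditor of this kind might look like the sketch below. It is a hypothetical illustration in plain Python: in production the expected counts would come from an aggregate SQL query on the raw facts rather than from an in-memory list, and the `api` key field is an assumed dimension.

```python
from collections import Counter


def audit_counts(raw_facts, aggregated_rows, key_field="api"):
    """Compare per-key counts from raw facts with counts in aggregated rows.

    Returns a dict of key -> (expected, actual) for every key that differs;
    an empty dict means the aggregates match the raw facts.
    """
    expected = Counter(fact[key_field] for fact in raw_facts)
    actual = Counter()
    for row in aggregated_rows:
        actual[row[key_field]] += row["count"]
    return {
        key: (expected[key], actual[key])
        for key in expected.keys() | actual.keys()
        if expected[key] != actual[key]
    }


# Hypothetical data: three raw facts and two candidate aggregate results.
raw = [{"api": "orders"}, {"api": "orders"}, {"api": "users"}]
good = [{"api": "orders", "count": 2}, {"api": "users", "count": 1}]
bad = [{"api": "orders", "count": 1}]

clean_report = audit_counts(raw, good)
mismatch_report = audit_counts(raw, bad)
```

The first report is empty, while the second flags both the undercounted `orders` key and the missing `users` key.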
Embedding Spark and running it in local mode continuously isn’t a problem, but you need to be very careful about releasing cached RDDs and results; otherwise memory use bloats and the application slows to a crawl.
We also found that:
- parallel garbage collection (GC) for the old and young generations works well, and much better than other GC schemes
- shuffle on high-cardinality datasets can be tricky, because the executor memory needs to be set accordingly, or else the computation fails
- with a few tweaks, the aggregator service also runs in distributed mode; local mode runs show a performance gain of 23% compared to distributed mode
Deployment tweaks in a multi-tenant database
In order to dump the raw facts for an interval, each table needs to be scanned; this could be a tedious problem if there’s a huge number of tables. But in deployments with large numbers of tenants, the traffic is very low.
An efficient mechanism can be developed to schedule the tenants based on the activity (incoming traffic): active tenants are scheduled first and then the dormant ones later. One of the problems we discovered was a huge number of child table partitions (about four million). This made the database inefficient for any query, because of the large metadata overhead.
The child tables are effective when traffic is high, but there’s quite a bit of overhead when traffic is low. So we cleaned up the child table partitions and directed all the data to the parent table. This shrank the scan times from two seconds to a few milliseconds, so that all 7,000+ scopes could be scanned in a single five-minute cycle.
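The activity-based scheduling described above could look like the following sketch. It assumes a recent request count is available per scope; the `schedule_scopes` function and the scope names are hypothetical.

```python
def schedule_scopes(traffic_by_scope):
    """Order scopes for scanning: active ones first (busiest first), dormant ones last.

    traffic_by_scope maps a scope name to its recent request count;
    ties are broken by name so the order is deterministic.
    """
    active = sorted(
        (scope for scope, hits in traffic_by_scope.items() if hits > 0),
        key=lambda scope: (-traffic_by_scope[scope], scope),
    )
    dormant = sorted(scope for scope, hits in traffic_by_scope.items() if hits == 0)
    return active + dormant


# Hypothetical recent traffic for five scopes.
recent_traffic = {"a": 0, "b": 50, "c": 900, "d": 0, "e": 50}
order = schedule_scopes(recent_traffic)
```

With this ordering, the scopes that customers are most likely to query get fresh aggregates earliest in each cycle, and dormant scopes only cost a quick scan at the end.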
Because the aggregator service manages resources very well and has low overhead for small files, the same framework is used to compute aggregations here, with a few tweaks to accommodate the large number of parallel tasks required in this deployment. With these changes deployed, all eight aggregations are computed within developer pods for all scopes (with traffic) within a five-minute interval.
The generality of the Spark computing framework allows for different ways of adoption. Running in local mode allows the aggregations to be computed efficiently and makes the deployment and management much simpler.