Performance Metrics

We compute several high-level metrics that indicate potential problems, or, if you will, optimization opportunities.

Generally, the higher the score, the more likely or severe the problem is. Of course, because each job is different, we want to stress that these are potential problems. If you examine your job and decide it’s doing what it should, it’s fine to ignore that job’s problem metrics in the future.

We find it useful to review the trends of the problem metrics. A large jump might indicate a regression that requires investigation.

Spill

The spill problem score shows the sum of the Spill (Disk) metric across all stages.

Each task of a Spark stage processes a given chunk of data. For example, for a join, each partition of each side of the join must be sorted. Ideally, the processing is done entirely in memory. If not enough execution memory is available, Spark can use disk as scratch space, and this metric shows exactly how much data had to be written to disk.

Spill is never a good thing, but spilling large amounts of data can considerably slow down the job, especially if you’re using cloud instances with network disks.

The recommended way to reduce spill is to decrease the size of partitions by increasing parallelism:

  • If the operation that generates spill happens after shuffle (so the Shuffle Read metric in the Spark UI is above zero), it is best to increase the spark.sql.shuffle.partitions configuration value.
  • If spilling happens immediately when reading data (so the stage has a zero Shuffle Read metric and a non-zero Input Size), the source data likely has partitions that are too large, and it’s recommended to adjust the producer of that data.
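
For example, assuming a SparkSession named spark, shuffle parallelism can be raised as in the minimal sketch below; the value 800 is purely illustrative and should be tuned to your data volume.

  // Minimal sketch (Scala): raise shuffle parallelism so that each shuffle
  // partition is small enough to be processed in memory. 800 is an
  // illustrative value, not a recommendation.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("spill-tuning-example").getOrCreate()
  spark.conf.set("spark.sql.shuffle.partitions", "800")

  // The same setting can also be passed at submit time:
  //   spark-submit --conf spark.sql.shuffle.partitions=800 ...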

In some setups of Spark Performance Advisor, the spark.sql.shuffle.partitions configuration parameter is automatically adjusted using execution history to minimize spill.

Wasted task time

The wasted task time metric shows how much execution time was wasted due to job errors. When an application fails, all of its task time is considered wasted. For a successful application, we count as wasted the task time of all failed tasks, as well as of all retries of a task except the very last one.

Typical issues that result in wasted task time are:

  • Outright code errors that prevent job execution.
  • Loss of an executor, which in turn causes partial loss of already computed data and its subsequent recomputation. The two most common reasons are:
    • Spot interruption (when using spot instances). The Executor removed: spot interruption metric shows the number of executors removed due to spot interruption. Spot interruptions are caused by your environment, and generally can’t be solved at the job level. Instead, you might consider using different kinds of cloud instances.

    • Executor loss due to an out-of-memory error, shown in the Executor removed: OOM column, can usually be addressed by changing the application settings. To do that, determine the exact OOM reason by reviewing the Event Timeline section of the Jobs tab in the Spark UI.

      • Memory overhead OOM means that the executor process used more physical memory than was allocated to it, which is almost always caused by an unexpected amount of native memory (outside the JVM heap) allocated either by native libraries or by the JVM itself. This condition can be diagnosed by a message saying The executor with id N exited with exit code 137(SIGKILL, possible container OOM) ... termination reason: OOMKilled. In this case, increase the memory overhead by changing the values of spark.executor.memory and spark.executor.memoryOverhead (see the configuration sketch after this list). Their sum must still be less than the memory allocatable in your environment.
      • Heap OOM means that the JVM has run out of heap space. In that case, the Spark UI status for that executor will end with termination reason: Error, and the executor logs will contain java.lang.OutOfMemoryError: Java heap space. This is a rare condition, and you can alleviate it by:
        • Decreasing the partition size, by increasing spark.sql.shuffle.partitions
        • Decreasing spark.executor.cores (the number of parallel tasks per executor)
        • Decreasing spark.memory.fraction
  • Driver OOM error. Similarly, it can be caused by:
    • memoryOverhead OOM, where the driver log or Kubernetes pod status might contain Final state: FAILED (driver container failed with ExitCode: 137, Reason: OOMKilled). To address it, you can increase the driver memory overhead. The specific way to do it depends on how you run the Spark job; the sketch after this list includes a spark-submit example.
    • heap OOM, where the driver log might contain java.lang.OutOfMemoryError: Java heap space. In this case, it’s recommended to investigate the reason for the memory consumption. For example, it can be caused by collecting too much data on the driver. In rare cases, it can be caused by having too many stages and tasks. If no optimization is possible, you can increase the driver memory.
  • Running out of disk space, which is indicated by the No space left on device message. As a rule of thumb, Spark job input should not be more than executor_count * executor_disk_size / 2.
  • There might be accidental task failures, especially for tasks that interact with external services. They are retried, so some time is wasted, but this is rarely significant.
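
To make the memory settings mentioned in this list concrete, here is a minimal configuration sketch; all numbers are placeholders, and the executor heap plus overhead must stay below the memory allocatable per container in your environment. Driver memory settings usually have to be supplied at submit time, because the driver JVM is already running when application code executes.

  // Minimal sketch (Scala) of the memory-related settings discussed above;
  // every value is a placeholder to be adapted to your environment.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("memory-tuning-example")
    // Executor heap and native (off-heap) overhead: raise the overhead when
    // executors exit with code 137 / OOMKilled.
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")
    // For java.lang.OutOfMemoryError: Java heap space, reduce per-task pressure:
    .config("spark.sql.shuffle.partitions", "800")  // smaller partitions
    .config("spark.executor.cores", "4")            // fewer parallel tasks per executor
    .config("spark.memory.fraction", "0.5")         // default is 0.6
    .getOrCreate()

  // Driver-side equivalents are typically passed at submit time, for example:
  //   spark-submit --conf spark.driver.memory=4g --conf spark.driver.memoryOverhead=1g ...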

Unused executors

The unused executors metric shows what percentage of time the executors are running without actually executing any tasks.

More formally:

  • We compute maximum possible task time as total_executor_run_time * core_count
  • We decide there’s inefficiency if task_time / max_possible_task_time < 0.4 and max_possible_task_time > 560 min
  • If so, we report inefficiency score as max_possible_task_time - task_time
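
As a worked example of these formulas, with made-up numbers and times in minutes:

  // Worked example of the unused executors score; all numbers are invented
  // purely for illustration, and times are in minutes.
  val totalExecutorRunTime = 1200.0                            // sum of executor uptimes
  val coreCount            = 4                                 // cores per executor
  val taskTime             = 1500.0                            // total task time

  val maxPossibleTaskTime = totalExecutorRunTime * coreCount   // 4800 core-minutes
  val utilization         = taskTime / maxPossibleTaskTime     // 0.3125, below 0.4

  // The score is reported only for sufficiently large, underutilized applications.
  val score =
    if (utilization < 0.4 && maxPossibleTaskTime > 560) maxPossibleTaskTime - taskTime // 3300
    else 0.0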

While the actual task time never reaches 100% utilization of all executors, a considerably lower value means that, for long stretches of time, executors or some of their cores are not doing anything while still occupying the resources.

Typical reasons that can be optimized include:

  • Executors are idle while some synchronous operation is performed on the driver. For example, the driver might collect() data and call an external API for each row. In that case, it’s possible to either
    • Perform the operations on executors, for example using DataFrame.foreach
    • Reduce the number of executors by decreasing the spark.dynamicAllocation.maxExecutors parameter.
  • A large DataFrame is being saved using df.repartition(numPartitions = 1), usually to avoid creating many small files. In that case, it is recommended to dynamically compute the desired number of partitions from the target size of the Parquet files and the size of the input data, as shown in the partition-sizing sketch after this list.
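
A sketch of the DataFrame.foreach option for the first case, assuming each row requires a call to some external service; callExternalApi and the input path are placeholders, not real APIs.

  // Hypothetical sketch: move per-row work from the driver to the executors.
  // callExternalApi stands in for your own client code; everything referenced
  // in the closure must be serializable or created inside it.
  import org.apache.spark.sql.{Row, SparkSession}

  val spark = SparkSession.builder().appName("foreach-example").getOrCreate()
  val df = spark.read.parquet("/data/input")   // illustrative path

  def callExternalApi(row: Row): Unit = {
    // placeholder for the real external call
    println(row)
  }

  // Instead of df.collect().foreach(callExternalApi), which keeps the executors
  // idle while the driver loops over the rows, run the calls on the executors:
  df.foreach((row: Row) => callExternalApi(row))

  // For real clients, foreachPartition lets you create one connection per
  // partition instead of one per row.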
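
And a partition-sizing sketch for the second case; the 512 MB target file size and the way the input size is estimated are assumptions for illustration, not a built-in rule.

  // Illustrative sketch: derive the number of output partitions from the data
  // size instead of hard-coding repartition(1). Paths and sizes are placeholders.
  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("repartition-example").getOrCreate()
  val df = spark.read.parquet("/data/input")

  val targetFileSizeBytes = 512L * 1024 * 1024   // aim for roughly 512 MB Parquet files
  // One possible estimate: the total size of the files backing the DataFrame.
  val inputSizeBytes = df.inputFiles.map { f =>
    val p = new Path(f)
    p.getFileSystem(spark.sparkContext.hadoopConfiguration).getFileStatus(p).getLen
  }.sum

  val numPartitions = math.max(1, math.ceil(inputSizeBytes.toDouble / targetFileSizeBytes).toInt)

  df.repartition(numPartitions).write.mode("overwrite").parquet("/data/output")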

Skew

Skew is a classic problem where one or several partitions take considerably more time than the others, so the total job uptime is much higher than it would be if the time were distributed equally.

The metric contains the sum of taskTimeMax - taskTime95Percentile across all stages, in minutes, but only when the condition (taskTimeMax - taskTime95Percentile) > 5 min and taskTimeMax / taskTime95Percentile > 10 holds. In other words, it counts only skews that can seriously affect the run time.
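
In code form, the per-stage contribution could be computed roughly as in the sketch below; this is just a restatement of the formula above, not necessarily the exact implementation.

  // Sketch of the skew score described above; times are in minutes and the
  // stage statistics are assumed to be available from your own history data.
  case class StageStats(taskTimeMax: Double, taskTime95Percentile: Double)

  def skewScore(stages: Seq[StageStats]): Double =
    stages.map { s =>
      val gap = s.taskTimeMax - s.taskTime95Percentile
      if (gap > 5.0 && s.taskTimeMax / s.taskTime95Percentile > 10.0) gap else 0.0
    }.sum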

If skew is found, it is necessary to determine which data cause it. Often, it is caused by nulls, or other placeholder values such as UNKNOWN in join/group columns. The salting technique can be used to address this problem.
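
As a minimal salting sketch for a skewed aggregation (column names and the number of salt buckets are illustrative; for a join, the same idea applies, with the salt values also replicated on the other side of the join):

  // Minimal salting sketch for a skewed groupBy. A random salt spreads each
  // heavy key across SALT_BUCKETS partial aggregates, which are then combined
  // in a second, much cheaper aggregation. Column names are placeholders.
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  val spark = SparkSession.builder().appName("salting-example").getOrCreate()
  val df = spark.read.parquet("/data/events")   // illustrative input with columns key, value

  val SALT_BUCKETS = 32                         // tuning knob

  val partial = df
    .withColumn("salt", (rand() * SALT_BUCKETS).cast("int"))
    .groupBy(col("key"), col("salt"))           // heavy keys are now split 32 ways
    .agg(sum("value").as("partial_sum"))

  val result = partial
    .groupBy(col("key"))                        // cheap second-level aggregation
    .agg(sum("partial_sum").as("total"))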

Starting with Spark 3, adaptive query execution (AQE) can often automatically correct skew for joins. However, it is limited, and skew can still happen in the presence of window functions, grouping, and partition coalescing.

App time problem

The app time problem field contains the number of Spark applications which took too long to execute. The exact condition right now is "App uptime" > 8h or "Total task time" > 1000h.

While long-running jobs are sometimes necessary, they generally deserve investigation, because:

  • It might be a result of hangs, or accidental processing of excessive data (for example, recomputing all history every day)
  • A long-running job is fragile: a recoverable error might still cause a lot of wasted time. Therefore, long-running jobs are better split into smaller incremental jobs.

Input size

The input size problem field contains the number of Spark applications that read a lot of data. Specifically, the limit is 3TB.

While reading a lot of data is sometimes necessary, and while it might happen as the result of a large backfill job, for a regular application it’s recommended to check whether reading all the data is really needed, or whether the job can be made incremental, reading just one day and appending the result to the previously computed tables.
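
For instance, assuming a date-partitioned source, a job can be made incremental roughly as sketched below; the paths, column names, and the way the processing date is passed in are all placeholders.

  // Illustrative sketch of an incremental job: read a single day instead of
  // the full history and append the result to the output table.
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  val spark = SparkSession.builder().appName("incremental-example").getOrCreate()

  // One possible way to pass the date, e.g. --conf spark.myapp.processingDate=2024-01-15
  val processingDate = spark.conf.get("spark.myapp.processingDate")

  val daily = spark.read.parquet("/data/events")          // date-partitioned source
    .filter(col("event_date") === processingDate)         // partition pruning reads one day

  val aggregated = daily.groupBy("user_id").count()

  aggregated.write.mode("append").parquet("/data/daily_user_events")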