I. Basic concept of data skew

0 22
Author: Jing Minglan, JD RetailI. Basic concept of data skew1. What is data skew...

Author: Jing Minglan, JD Retail

I. Basic concept of data skew

1. What is data skew?

I. Basic concept of data skew

To put it in the most straightforward and easy-to-understand way, data skew is nothing more than a large number of identical keys being distributed to a single partition by the partitioning, resulting in a situation where 'one person is exhausted while others are idle'. This situation is unacceptable to us and also goes against the original intention of parallel computing. Firstly, a node has to bear a huge pressure, and after other nodes finish their calculations, they have to keep waiting for this busy node, which also drags down the overall calculation time. It can be said that the efficiency is extremely low.

2. Phenomena when data skew occurs

(1) The vast majority of tasks are executed very quickly, but some tasks are extremely slow.

(2) Originally, the Spark job could run normally, but one day it suddenly encountered an OOM (Out of Memory) exception. Observing the exception stack, it turned out to be caused by our business code.

3. General conventional solutions:

(1) Increase JVM memory. This is applicable to the first scenario (there are very few unique values, and a very small number of values have a large number of record values (unique values less than a few thousand)). In this case, it is often necessary to optimize through hardware means, and increasing JVM memory can significantly improve the running efficiency

(2) Increase the number of reduces. This is applicable to the second scenario (there are many unique values, and some values of this field have much more records than others, but their proportion is also less than one percent or one thousandth). We know that in this case, the easiest result is that a large number of identical keys are partitioned into one partition, so that one reduce performs a large amount of work. If we increase the number of reduces, this situation will be alleviated relatively, as there are more computing nodes. Even if the workload is still uneven, it will be much smaller

(3) Custom partitioning. This requires the user to inherit the partition class and specify the partition strategy. This method has a significant effect

(4) Redesign the key. One approach is to add a random number to the key during the map phase. With the random number added to the key, it will not be distributed to the same node in large quantities (with a small probability). After the reduce phase, the random number can be removed

(5) Use combinner to merge. Combiner is an intermediate stage before map and reduce in the map phase. At this stage, a large number of identical key data can be merged selectively, which can be regarded as a local reduce, and then handed over to the reduce for processing. This is a good practice

4. General method to locate code with data skew?

(1) Data skew only occurs in shuffle. Below are the common operators that may trigger shuffle operations: distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, repartition, etc. When data skew occurs, it may be due to the use of these operators in the code

(2) By observing the Spark UI, locate in which stage the data skew occurs. If submitted using the yarn-client mode, the logs can be directly viewed locally, and the current stage reached can be found in the logs; if submitted using the yarn-cluster mode, the current stage reached can be viewed through the Spark Web UI. In addition, whether using the yarn-client mode or the yarn-cluster mode, we can delve into the data volume of each task of the current stage on the Spark Web UI, thereby further determining whether the task data distribution is uneven, which leads to data skew

II. Hive data skewing

1. Hive's execution is stage-basedThe difference in the amount of data processed by map depends on the output of the reduce in the previous stage, so how to evenly distribute the data to each reduce is the fundamental solution to data skewing

2. Causes of data skewing:

1) Uneven distribution of keys

2) The inherent characteristics of business data

3) Inadequate consideration when creating tables

4) Some SQL statements themselves have data skewing

3. The manifestation of data skewing:

Data skewing occurs when SQL operators contain join/group by/and other aggregation operations, with a large number of identical keys distributed to a small number of reduces for processing. This leads to the vast majority of TASKs executing very quickly, but some TASKs executing extremely slowly. The job that could normally be executed suddenly breaks out with an OOM (memory overflow) exception one day. The task progress remains at 99% (or 100%) for a long time. On the task monitoring page, it is found that only a few (1 or several) reduce subtasks have not been completed because their data volume is significantly different from that of other reduces. The number of records in a single reduce is much different from the average number of records, usually reaching 3 times or even more. The longest runtime is much longer than the average runtime. You can view the specific job's reducer counter to assist in locating the problem.

4. Solutions for data skewing:

(1) Parameter adjustment:

hive.map.aggr=true (whether to perform aggregation at the Map stage, default is true), this setting can place the top-level aggregation operation in the Map stage, thereby reducing the data transmission time in the cleaning stage and the execution time in the Reduce stage, and improving overall performance

Set hive.groupby.skewindata=true (Hive automatically performs load balancing)

(2) Adjust the SQL statement

a. How to perform Join:

Regarding the selection of the driver table, choose the table with the most uniform distribution of join keys as the driver table

Perform column pruning and filter operations to achieve the effect of reducing the data volume when the two tables are joined, and avoid the Cartesian product

When performing table join queries in Hive, it is best to place the larger table after the join

b. Enable mapjoin for large and small table join

The principle of mapjoin: MapJoin reads all small tables into memory, matches the data of the other table with the data in memory directly in the map phase, and avoids the reduce phase due to the join operation in the map, which greatly improves the running efficiency. It is advisable to have no more than 20,000 rows in the small table to be joined, and the size should not exceed 25M.

Set parameters

set hive.auto.convert.join=true;

hive.mapjoin.smalltable.filesize=25000000 i.e., 25M

Manual specification

-- a table is a large table, with data volume in the millions

-- b table is a small table, with data volume in the hundreds

select

/+mapjoin(b)/

a.field1 as field1,

b.field2 as field2,

b.field3 as field3

from a left join b

on a.field1 = b.field1;

c. Large table Join large table:

Null values do not participate in the joinFor a simple example

select field1,field2,field3...

from log a left join user b on a.userid is not null and a.userid=b.userid

union select field1,field2,field3 from log where userid is null;

Distribute hot keysHowever, it is best to avoid using the rand() function when joining on related keys. Because in Hive, when a map task fails and needs to be recalculated, it can lead to data duplication (data loss) issues. The Spark engine using rand can easily cause sporadic inconsistency problems when tasks fail and need to be recalculated. You can use the md5 encryption of unique dimension values instead of rand(), for example: md5(concat(coalesce(sku_id, 0), '_', coalesce(dim_store_num, 0), '_', coalesce(store_id, 0), '_', coalesce(delv_center_id, 0))), where the concatenated fields are the unique granularity of the table; you can also use hash.

d. Use sum...group by instead of count(distinct) for a large number of identical special values

For example, select a, count(distinct b) from t group by a

It can be written as select a, sum(1) from (select a, b from t group by a, b) group by a;

select count (distinct key) from a

It can be written as Select sum(1) from (Select key from a group by key) t Special treatment in special cases: In some cases, it is possible to take the skewed data out and process it separately. Finally, union it back

e. Whether it is join or groupby, please do data filtering in the inner layer first, it is recommended to only keep the needed key values

f. Try to use min/max to get the maximum and minimum values; do not use row_number

g. Do not directly select *; do data filtering in the inner layer

h. Try to use sort by instead of order by

i. Clearly define the data source, do not use basic fdm or detail tables if there is an upper-level summary

J. Avoid many-to-many associations in join

When performing join link queries, confirm whether there is a many-to-many association, at least ensure that the association field of the result set of one table does not repeat

5. Typical business scenarios for example

(1) Data skew caused by null values

Scenario: In logs, there is often a problem of information loss, such as the user_id in logs. If the user_id is associated with the user_id in the user table, a data skew problem will occur.

Solution 1: Do not participate in association for user_id that is empty

select * from log a

join users b

on a.user_id is not null

and a.user_id = b.user_id union all select * from log a

where a.user_id is null;

(2) Data skew caused by association of different data types

Scenario: In the user table, the user_id field is int, and the user_id field in the log table has both string type and int type. When performing join operations on the two tables according to user_id, the default Hash operation will allocate according to the int type id, which will cause all string type id records to be allocated to one reducer.

Solution: Convert the numeric type to a string type

select * from users a

left outer join logs b

on a.usr_id = cast(b.user_id as string)

(3) A small table is neither too small nor too large, how to use map join to solve the skew problem

Use map join to solve the data skew problem when a small table (few records) is associated with a large table, this method is used very frequently, but if the small table is very large, to the extent that map join may cause bugs or exceptions, special handling is required

select * from log a

left outer join users b

on a.user_id = b.user_id;

The users table has over 6 million records, distributing users to all maps is also a significant cost, and map join does not support such a large small table. If a normal join is used, it will encounter data skew problems

Solution:

select /+mapjoin(x)/* from log a

left outer join (

select /+mapjoin(c)/d.*

from ( select distinct user_id from log ) c

join users d

on c.user_id = d.user_id

) x

on a.user_id = b.user_id;

There are over a million user_ids in the log, which leads back to the original map join problem. Fortunately, the daily member uv is not too much, there are not too many members with transactions, clicks, or commissions, etc. Therefore, this method can solve many data skew problems in various scenarios

(4) Handling of sudden hot keys in business logic (real online problem)

**Example of business scenario:** In the case of traffic data, multiple device ids correspond to a single installation id. Suddenly, the number of some installation ids is especially large. In the normalization process, distribute the reduce according to the installation id and then process it. Abnormal hot keys can cause a single node to handle a large amount of data, leading to task failure due to data skew.

**Solution:** Based on the hourly task, set up an abnormal range in advance, extract the abnormal installation id and the corresponding aid, and write them to the dimension table. According to the normalization logic, the aid value is given priority as the normalization result, so in the normalization task, read the abnormal values, randomly distribute them to the reduce, and assign the aid to the normalization field, thus avoiding hot spot processing

Summary:

1. For join, use map join when the small table is not greater than 1G

2. For group by or distinct, set hive.groupby.skewindata=true

3. Try to use the above SQL statements for optimization

6. Monitoring and prevention of data skew

(1) Pay attention to data distribution during testing, and focus on different dates, key indicators, key values, and enumerated values, etc.

(2) Increase data quality monitoring, add data quality monitoring to each layer of data calculation tasks.

(3) L0 task, big data platforms need to have health inspection, score the health of tasks for resources, parameter configuration, data skew, stability, etc., to find the trend of data skew and check the task early

Third, Spark data skew

The idea of optimizing data skew in Spark, changing the join method from SMJ to BMJ, but it is only suitable for the case of large and small tables. The optimization idea is generally: change the join method, enable the spark adaptive framework, and optimize SQL.

1. Enable adaptive association optimization for data skew in sparksql

spark.shuffle.statistics.verbose=true --After opening, MapStatus will collect the number of each partition, which is used for skew processing

2. Change Sortmergejoin to BroadcastHashJoin. Increase the threshold of BroadcastHashJoin,

In some scenarios, SortMergeJoin can be converted to BroadcastHashJoin to avoid data skew caused by shuffle

Add parameters: spark.sql.autoBroadcastJoinThreshold=524288000 --Increase the threshold of BHJ to 500M 3. Optimize SQL with Hive

4. Skewed KEY search

It is necessary to combine actual business code to find the operators that cause Shuffle and find large KEYs in the following two ways.

Method one: Sample skewed KEYs through SQL.

Applicable scenarios: If the data volume is relatively small, it is convenient to verify through SQL.

Operation steps:

1. Perform quantity statistics on KEY.

2. Sort by quantity from large to small.

3. Take limit N directly.

Method two: Sample skewed KEYs through sample.

Applicable scenarios: If the data volume is large, large KEYs can be extracted through sampling. Whether large KEYs can be extracted is generally related to the proportion of data extracted.

Operation steps:

(1) Assign a value of 1 to KEY for easy counting in the next step.

(2) Accumulate KEY.

(3) Swap KEY and VALUE.

(4) Reverse sort the KEY according to the dictionary.

(5) Swap the positions of KEY and VALUE to restore to the real <KEY, VALUE>.

(6) Directly take the first N lines from the sorted RDD.

Data skew is generally caused by uneven data during Shuffle, and there are generally three types of operators that can cause Shuffle: Aggregation (groupBy), Join, and Window.

Aggregation

It is recommended to disperse the key for secondary aggregation: using hash modulo on non-constant values, unrelated to the keyDo not use rand class functions

For example using DataFrame API:

dataframe

.groupBy(col("key"), pmod(hash(col("some_col")), 100)).agg(max("value").as("partial_max"))

.groupBy(col("key")).agg(max("partial_max").as("max"))

Window

Currently supports skewed window under this mode (only supports 3.0)

select (... row_number() over(partition by ... order by ...) as rn) where rn [==|<=|<] k and other conditions spark.sql.rankLimit.enabled=true (currently supports topK calculation logic based on row_number)

Shuffled Join

Spark 2.4 parameter

spark.sql.adaptive.enabled=true

spark.shuffle.statistics.verbose=true

spark.sql.adaptive.skewedJoin.enabled=true

spark.sql.adaptive.allowAdditionalShuffle=true

If it cannot be handled, it is recommended that users locate hot data themselves for processing

Spark 3.0

spark.sql.adaptive.enabled=true

spark.sql.adaptive.skewJoin.enabled=true

spark.sql.adaptive.skewJoin.enhance.enabled=true (general skewed algorithm, can handle more scenarios)

spark.sql.adaptive.forceOptimizeSkewedJoin=true (allows additional shuffle, can handle more scenarios)

Other parameters:

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default is 256MB, partitions larger than this threshold are recognized as skewed partitions, if you want to adjust the skewed partitions smaller than this threshold, you can reduce it accordingly)

spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5, a partition size must exceed the median by Xfactor to be identified as a skewed partition, generally no adjustment is needed)

spark.sql.adaptive.skewJoin.enhance.maxJoins (default 5, in the general skew algorithm, if the shuffled join exceeds this threshold, it will not be processed, generally no adjustment is needed)

spark.sql.adaptive.skewJoin.enhance.maxSplitsPerPartition (default 1000, in the general skew algorithm, try to ensure that the division of each skewed partition does not exceed this threshold, generally no adjustment is needed)

Data Inflation (Join)

spark.sql.adaptive.skewJoin.inflation.enabled=true (default false, as sampling calculation may lead to performance regression, this feature should not be enabled for normal tasks)

spark.sql.adaptive.skewJoin.inflation.factor=50 (default 100, the estimated partition output size must exceed the median by Xfactor to be identified as an inflated partition, as there may be errors in the estimation algorithm, it is generally not recommended to be less than 50)

spark.sql.adaptive.shuffle.sampleSizePerPartition=500 (default 100, the number of samples per Task, based on this sample data to estimate the partition size after Join, which can be adjusted according to the task size if it is not large)

Skew Key Detection (Join)

Due to the semantic constraints of Join, in scenarios such as A left join skewed B, it is not possible to partition the data for B, otherwise it may cause data correctness issues, which is also a difficult problem faced by the Spark project. If the above feature cannot handle data skewing, the skew key detection feature can be enabled to locate which keys cause skewing or inflation, and then proceed with filtering and other treatments.

spark.sql.adaptive.shuffle.detectSkewness=true (default false, as sampling calculation may lead to performance regression, this feature should not be enabled for normal tasks)

Other parameters:

spark.sql.adaptive.shuffle.sampleSizePerPartition=100 (default 100, the number of samples per Task, which can be adjusted according to the task size if it is not large)

你可能想看:
最后修改时间:
admin
上一篇 2025年03月30日 02:01
下一篇 2025年03月30日 02:24

评论已关闭