01 Optimization of TaskManager memory model

0 25
Author: Kang Qi, JD LogisticsThis article comprehensively discusses the optimiza...

Author: Kang Qi, JD Logistics

This article comprehensively discusses the optimization methods for large-scale Flink stream jobs, combining the principles of Apache Flink with the background of JD Real-time Computing Platform (JRC). By reading this article, readers can understand the general optimization measures for Flink stream jobs and apply them to production environments.

Preface

As the industrial-level implementation of Google Dataflow Model, Apache Flink has become a de facto standard in the open-source stream computing field after many years of development. It features high throughput, low latency, native stream-batch integration, high consistency, high availability, and high scalability, while providing rich hierarchical APIs, time windows, stateful computation, and other semantic features, making it convenient for users to quickly get started with real-time development and build a real-time computing system.

As the old saying goes, 'If you want to do a good job, you must first sharpen your tools.' To ensure the efficient operation of large-scale and high-throughput Flink jobs, optimization is inevitable, and understanding the underlying principles is crucial. This article is a guide for professionals on optimizing Flink stream jobs, produced based on the author's past experience and optimization practices, combined with the background of JD Real-time Computing Platform (JRC). It mainly includes the following four aspects:

  • TaskManager Memory Model Optimization
  • Network Stack Tuning
  • RocksDB and State Tuning
  • Other tuning items

This article is based on Flink version 1.12. Before reading, it is recommended that readers have a deep understanding of Flink's basic components, programming model, and runtime.


01 TaskManager Memory Model Optimization

1.1 TaskManager Memory Model and Parameters

The current Flink TaskManager memory model was determined in version 1.10, and the following diagram is given in the official document. You can also see this diagram in the Web UI of higher version Flink.

1678852458_6411416aa321debbc7851.png!small?1678852459351


Figure 1 TaskManager Memory Model

Let's talk about this with pictures, giving a more detailed introduction than the official document for each area. `t.m.` is the abbreviation for `taskmanager. memory.` prefix.

1678852474_6411417aad9531a0142c3.png!small?1678852475533


1.2 Platform-specific Parameters

In addition to the parameters related to the TaskManager memory model, there are also other parameters provided by the platform, listed as follows.

1678852487_641141871d47b5a789a2b.png!small?1678852487889


1.3 Relationship between TM/Platform Parameters and JVM

The above parameters correspond to the following parameters of the TaskManager JVM itself:

  • -Xms | -Xmx → t. m. framework. heap. size + t. m. task. heap. size
  • -Xmn → -Xmx * apus. taskmanager. heap. newsize. ratio
  • -XX: Max Direct Memory Size → t. m. framework. off- heap. size + t. m. task. off- heap. size + $network
  • -XX: Max Metaspace Size → t. m. jvm- metaspace. size

Additionally, you can set additional parameters for the JM and TM JVM separately through the env.java.opts.{jobmanager | taskmanager} configuration items.

1.4 Memory Allocation Example

Taking the 8C / 16G TaskManager running in a production environment as an example, calculate the quotas for each memory partition manually according to the above rules. Note that some parameters are not using default values.

t.m.process.size = 16384
t.m.flink.size 
  = t.m.process.size * apus.memory.incontainer.available.ratio 
  = 16384 * 0.9 = 14745.6
t.m.jvm-metaspace.size 
  [t.m.process.size - t.m.flink.size] * apus.metaspace.incutoff.ratio 
  = [16384 - 14745.6] * 0.25 = 409.6
$overhead 
  = MIN{t.m.process.size * t.m.jvm-overhead-fraction, t.m.jvm-overhead.max} 
  = MIN{16384 * 0.1, 1024} = 1024
$network 
  = MIN{t.m.flink.size * t.m.network.fraction, t.m.network.max} 
  = MIN{14745.6 * 0.3, 5120} = 4423.68
$managed 
  = t.m.flink.size * t.m.managed.fraction 
  = 14745.6 * 0.25 = 3686.4
t.m.task.off-heap.size 
  = t.m.flink.size * apus.taskmanager.memory.task.off-heap.fraction 
  = 14745.6 * 0.01 = 147.4
t.m.task.heap.size 
  = t.m.flink.size - $network - $managed - t.m.task.off-heap.size - t.m.framework.heap.size - t.m.framework.off-heap.size 
  = 14745.6 - 4423.68 - 3686.4 - 147.4 - 128 - 128 = 6232.12

By comparing with the memory quota displayed in the Web UI, it can be found that they are completely consistent.


1678852503_6411419733e1df3115b6c.png!small?1678852503723

Figure 2 shows the memory allocation situation displayed in the Web UI

1.5 Overview of Optimization

Understanding the TaskManager memory model is the prerequisite for optimization, and the purpose of optimization is to: allocate reasonably, avoid waste, and ensure performance. First, a brief explanation of the three areas that are more likely to have problems is given below.

1. About off-heap memory of tasks

The platform's explanation is that some users' tasks require this part of memory, but from the perspective of Flink Runtime, it is mainly batch tasks (such as Sort-Merge Shuffle process) that actively use it. Conversely, stream tasks rarely involve this part, unless user code or third-party libraries referenced by the user directly operate on DirectByteBuffer or Unsafe and the like. Therefore, it is generally preferable to ensure heap memory, that is, to try to convert
Reduce apus.t.m.task.off-heap.fraction further (such as 0.05) and observe whether the job runs normally.

2. About Managed Memory

If using the RocksDB state backend and the state data volume is large or the read and write frequency is high, it is recommended to appropriately increase t.m.managed.fraction, such as 0.2~0.5, which can be coordinated with RocksDB monitoring. If not using the RocksDB state backend, it can be set to 0, because local states under other state backends will exist in the TaskManager heap memory. The next section will give a detailed explanation of the tuning items related to RocksDB.

3. About Network Buffer

It should be especially noted that the occupation of network buffers is related to parallelism and job topology, rather than the actual network traffic, so it cannot be simply set by the amount of data in the job. Roughly speaking, for simple topologies, it is recommended to start the job with the default value, observe the utilization of this area, and then make adjustments; for complex topologies, it is recommended to first appropriately increase t.m.network.fraction and max to ensure that the exception `IOException: Insufficient number of network buffers` does not occur, and then make adjustments. In addition, please do not set t.m.network.min and max to be equal values, as this will directly ignore fraction, and such direct settings are often not scientific. The next section will give a detailed explanation of the tuning of Flink's network stack.


02Network Stack Tuning

2.1 Network Stack and Network Buffer

1678852519_641141a75f1e88b49df65.png!small?1678852519895


Figure 3 Flink Network Stack

Flink's network stack is built on top of Netty. As shown in the figure above, each TaskManager can be either a Server (sender) or a Client (receiver), and the TCP connections between them will be reused to reduce resource consumption.

The small color blocks in the picture are the network buffers (NetworkBuffer), which are the most basic units of data transmission. They are allocated in the form of direct memory and carry serialized StreamRecord data. The size of a Buffer is equal to the size of a MemorySegment (t.m.segment-size, default 32KB). Each Sub-task in TM creates a network buffer pool (NetworkBufferPool) for allocating and recycling Buffers. The following will explain the allocation rules of network buffers.

2.2 Network Cache Allocation Rules

The execution plan of Flink stream jobs is represented by three-layer DAGs, namely: StreamGraph (logical plan) → JobGraph (optimized logical plan) → ExecutionGraph (physical plan). When the ExecutionGraph is actually scheduled to be executed on the TaskManager, the structure shown in the figure below is formed.

1678852531_641141b3ec17b11fa80cf.png!small?1678852532768


Figure 4 Flink Physical Execution Graph Structure

Each Sub-task has a set of components for data exchange, the output side is called ResultPartition (RP), and the input side is called InputGate (IG). In addition, they are also divided into sub-blocks according to the parallelism and upstream and downstream DistributionPattern (POINTWISE or ALL_TO_ALL), respectively called ResultSubpartition (RS) and InputChannel (IC). Note that the ratio of upstream and downstream RS and IC is strictly 1:1. The network cache is allocated at the level of ResultPartition and InputGate, and the specific allocation rules are:

  • #Buffer-RP = #RS + 1 && #Buffer-RS <= t.network.m.max-buffers-per-channel (10)
  • #Buffer-IG = #IC * t.network.m.buffers-per-channel (2, exclusive) + t.network.m.floating-buffers-per-gate (8, floating)

Translation:

  • The total number of buffers allocated by the sender RP is the number of RS + 1, and to prevent skew, the number of buffers each RS can obtain cannot be more than taskmanager.network.memory.max-buffers-per-channel (default value 10);
  • The number of buffers each IC has exclusively at the receiving end is taskmanager.network.memory.buffers-per-channel (default value 2), and the additional floating buffers that IG can provide are taskmanager.network.memory.floating-buffers-per-gate (default value 8).

Saying more, the mechanism shown in the figure above is also the foundation for Flink's implementation of Credit-based flow control (backpressure). Think about the `**PoolUsage` parameter when diagnosing backpressure, and you will understand. Backpressure is a relatively basic topic, and there is no need to expand on it here.

To repeat the sentence from the previous section: The amount of network cache usage is related to parallelism and job topology, rather than the actual network traffic. Particularly, due to the ALL_TO_ALL distribution (such as Hash, Rebalance), it can produce O(N^2) level of RS and IC, so the demand for Buffer is also greater. Of course, it is almost impossible to calculate the number of Buffers by looking at complex topology diagrams with the naked eye, so the best method is to quickly try and error, and here is an example.

2.3 Network Cache Optimization Example

This section takes an example job from the test environment (hereinafter referred to as 'example job') as an example.

This job has 54 TM with specifications of 8C / 16G, with a parallelism of 400, running 4330 sub-tasks, and including a large number of keyBy operations. The initial setting was t.m.network.fraction = 0.2 & t.m.network.max = 3GB, which resulted in an IOException: Insufficient network buffers exception; the job was started normally after setting t.m.network.fraction = 0.3 & t.m.network.max = 5GB, with an actual allocation of 4.32GB, and the occupancy rate fluctuated between 73%~78% (refer to the previous Web UI diagram). This allocation is clearly better than the original job's fraction = 0.5 & min = max = 8GB.

Some may ask: Can the idle Network area memory be used for other purposes? The answer is no. At the start of the job, all the memory in the Network area will be initialized into Buffer and allocated to RP and IG according to the quota mentioned in the previous section. The Buffer in the 'Netty Shuffle Buffers → Available' column in the Web UI can basically be considered wasted. Therefore, when the job encounters a bottleneck, blindly increasing the network cache is harmful and does not help with throughput.

2.4 The Easily Overlooked Cache Timeout

There are three opportunities for the network cache to be flushed to the downstream in the sending end: when the buffer is full, when the timeout time is reached, or when encountering a special marker (such as Checkpoint Barrier). The reason for designing cache timeouts is to avoid the situation where the buffer is always unable to be filled, leading to downstream processing delays. This can be achieved by using `
The `StreamExecutionEnvironment#setBufferTimeout` method or the `execution.buffer-timeout` parameter can be used to set the buffer timeout, with a default of 100ms, which is generally not required to be changed.

1678852544_641141c0964ad8bc63090.png!small?1678852545113


Figure 5: Filling and sending of the cache

However, considering jobs with high parallelism and a large number of ALL_TO_ALL exchanges, the data is relatively scattered, and the buffer of each ResultSubpartition will not be filled up quickly. A large number of Flush operations will unnecessarily consume CPU. At this point, it is advisable to increase the cache timeout and reduce the Flush frequency, which can effectively reduce CPU Usage. Taking the aforementioned job as an example, setting the cache timeout to 500ms and keeping other parameters unchanged, the average CPU Usage during the stable consumption stage of TM was reduced by 40%, with remarkable results. Of course, this is still a trade-off with downstream latency, so jobs with extremely sensitive timeliness are not suitable for this optimization.

2.5 Network fault tolerance

The platform uses the deployment method of Flink on Kubernetes, but Kubernetes network virtualization (Calico, Flannel, etc.) may result in a loss of network performance. Therefore, for high traffic or complex jobs, it is necessary to improve network fault tolerance. The following are three related parameters.

1. taskmanager.network.request-backoff.max

The default value is 10000 (Community Edition) / 60000 (Platform), representing the maximum exponential backoff duration for downstream InputChannel requests to upstream ResultSubpartition, measured in milliseconds. If the request fails, it will throw an error.
PartitionNotFoundException: Partition xx@host not found, should be adjusted appropriately, such as to 240000. Note that this error is unrelated to Kafka Partition, and should not be confused with it.

2. akka.ask.timeout

The default value is 10s (Community Edition) / 60s (Platform), representing the timeout for Akka Actor's Ask RPC to return a result. If there is network congestion or a complex topology, an error message such as AskTimeoutException: Ask timed out on Actor akka://xx after xx ms may appear. It is advisable to increase this value, such as to 120s. Note that long-time GC may also cause this problem, so attention should be paid to troubleshooting.

3. heartbeat.timeout

The default value of 50000 indicates the timeout for the sending/receiving of heartbeat signals between the JobManager and TaskManager, measured in milliseconds. Similar to akka.ask.timeout, if a TimeoutException: Heartbeat of TaskManager with id xx timed out occurs, it is recommended to adjust this value appropriately.


03RocksDB and State Tuning

3.1 FRocksDB in Flink


1678852559_641141cfe8a3248e833d8.png!small?1678852560481

Figure 6: FRocksDB Read and Write Process

Flink RocksDB state backend uses a branch version named FRocksDB, maintained by Ververica. Its read and write process is basically the same as the original version, as shown in the figure above, where MemTable and BlockCache are respectively the read and write cache. Particularly, since Flink will persist the RocksDB data snapshot to the file system at each Checkpoint cycle, there is no need to write the Write-Ahead Log (WAL).

Each Slot in TM has a RocksDB instance, and under the traditional method, each Column Family (CF) corresponds to a set of MemTable, BlockCache, and SST. In a Flink job, a StateHandle applied for - that is, `Runtime Context# get... State (State Descriptor)` - corresponds to a Column Family named after the StateDescriptor. It is obvious that the names of StateDescriptors within the same job cannot be duplicated.

3.2 RocksDB Managed Memory Mechanism

The above traditional method has a significant drawback, that is, the memory of RocksDB is almost uncontrolled (because Flink does not limit the number of StateHandle that users can apply for). Therefore, Flink in version 1.10 utilizes the WriteBufferManager and LRUCache cooperative mechanism proposed by RocksDB 5.6+ to achieve full managed RocksDB memory management, as shown in the following figure.


1678852574_641141dee3a908c63f1c5.png!small?1678852575467

Figure 7: Full Managed RocksDB Memory Management

The托管内存机制默认启用(state.backend.rocksdb.memory.managed = true),此时TM会将整块Managed Memory区域作为所有RocksDB实例共用的BlockCache,并通过WriteBufferManager将MemTable的内存消耗向BlockCache记账(即写入只有size信息的dummy块),从而BlockCache能够感知到全部的内存使用并施加限制,避免OOM发生。SST索引和Bloom Filter块则会进入BlockCache的高优先级区。需要注意,由于历史原因以及Iterator-pinned Blocks的存在,BlockCache在少数情况下不能严格限制内存,故有必要配置一些JVM Overhead作为后备。

The managed memory is evenly distributed among various Slots by default, and users can also adjust it through}}
The s.b.r.memory.fixed-per-slot parameter is used to manually set the managed memory quota for each Slot, but it is generally not recommended. In addition to this, the two adjustable parameters are as follows.

  • s.b.r.memory.write-buffer-ratio: The ratio of MemTable memory to managed memory, default value 0.5;
  • s.b.r.memory.high-prio-pool-ratio: The ratio of high-priority area memory to managed memory, default value 0.1.

The remaining part (default 0.4) is allocated for the data BlockCache. Users generally do not need to change them, and if the job status is particularly heavy for reading or writing, they can be appropriately adjusted, but it must be ensured that the managed memory is sufficient first.

3.3 Other RocksDB parameters

1.s.b.r.checkpoint.transfer.thread.num (default 1)

The number of threads that each stateful operator transmits data during Checkpoint, increasing this value will have higher requirements on network and disk throughput. Generally, it is recommended to be 4~8, and it has been changed to 4 by default in version 1.13.


2.s.b.r.timer-service.factory (default ROCKSDB for the community version, default HEAP for the platform)

The storage location of Timer-related status, including Timers registered by users and Timers registered internally by the framework (such as Window, Trigger). If stored in the heap, the Timer status cannot perform asynchronous Snapshot when doing CP, so it is better to store Timers in RocksDB in many cases. However, the drawback is that setting it to ROCKSDB may have a very rare serialization bug, which leads to the inability to recover the state from Savepoint. If it cannot be accepted, it is recommended to use HEAP.


3.s.b.r.predefined-options (default DEFAULT)

The predefined RocksDB optimization parameter sets provided by the community, which include 4 types: DEFAULT, SPINNING_DISK_OPTIMIZED,
SPINNING_DISK_OPTIMIZED_HIGH_MEM, FLASH_SSD_OPTIMIZED (the names are all self-explanatory). This parameter is easy to ignore, but it is strongly recommended to set it, as it offers good performance benefits compared to the default values. If the status volume of a single slot reaches the GB level and the managed memory is sufficient, setting it to SPINNING_DISK_OPTIMIZED_HIGH_MEM is the best. For other cases, setting it to SPINNING_DISK_OPTIMIZED is sufficient.

In addition to the above parameters, it is recommended to follow the advice of the RocksDB Wiki ("No need to tune it unless you see an obvious performance problem"), and do not manually adjust the advanced parameters of RocksDB (such as s.b.r.{block | writebuffer | compaction}.*), unless there are problems that cannot be solved by the memory management mechanism. The author has also listed some advanced parameters for reference.

1678852587_641141ebcd0d348252d28.png!small?1678852588464


Figure 8: Advanced Parameters of RocksDB

The items marked with a line will be overridden by the memory management mechanism. If, after careful consideration, it is necessary to fine-tune RocksDB, then s.b.r.memory.managed should be set to false, and the user must bear the possible OOM risk.

3.4 RocksDB Monitoring & Tuning Example

Before the large-scale task is officially launched, it is necessary to open some necessary RocksDB monitoring to observe whether there are performance bottlenecks. Enabling monitoring has a certain impact on the state read and write performance, and the following 6 items are generally recommended:

  • s.b.r.metrics.{block-cache-capacity | block-cache-usage | cur-size-all-mem-tables | mem-table-flush-pending | num-running-flushes | num-running-compactions} = true

After observing and solving the problem, please make sure to close them.

1678852603_641141fb7b8f4cf46590f.png!small?1678852604045


Figure 9: RocksDB Monitoring of Example Task

The above figure shows a part of the RocksDB Metrics charts for the example task, which is normal. If heavy operations such as Flush and Compaction are particularly frequent during the stable consumption phase, to the extent that the points in the chart are connected into lines, it generally indicates that RocksDB has encountered a bottleneck. However, it is a normal phenomenon for the托管内存 (i.e., BlockCache) to occupy 100%, and there is basically no need to worry.

As a reference, the incremental Checkpoint size of this job is about 15G, with the daily intake of tens of billions of state data, the parameter settings are: t. m. managed. fraction = 0.25 (actual allocated managed memory is 3.6G), s. b. r. predefined- options = SPINNING_ DISK_ OPTIMIZED, s. b. r. checkpoint. transfer. thread. num = 8. It performs well. Before optimization, the t. m. managed. fraction of the job was the default 0.1, and some unnecessary modifications were made to the advanced parameters of RocksDB, resulting in poor performance.

3.5 State TTL

The state TTL of RocksDB needs to be implemented with the help of CompactionFilter, as shown in the following figure.

1678852613_641142059be470530b17c.png!small?1678852614287


Figure 10 State TTL Principle

By calling the method `State Ttl Config# cleanupIn Rocksdb Compact Filter (N)`, users can set the timestamp for updating the CompactionFilter record after accessing the state N times. When the SST performs a Compaction operation, it will check whether the state key-value pairs have expired based on the timestamp and delete them. Note that if the state is accessed very frequently, the N value should be appropriately increased (the default is only 1000) to prevent affecting the Compaction performance.

3.6 State Scaling and Maximum Parallelism

When the parallelism of a job changes and recovers from CP / SP, it involves the issue of state scaling. Flink organizes Keyed State data by KeyGroup, with each key calculated by two Murmur Hashes to determine which KeyGroup it should fall into, and each Sub-task is assigned to one or more KeyGroups. As shown in the following figure, changes in parallelism only affect the allocation of KeyGroups, and the state recovery process can be approximated as sequential read to improve efficiency.

1678852623_6411420fcef2fabfcb465.png!small?1678852624711


Figure 11 Scaling of Keyed State

The number of KeyGroups is the same as the maximum parallelism, and changes in the maximum parallelism can cause the job to fail to recover from CP / SP, so it should be set cautiously. If the user does not explicitly set it, it will be calculated according to the following rules:

128 <= round Up To Power Of Two (operator Parallelism * 1.5) <= 32768

It is obvious that this is not safe. Assuming the parallelism of a task is 200, the calculated maximum parallelism is 512; if the parallelism is increased to 400, the calculated maximum parallelism will become 1024. Therefore, it is always recommended to explicitly set a reasonable maximum parallelism.

3.7 State local recovery

State local recovery is disabled by default, and it can be enabled by setting
The state.local-recovery = true is enabled, but it can only act on Aligned Checkpoint and Keyed State. After enabling, two snapshots are generated each time a CP is produced: Primary (remote DFS) and Secondary (local disk), and the failure of Secondary CP will not affect the entire CP process. When the job recovers, it first tries to recover the state from the valid Secondary snapshot, which can significantly improve the recovery speed. If the Secondary snapshot is not available or incomplete, it will fallback to Primary recovery. As shown in the figure below.

1678852633_64114219921b7ae3c4369.png!small?1678852634165


Figure 12 State local recovery

Local state recovery will introduce additional disk consumption: non-incremental CP will double the disk usage; incremental CP does not consume more space due to the native reference counting mechanism, but because the data is more scattered, IOPS will increase accordingly.


04Other tuning items

4.1 Checkpoint related

Readers should be familiar with the configuration items related to Checkpoint, here only two points are mentioned: first, checkpointTimeout is set according to the characteristics of the job, but it should not be too long to prevent CP from being stuck and covering up the problems of the job itself (such as data skewness); second, it is necessary to set
minPauseBetweenCheckpoints, to avoid the performance degradation caused by operators being in the CP process all the time. The settings of the example job are: checkpointInterval = 3min / checkpointTimeout = 15min / minPauseBetweenCheckpoints = 1min.

In addition, a common phenomenon encountered in large state jobs is that after all Checkpoints are acked, it gets stuck in IN_PROGRESS and takes about 1 to 3 minutes to become COMPLETED, as shown in the figure below.


1678852652_6411422c5b0d47c9f99aa.png!small?1678852652876

The phenomenon of Checkpoint stuck in IN_PROGRESS state in Figure 13

This is because the communication between TaskManager and HDFS is not smooth, or the pressure on HDFS itself leads to the failure of data block writing. Flink must ensure the integrity of Checkpoint, that is, it can be marked as COMPLETED only when all snapshot data are successfully written after retrying. Readers can find exception information in the TM log, such as Exception in createBlockOutputStream: Connect timed out.

4.2 Object Reuse

Object reuse is not very prominent in Flink configuration, but it is quite useful. Flink will combine operators that meet certain conditions into operator chains (OperatorChain) when generating JobGraph, and all Sub-tasks chained together will be executed in the same TM Slot. The essence of object reuse is to use the shallow copy of the object emitted by the upstream operator directly in the downstream operator within the operator chain.


1678852666_6411423a7137af9d3d7b8.png!small?1678852666979

Figure 14: Schematic of operator chain

As shown in the figure, if object reuse is not enabled, the dashed line in the operator chain is the default CopyingChainingOutput (deep copy). By enabling object reuse through `ExecutionConfig#enableObjectReuse()` or pipeline.object-reuse = true, CopyingChainingOutput will be replaced with ChainingOutput (shallow copy). The following figure shows the difference between the two.


1678852677_6411424580071fd1d8509.png!small?1678852678058

Figure 15: Difference between object reuse and non-reuse

Generally, it is not recommended to enable object reuse for DataStream API jobs unless it is very certain that there is no situation where the downstream operator directly modifies the object emitted by the upstream operator. Moreover, the benefit of enabling object reuse in DataStream API jobs is not high, and only when there are complex data type definitions, there will be a performance improvement of about 20%.

However, it is strongly recommended to enable SQL jobs because Flink SQL's type system is different from DataStream API, and the deep copy cost of StringData, MapData, etc., is very high, and the code generator of Flink SQL can ensure the safety of mutable objects. Test results show that object reuse SQL jobs can achieve an average performance improvement of doubling.

4.3 Don't forget about JobManager

Compared to TaskManager, the configuration of JobManager is often more worry-free, as it seems that just giving a configuration of 2C / 4G can ensure peace of mind. In fact, JobManager internally maintains many components, such as: job DAG, that is, {Job | Execution}Graph, SlotPool & Scheduler, the mapping relationship of <TaskManagerLocation, TaskExecutorGateway>, CheckpointCoordinator, HeartbeatManager, ShuffleMaster, PartitionTracker, and so on.

Therefore, if the job has many Slot / Sub-task, large Checkpoint, or heavy Shuffle batch jobs, it is necessary to appropriately increase the resources of the JobManager. Recently, there have been two jobs in the author's department that frequently report the exception message 'ResourceManager leader changed to new address null', which is due to the excessive pressure on the JM and too long GC time, leading to the failure of ZooKeeper Session. Taking the example job's JM (4C / 8G) as an example, its memory allocation is as follows.

1678852690_64114252775ccdcb8c8cf.png!small?1678852690952


Figure 16 Example Job JobManager Memory Allocation

4.4 Other Tips

  • Starting from Flink 1.12, the default time semantics have become event time. If the job is processing time semantics, you can disable watermark emission, i.e., `Execution Config# set Auto WatermarkInterval (0)`.
  • Setting metrics.latency.interval (in milliseconds) can periodically insert LatencyMarker, which is used to measure the latency of each operator and the entire pipeline. Processing LatencyMarker will consume resources, so it is not necessary to do it too frequently; about 60000 is appropriate.
  • User-registered Timers will be deduplicated by <key, timestamp> and stored internally using a min-heap. Therefore, it is best to avoid onTimer storms, that is, a large number of Timers with the same timestamp trigger at the same time, causing performance fluctuations.
  • If you need to exchange data types that Flink does not have native Serializer support for (such as HyperLogLog, RoaringBitmap), you should register a custom Serializer in the code to avoid falling back to Kryo, which may lead to performance degradation.
  • POJO types support state schema changes, and adding or deleting fields will not affect recovery (new fields will be initialized with default values). However, please note that you cannot modify the data type of the fields or the class name of the POJO.

05 References

  • Flink official documentation:
  • https://nightlies.apache.org/flink/flink-docs-release-1.12/
  • Flink source code:
  • https://github.com/apache/flink
  • FRocksDB source code:
  • https://github.com/ververica/frocksdb
你可能想看:

2. The International Criminal Police Organization arrests more than 1,000 network criminals from 20 countries, seize 27 million US dollars

A Brief Discussion on the Establishment of Special Security Management Organizations for Operators of Key Information Infrastructure

In today's rapidly developing digital economy, data has become an important engine driving social progress and enterprise development. From being initially regarded as part of intangible assets to now

Analysis of Linux memory leak cases and sharing of memory management

Exploration and practice of optimizing the file size of Android dynamic link libraries

(3) Is the national secret OTP simply replacing the SHA series hash algorithms with the SM3 algorithm, and becoming the national secret version of HOTP and TOTP according to the adopted dynamic factor

As announced today, Glupteba is a multi-component botnet targeting Windows computers. Google has taken action to disrupt the operation of Glupteba, and we believe this action will have a significant i

d) Adopt identification technologies such as passwords, password technologies, biometric technologies, and combinations of two or more to identify users, and at least one identification technology sho

Enterprise Data Compliance for International Expansion: The Difference Between Anonymization and Anonymity

Article 2 of the Cryptography Law clearly defines the term 'cryptography', which does not include commonly known terms such as 'bank card password', 'login password', as well as facial recognition, fi

最后修改时间:
admin
上一篇 2025年03月24日 23:29
下一篇 2025年03月24日 23:52

评论已关闭