TeraSort tuning
I recently did some performance tuning on a 13-node cluster running TeraSort. Here are the findings.
Hardware
The environment has 16 servers in total;
The specification of each node:
- CPU: 2 * 8-core CPUs with HT enabled; 32 logical cores in total
- Disk: 12 * 2 TB disks in total;
- the first disk is split into 2 partitions (100 GB + the rest); the 100 GB partition is used as the system partition, the rest is mounted as a separate folder;
- the other 11 disks are mounted separately
- Memory: 16 * 8 GB; 128 GB in total
Cluster configuration
1 node is used as ResourceManager/NameNode, and 12 nodes are used as NodeManager/DataNode;
Each NodeManager allocates 120 GB as its total resource (originally it only allocated 8 GB; see the sketch below);
DataNode uses the 11 separately mounted disks as data folders;
MapReduce uses the 11 separately mounted disks as working folders;
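For reference, the NodeManager memory budget corresponds to the yarn.nodemanager.resource.memory-mb property. A minimal sketch of what we set, expressed through the Hadoop Configuration API purely for illustration (in practice this value lives in yarn-site.xml):

    import org.apache.hadoop.conf.Configuration;

    public class NodeManagerMemory {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // 120 GB per NodeManager, expressed in MB (normally set in yarn-site.xml)
            conf.setInt("yarn.nodemanager.resource.memory-mb", 120 * 1024);
            System.out.println("NM memory (MB): "
                    + conf.getInt("yarn.nodemanager.resource.memory-mb", -1));
        }
    }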
Original status
300 GB of data sorted in about 20 minutes;
Final status
700 GB of data sorted in 11-12 minutes.
Findings
Note: these findings may be specific to the TeraSort benchmark in this particular scenario.
- Make sure all data fits in memory during the reduce phase. Spilling data to disk hurts reducer performance.
- We have 120 GB * 12 = 1.44 TB of memory in total; considering JVM and system overhead, 1 TB of data cannot fully fit in memory, so we ended up sorting 700 GB in this environment (see the sketch below).
- To achieve this, we set mapreduce.reduce.memory.mb to a higher value (8192);
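A rough back-of-the-envelope check of that sizing. The usable fraction below is my own illustrative assumption standing in for JVM and system overhead, not a measured number:

    public class MemoryBudgetSketch {
        public static void main(String[] args) {
            int nodes = 12;
            int perNodeGb = 120;        // NodeManager resource per node
            double usable = 0.6;        // assumed fraction left after JVM + OS overhead (illustrative)
            double inMemoryGb = nodes * perNodeGb * usable;
            // ~864 GB with this assumption: enough for the 700 GB run,
            // but not enough to keep a full 1 TB data set in reduce memory.
            System.out.printf("Usable reduce-side memory: ~%.0f GB%n", inMemoryGb);
        }
    }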
- Use the memory requirement to control how many map/reduce tasks can run at the same time; for example
- for map, setting mapreduce.map.memory.mb to 4 GB lets MR run at most 30 (120G / 4G = 30) map tasks per node at the same time;
- for reduce, setting mapreduce.reduce.memory.mb to 8 GB lets MR run at most 15 (120G / 8G = 15) reduce tasks per node at the same time;
- Aside from the memory.mb values above, you also have to set mapreduce.map.java.opts / mapreduce.reduce.java.opts to let the child Java processes use more memory than the default (a configuration sketch follows);
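A minimal sketch of those settings through the Configuration API; the exact heap sizes are illustrative, chosen only to leave a few hundred MB of headroom below the container sizes:

    import org.apache.hadoop.conf.Configuration;

    public class TaskMemorySettings {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setInt("mapreduce.map.memory.mb", 4096);     // 120G / 4G => up to 30 maps per node
            conf.setInt("mapreduce.reduce.memory.mb", 8192);  // 120G / 8G => up to 15 reduces per node
            // Child JVM heaps must be raised as well, a bit below the container size
            conf.set("mapreduce.map.java.opts", "-Xmx3686m");
            conf.set("mapreduce.reduce.java.opts", "-Xmx7680m");
            System.out.println(conf.get("mapreduce.reduce.java.opts"));
        }
    }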
- When tasks run with large JVM heaps, GC becomes a big issue;
- During our tuning we kept hitting the problem of reduce tasks timing out after 600 seconds, so the NodeManager considered them dead and killed them; this was caused by heavy GC activity;
- The other issue is that the default GC strategy (-XX:+UseParallelGC) starts as many GC threads as there are CPU cores; in the TeraSort case, each reduce task started 32 GC threads; use -XX:ParallelGCThreads=n to set a smaller value;
- The GC strategy itself should also be considered; in the TeraSort case, data is not discarded until the last phase;
- The data size is quite large and each task has only one or two processors on average;
- It's recommended to enable incremental mode for the concurrent collector (-XX:+UseConcMarkSweepGC); a sketch of the resulting JVM options follows;
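A sketch of the kind of reduce-side JVM options this implies; the GC thread count of 4 and the heap size are example values, not measured optimums:

    import org.apache.hadoop.conf.Configuration;

    public class ReduceGcOptions {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Cap GC threads so 15 concurrent reducers don't spawn 15 * 32 GC threads,
            // and use the concurrent collector since data is held until the final phase.
            conf.set("mapreduce.reduce.java.opts",
                    "-Xmx7680m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC");
            System.out.println(conf.get("mapreduce.reduce.java.opts"));
        }
    }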
- In the MapReduce code, the InMemoryMerger behaves strangely
- If the data fits in memory, it works well;
- if only 95% of the data fits in memory, it spills all of the data to disk before the last merge;
- This is also part of the reason for the first finding above
- MAPREDUCE-5649 was filed. We changed this part of the code and re-compiled it so that more than 2 GB of data can be handled;
- If this code is not changed, we had better make sure each reducer processes less than 2 GB of data (see the sketch below);
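With an unpatched merger, a simple guard is to derive the reducer count from the input size so that no reducer sees more than 2 GB. A sketch; the 700 GB figure matches the run above, everything else is illustrative:

    public class ReducerCountForMergeLimit {
        public static void main(String[] args) {
            long inputBytes = 700L * 1024 * 1024 * 1024;   // ~700 GB of TeraSort input
            long maxPerReducer = 2L * 1024 * 1024 * 1024;  // keep each reducer under 2 GB
            int reducers = (int) ((inputBytes + maxPerReducer - 1) / maxPerReducer);
            // 700 GB / 2 GB => at least 350 reducers; job.setNumReduceTasks(reducers) would apply it
            System.out.println("Minimum reducers: " + reducers);
        }
    }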
- The reducer count should be roughly the same as the number of CPU cores
- Since all data is kept in memory, there is no need to factor in the number of disks;
- The number of reducers running concurrently can be controlled via mapreduce.reduce.memory.mb (see the sketch below)
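A small illustration of that heuristic with this cluster's numbers (the figures come from the hardware and memory settings above):

    public class ReducerCountFromCores {
        public static void main(String[] args) {
            int nodes = 12;
            int coresPerNode = 32;       // 2 x 8 cores with HT
            int nodeMemGb = 120;
            int reduceMemGb = 8;         // mapreduce.reduce.memory.mb = 8192
            int totalReducers = nodes * coresPerNode;        // heuristic: ~one reducer per core
            int concurrentPerNode = nodeMemGb / reduceMemGb; // how many run at once, set by memory
            System.out.println("Reducers: " + totalReducers
                    + ", concurrent per node: " + concurrentPerNode);
        }
    }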
- For CPU efficiency considerations
- If CPU usage is too high, we should normally reduce the number of concurrent tasks (by increasing the per-task memory request)
- If CPU usage is too low, we should increase the number of concurrent tasks (by decreasing the per-task memory request); a small illustration follows;
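The relationship between per-task memory and concurrency is just integer division over the node's budget; the candidate container sizes below are illustrative:

    public class ConcurrencyFromMemory {
        public static void main(String[] args) {
            int nodeMemMb = 120 * 1024;
            // Bigger containers => fewer tasks run at once (less CPU pressure);
            // smaller containers => more tasks run at once (higher utilization).
            for (int taskMemMb : new int[]{4096, 6144, 8192}) {
                System.out.println(taskMemMb + " MB per task => "
                        + (nodeMemMb / taskMemMb) + " concurrent tasks per node");
            }
        }
    }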
- mapreduce.map.java.opts should be used to control the child Java process; this value should be a little less (by 300-400 MB) than mapreduce.map.memory.mb;
- the same applies to mapreduce.reduce.java.opts (see the sketch below);
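A tiny sketch of deriving the heap sizes from the container sizes; the 384 MB headroom is an example value within the 300-400 MB range mentioned above:

    public class HeapFromContainer {
        public static void main(String[] args) {
            int mapMemoryMb = 4096;     // mapreduce.map.memory.mb
            int reduceMemoryMb = 8192;  // mapreduce.reduce.memory.mb
            int headroomMb = 384;       // ~300-400 MB left for non-heap JVM overhead
            System.out.println("mapreduce.map.java.opts=-Xmx" + (mapMemoryMb - headroomMb) + "m");
            System.out.println("mapreduce.reduce.java.opts=-Xmx" + (reduceMemoryMb - headroomMb) + "m");
        }
    }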
- In order to sort all the map output in memory for each map task,
- the value of mapreduce.task.io.sort.mb should be larger than the block size;
- mapreduce.map.sort.spill.percent should also be tuned together with it (see the sketch below);
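A minimal sketch of that pairing; the 256 MB block size and the specific buffer/threshold values are illustrative assumptions, not the values we shipped:

    import org.apache.hadoop.conf.Configuration;

    public class MapSortBuffer {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            int blockSizeMb = 256;                                        // illustrative HDFS block size
            conf.setInt("mapreduce.task.io.sort.mb", blockSizeMb + 100);  // sort buffer larger than one block
            conf.setFloat("mapreduce.map.sort.spill.percent", 0.95f);     // spill threshold of the buffer
            System.out.println(conf.get("mapreduce.task.io.sort.mb"));
        }
    }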
- On the reducer side, the following parameters should be considered (an illustrative sketch follows)
- mapreduce.reduce.shuffle.parallelcopies (default 5): this value should be increased if the system's CPU iowait is not too high;
- mapreduce.reduce.shuffle.input.buffer.percent should be increased if the data size is big;
- mapreduce.reduce.shuffle.merge.percent and mapreduce.reduce.shuffle.input.buffer.percent should be used together to give the shuffle more memory;
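A sketch of tuning those shuffle parameters; the specific values below are illustrative starting points, not the exact numbers used in this cluster:

    import org.apache.hadoop.conf.Configuration;

    public class ReduceShuffleSettings {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 20);            // default is 5
            conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.85f); // more heap for shuffle
            conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.9f);         // merge later
            System.out.println(conf.get("mapreduce.reduce.shuffle.parallelcopies"));
        }
    }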