I recently did some performance tuning on a 13-node cluster running TeraSort. Here are some findings.

Hardware

The environment has 16 servers in total; 13 of them are used for this cluster.
The specification of each node:
  1. CPU: 2 * 8-core CPUs with Hyper-Threading enabled; 32 logical cores in total
  2. Disk: 12 * 2 TB disks in total;
    1. the first disk is split into 2 partitions (100 GB + the rest); the 100 GB partition is used as the system partition, and the rest is mounted as a separate folder;
    2. the other 11 disks are mounted separately
  3. Memory: 16 * 8 GB; 128 GB in total;

Cluster configuration

1 node is used as ResourceManager/NameNode, 12 nodes as NodeManager/DataNode;
Each NodeManager advertises 120 GB as its total container resource (yarn.nodemanager.resource.memory-mb; the default is only 8 GB);
DataNode uses the 11 separately mounted disks as its data directories;
MapReduce uses the 11 separately mounted disks as its local working directories;

Original status

300 GB of data sorted in about 20 minutes;

Final status

700 GB of data sorted in 11 to 12 minutes.

Findings

Note: these findings may be specific to the TeraSort benchmark in this particular environment.
  1. Make sure all data can fit in memory during the reduce phase; spilling data to disk hurts reducer performance.
    1. We have 120 GB * 12 = 1.44 TB of memory in total; accounting for JVM overhead and system memory usage, 1 TB of data cannot fully fit in memory, so in the end we sorted 700 GB in this environment.
    2. To achieve this, we set mapreduce.reduce.memory.mb to a higher value (8192);
  2. Use the per-task memory requirement to control how many map/reduce tasks can run at the same time; for example (a configuration sketch follows this item):
    1. for map, setting mapreduce.map.memory.mb to 4 GB lets MR run at most 30 (120 GB / 4 GB = 30) map tasks on each node at the same time;
    2. for reduce, setting mapreduce.reduce.memory.mb to 8 GB lets MR run at most 15 (120 GB / 8 GB = 15) reduce tasks on each node at the same time;
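    A minimal sketch of how #1 and #2 could be expressed when submitting the job, assuming the container sizes discussed above (4 GB per map, 8 GB per reduce) and the 120 GB per-node resource; the class and job name are hypothetical:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.mapreduce.Job;

      public class TerasortMemorySketch {
        public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          // Container sizes decide per-node concurrency:
          // 120 GB / 4 GB = at most 30 maps, 120 GB / 8 GB = at most 15 reduces per node.
          conf.setInt("mapreduce.map.memory.mb", 4096);
          conf.setInt("mapreduce.reduce.memory.mb", 8192);
          Job job = Job.getInstance(conf, "terasort-memory-sketch");
          System.out.println("max maps/node    = " + (120 * 1024 / 4096));  // 30
          System.out.println("max reduces/node = " + (120 * 1024 / 8192));  // 15
        }
      }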
  3. Aside from the *.memory.mb values above, you also have to set mapreduce.map.java.opts/mapreduce.reduce.java.opts so that the child Java processes can use more memory than the default;
  4. When tasks run with a large JVM heap, GC becomes a big issue;
    1. During our tuning we kept running into reduce tasks timing out after 600 seconds, being declared dead and killed; this turned out to be caused by heavy GC;
    2. The other issue is that the default GC strategy (-XX:+UseParallelGC) starts as many GC threads as there are CPU cores; in the TeraSort case each reduce task started 32 GC threads. Use -XX:ParallelGCThreads=n to set a smaller value;
    3. The GC strategy itself should also be considered; in the TeraSort case, data is not discarded until the last phase;
      1. The data size is quite large and each task gets only one or two processors on average;
      2. In that situation it is recommended to use the concurrent collector (-XX:+UseConcMarkSweepGC) with incremental mode enabled (a sketch follows this item)
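    A minimal sketch of the child-JVM options from #3 and #4, assuming the 4 GB/8 GB containers above; the heap sizes and the GC thread count are assumptions, while the flags themselves are standard HotSpot options:

      import org.apache.hadoop.conf.Configuration;

      public class TerasortJvmOptsSketch {
        public static void main(String[] args) {
          Configuration conf = new Configuration();
          // Give the child JVMs more heap than the default, a few hundred MB
          // below the container sizes (see #9).
          conf.set("mapreduce.map.java.opts", "-Xmx3700m");
          // Option A: keep the default parallel collector but cap its GC threads
          // (by default it starts one per core, i.e. 32 on these nodes).
          conf.set("mapreduce.reduce.java.opts", "-Xmx7600m -XX:ParallelGCThreads=4");
          // Option B: concurrent collector with incremental mode, for the case
          // where each task only gets one or two processors on average.
          // conf.set("mapreduce.reduce.java.opts",
          //     "-Xmx7600m -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode");
          // The 600-second limit in #4.1 is mapreduce.task.timeout (default 600000 ms).
        }
      }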
  5. In the MapReduce code, the InMemoryMerger behaves strangely
    1. If all the data fits in memory, it works well;
    2. if only, say, 95% of the data fits in memory, it spills all of the data to disk before the last merge;
    3. This is also part of the reason for #1 above
  6. MAPREDUCE-5649 was filed. We changed that part of the code and recompiled so that a reducer can use more than 2 GB of data;
    1. If this code is not changed, it is better to make sure each reducer processes less than 2 GB of data (see the sketch after #7);
  7. The number of reducers should be the same as (or close to) the number of CPU cores
    1. Since all data is held in memory, there is no need to take the number of disks into account;
    2. How many reducers run concurrently on each node is controlled by mapreduce.reduce.memory.mb (see #2)
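    A minimal sketch of choosing the reducer count per #6 and #7, assuming the figures from this post (12 worker nodes with 32 cores each, 700 GB of input, and the unpatched < 2 GB per-reducer limit); the class name is hypothetical:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.mapreduce.Job;

      public class TerasortReducerCountSketch {
        public static void main(String[] args) throws Exception {
          long inputBytes = 700L * 1024 * 1024 * 1024;   // 700 GB of TeraSort input
          int totalCores = 12 * 32;                      // 384 cores across the workers
          long maxPerReducer = 2L * 1024 * 1024 * 1024;  // stay under 2 GB without MAPREDUCE-5649
          int minForSizeLimit = (int) ((inputBytes + maxPerReducer - 1) / maxPerReducer); // 350
          int reducers = Math.max(totalCores, minForSizeLimit);

          Job job = Job.getInstance(new Configuration(), "terasort-reducers-sketch");
          job.setNumReduceTasks(reducers);  // equivalent to -D mapreduce.job.reduces=...
        }
      }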
  8. For CPU efficiency
    1. If CPU usage is too high, we normally reduce the number of concurrent tasks (by increasing per-task memory)
    2. If CPU usage is too low, we increase the number of concurrent tasks;
  9. mapreduce.map.java.opts should be used to control the child Java process; its heap (-Xmx) should be a little less (300-400 MB less) than mapreduce.map.memory.mb;
    1. the same applies to mapreduce.reduce.java.opts (see the sketch below);
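    A minimal sketch of deriving -Xmx from the container size with the 300-400 MB headroom from #9; the 384 MB figure and the helper method are assumptions for illustration:

      import org.apache.hadoop.conf.Configuration;

      public class TerasortHeapHeadroomSketch {
        // Leave room inside the container for non-heap JVM memory.
        static String heapOpt(int containerMb, int headroomMb) {
          return "-Xmx" + (containerMb - headroomMb) + "m";
        }

        public static void main(String[] args) {
          Configuration conf = new Configuration();
          int headroomMb = 384;  // assumed value within the 300-400 MB range
          conf.setInt("mapreduce.map.memory.mb", 4096);
          conf.set("mapreduce.map.java.opts", heapOpt(4096, headroomMb));     // -Xmx3712m
          conf.setInt("mapreduce.reduce.memory.mb", 8192);
          conf.set("mapreduce.reduce.java.opts", heapOpt(8192, headroomMb));  // -Xmx7808m
        }
      }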
  10. In order to sort all data in memory within each map task,
    1. mapreduce.task.io.sort.mb should be larger than the block size, so that a map's entire output fits in the sort buffer;
    2. mapreduce.map.sort.spill.percent should be tuned together with it (see the sketch below);
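    A minimal sketch of the map-side sort settings from #10, assuming a 512 MB HDFS block size; both concrete values are assumptions:

      import org.apache.hadoop.conf.Configuration;

      public class TerasortMapSortSketch {
        public static void main(String[] args) {
          Configuration conf = new Configuration();
          // With 512 MB blocks each map reads ~512 MB, so keep the sort buffer
          // comfortably above that to avoid intermediate spills.
          conf.setInt("mapreduce.task.io.sort.mb", 768);
          // Spill only when the buffer is nearly full.
          conf.setFloat("mapreduce.map.sort.spill.percent", 0.95f);
        }
      }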
  11. On the reducer side, the following parameters should be considered
    1. mapreduce.reduce.shuffle.parallelcopies (default 5): increase it if the system's I/O wait time is not too high;
    2. mapreduce.reduce.shuffle.input.buffer.percent should be increased if the data size is large;
    3. mapreduce.reduce.shuffle.merge.percent and mapreduce.reduce.shuffle.input.buffer.percent should be used together to give the shuffle more memory (see the sketch below)
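    A minimal sketch of the reduce-side shuffle settings from #11; the parameter names are the standard MRv2 ones, while the concrete values are assumptions for this workload:

      import org.apache.hadoop.conf.Configuration;

      public class TerasortShuffleSketch {
        public static void main(String[] args) {
          Configuration conf = new Configuration();
          // More parallel fetchers than the default 5, as long as I/O wait stays reasonable.
          conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 20);
          // Let the shuffle keep a larger share of the reducer heap before merging.
          conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.85f);
          conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.9f);
        }
      }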