This is a situation that pops up every now and then, and which I ran into recently: given a bunch of files on hdfs, how do you get the total line count?
It turns out that there are a lot of options, which I will explain here. The goal is to count the lines of all csv files in a specific directory, $d.
After all the options I will show some benchmarking.
Options
The sysadmin way
Set up a fuse mountpoint to hdfs (I always do that: it is very stable, uses hardly any resources and helps for many things) at $mountpoint.
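For reference, a minimal mounting sketch, assuming the hadoop-fuse-dfs helper shipped with the CDH packages (the namenode host and port are placeholders):

# mount hdfs under $mountpoint; namenode host and port are placeholders
sudo hadoop-fuse-dfs dfs://namenode.example.com:8020 $mountpoint

Once mounted, the count is simply: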
wc -l $mountpoint$d/*.csv | grep total
Easy and sweet, but has the drawback that all data is downloaded locally before being processed.
The dutiful way
We are using hadoop, so let's use the hadoop command line.
hdfs dfs -ls $d/\*.csv | awk '{print $8}' | xargs hdfs dfs -cat | wc -l
Note that this uses xargs. If you have too many files you might need to loop over them instead, but invoking the hdfs command many times will incur a lot of jvm startup time penalty.
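If you do need the loop, a minimal sketch could look like this, with one hdfs invocation (and thus one jvm startup) per file:

total=0
for f in $(hdfs dfs -ls $d/\*.csv | awk '{print $8}'); do
  # one jvm startup per file, hence the penalty
  lines=$(hdfs dfs -cat "$f" | wc -l)
  total=$((total + lines))
done
echo $total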
Here as well, the issue is that all data is downloaded locally.
The old school way
Let’s just use a streaming job:
hadoop jar $HADOOP_MAPRED_HOME/hadoop-streaming.jar \
    -Dmapred.reduce.tasks=1 \
    -input $d/\*.csv \
    -output /tmp/streamout \
    -mapper "bash -c 'paste <(echo "count") <(wc -l) '" \
    -reducer "bash -c 'cut -f2 | paste -sd+ | bc'"
The only issue with a streaming job is that the output is written to a file on hdfs (the -output parameter), not to stdout.
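You can read the count back afterwards, assuming the default part file naming:

hdfs dfs -cat /tmp/streamout/part-*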
The tormented soul
Streaming jobs are nice, but this bash is ugly and python is cool for data, right? Let's write a mapper and a reducer in python.

mapper.py:

import sys

cnt = 0
for l in sys.stdin:
    cnt += 1
print("count\t{}".format(cnt))

reducer.py:

import sys

cnt = 0
for l in sys.stdin:
    cnt += int(l.split('\t')[1])
print(cnt)
And then just run:

hadoop jar $HADOOP_MAPRED_HOME/hadoop-streaming.jar \
    -Dmapred.reduce.tasks=1 \
    -files mapper.py,reducer.py \
    -input $d/\*.csv \
    -output /tmp/streamout \
    -mapper "/usr/bin/python3 mapper.py $*" \
    -reducer "/usr/bin/python3 reducer.py $*"
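The two scripts are also easy to test locally with a plain pipe before submitting anything; the sample file here is just a placeholder:

# quick local sanity check, no hadoop involved
cat sample.csv | python3 mapper.py | python3 reducer.py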
Connoisseur
Hey do not forget about pig!
csvs = LOAD '$d/*.csv';
csvgroup = GROUP csvs ALL;
cnt = FOREACH csvgroup GENERATE COUNT(csvs);
dump cnt;
Run with:
pig -param d=$d cnt.pig
Hipster
Pyspark is all the rage:

f = sc.textFile("$d/*.csv")
cnt = f.count()
print(cnt)
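This snippet assumes the interactive pyspark shell, where sc comes predefined; start it with:

pyspark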
Benchmarking
Ok this was fun, but what works best?
I ran two sets of tests: one on a directory with a few (~5) big files, one on a directory with many (250) tiny files. I ran the count 10 times for each test; the results are in seconds and lower is better.
            | Few big files         | Many small files
            | Mean | Median | Stdev | Mean | Median | Stdev
Sysadmin    | 49.6 | 46.5   | 12.17 | 1    | 1      | 0
Dutiful     | 49.8 | 46.5   | 11.71 | 5.1  | 5      | 0.32
Connoisseur | 25.7 | 25     | 2.31  | 17.2 | 17     | 0.42
Old school  | 21.5 | 21.5   | 0.53  | 47.7 | 48     | 1.06
Tormented   | 28.3 | 28     | 0.82  | 39.9 | 39.5   | 1.1
Hipster     | 20.2 | 20     | 1.03  | 9.4  | 9      | 0.52
A few things are apparent:
- Different use cases will call for different options,
- When data is downloaded (sysadmin via the mountpoint or dutiful via hdfs dfs -cat) the variance is higher. Weirdly enough, in my tests they both started at about 37 seconds, increased by 5 seconds at every run and then stayed stable at 63 seconds,
- pyspark is the clear winner.