Where exactly is this block on HDFS?

This post will show you how to find out where a specific hdfs block is stored: on which server, and on which disk of that server.

Context

I needed to decommission a data directory from hdfs (removing it from dfs.datanode.data.dir). This is usually not a big deal because the default replication factor is 3: removing a disk just triggers re-replication of the blocks it held.

Safety checks

Just for safety, I first wanted to check that all blocks were properly replicated, which is easy to do with the following command:

 hdfs fsck / -files -blocks -locations | grep repl=1 -B1

What does it do?

  • hdfs fsck /
    • run hdfs checks from the root directory
  • -files -blocks -locations
    • display file names, block names and locations
  • | grep repl=1
    • keep only the lines of blocks with replication factor 1
  • -B1
    • and display the previous line as well, to get the actual file name

If you are good (all files are properly replicated), you will get an empty output. Otherwise, you will get a bunch of lines like these:

/a/dir/a/file 2564 bytes, replicated: replication=1, 1 block(s): OK
0. BP-1438592571-10.88.112.28-1502096897275:blk_1077829561_4348908 len=2564 Live_repl=1 [DatanodeInfoWithStorage[10.1.2.3:9866,DS-f935a126-2226-4ef8-99a6-20d700f06110,DISK]]
--
/another/dir/another/file 2952 bytes, replicated: replication=1, 1 block(s): OK
0. BP-1438592571-10.88.112.28-1502096897275:blk_1077845856_4366930 len=2952 Live_repl=1 [DatanodeInfoWithStorage[10.2.3.4:9866,DS-1d065d48-f887-4ed5-be89-5e9c79633519,DISK]]

In my case this was actually a mistake, which I could fix by forcing the replication back to 3:

hdfs dfs -setrep 3 /a/dir/a/file
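
If you want the command to wait until the extra replicas are actually in place, setrep also has a -w flag (this can take a while for big files):

hdfs dfs -setrep -w 3 /a/dir/a/file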

Where are my blocks?

In other words, are there blocks with only one replica on the disk I am about to remove?

There might be good reasons to have a replication factor of 1 for some files, and in that case you want to be sure that none of those blocks are on the disk you will remove. How can you do that?

Looking at the output of the previous command, specifically the DatanodeInfoWithStorage bit, you can already find some interesting information:

10.2.3.4:9866,DS-1d065d48-f887-4ed5-be89-5e9c79633519,DISK
  • 10.2.3.4:9866: this is the server where the block is, 9866 being the default datanode port,
  • DISK: good, the data is stored on disk,
  • DS-1d065d48-f887-4ed5-be89-5e9c79633519: this looks like a disk ID. What does it mean?

Looking at the source on GitHub does not help much: it is just a string, named storageID. What now?

It turns out that this storage ID is written in a text file in every directory listed in dfs.datanode.data.dir. Look inside one of those directories and you will find a file current/VERSION, which looks like this:

#Tue Apr 07 13:49:10 CEST 2020
storageID=DS-dc5bed87-addb-4575-b2e3-6cbb114e4700
clusterID=cluster16
cTime=0
datanodeUuid=b0d3af53-3320-4833-8063-a13720f84bae
storageType=DATA_NODE
layoutVersion=-57

And there you are, there is the storageID, which matches what was displayed via the hdfs command.

This was the missing link to know exactly on which disk your block was.
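
Putting it all together, here is a small sketch of the whole check. It assumes the data directories are mounted under /data/1, /data/2 and so on, and reuses the storage ID from the example above; adjust both to your setup, and run the greps on the datanode that fsck reported.

# What is configured in dfs.datanode.data.dir on this datanode?
hdfs getconf -confKey dfs.datanode.data.dir

# Which of those directories carries the storage ID we are after?
grep -l DS-1d065d48-f887-4ed5-be89-5e9c79633519 /data/*/current/VERSION

# Are any blocks with a single replica sitting on that storage ID?
hdfs fsck / -files -blocks -locations | grep repl=1 -B1 | grep DS-1d065d48-f887-4ed5-be89-5e9c79633519 -B1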

Avro end to end in hdfs – part 4: problems and solutions

This is a series of posts aiming at explaining how and why to set up compressed avro in hdfs. It is divided into a few posts; more will come if relevant.

  1. Why avro?
  2. How to set up avro in flume
  3. How to use avro with hive
  4. Problems and solutions (This post)

Invalid/non standard schemas

The avro tools available for the different languages are not all exactly equivalent. The default Java one used in Hadoop, for instance, has issues when some fields can be set to null. Nested arrays are another issue in a lot of cases: the default avro parser from Java cannot handle them properly. Furthermore, if you do find a way to generate avro files with nested arrays, some tools will not be able to read them. Hive will be fine, but Impala (as of version 1.2) is not able to read them.

I can only urge you to use simple schemas; it will make your life a lot easier.
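
To make this more concrete, the kind of construct that gave us trouble is an array of records nested in the top-level record. A field declaration along these lines (the names are made up) is perfectly valid avro but, as described above, not every tool will read it:

{
  "name": "tags",
  "type": {
    "type": "array",
    "items": {
      "type": "record",
      "name": "Tag",
      "fields": [
        {"name": "key", "type": "string"},
        {"name": "value", "type": "string"}
      ]
    }
  }
}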

Hive partitions and schema changes

If you use Hive partitions (and you should), all data in one specific partition must have the same schema. We used to partition some logs per hour, but we are now also adding the avro schema version to the partition path. That way, data encoded with a new schema ends up in a different partition even if it relates to the same hour.
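
For example, with a hypothetical my_logs table partitioned on both the schema version and the hour (PARTITIONED BY (schema_version STRING, hour STRING)), data written with schema version 2 lands in its own directory and can be registered like this:

ALTER TABLE my_logs
ADD IF NOT EXISTS PARTITION (schema_version='2', hour='2015033110')
LOCATION '/datain/logs/schema_version=2/hour=2015033110';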

Faster encoding and flexibility

We started loading data the standard way, via flume. This created a lot of issues, as explained earlier (nested arrays mostly), and flume was actually using a lot of resources. We ended up using the json2avro C tool, which is very fast and can handle nested arrays (but this bit us later because of Impala). This tool generates avro files, which we load into hdfs via an hdfs fuse mount point. This improved performance drastically. Since we switched to this fuse mount point, we have had no data loading issues or delays, whereas we had trouble every other week while using flume.

Default values

We started by writing a schema with default values. Sadly, we ended up noticing that JSON is only a convenient representation of the data, useful for debugging, and is not the main purpose of avro: default values do not make fields optional in a JSON document.

This means that a missing source field can be represented in a JSON document this way:

{"valid": {"boolean": true}, "source": null}

but a JSON document actually missing this field is not valid.
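
For reference, a schema consistent with the document above could look like this (the record name and exact types are illustrative; only the valid and source fields come from the example):

{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "valid", "type": ["null", "boolean"], "default": null},
    {"name": "source", "type": ["null", "string"], "default": null}
  ]
}

Even with those defaults, the JSON decoder expects both fields to be present; the defaults only help when reading data written with an older schema that did not have the field.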

Avro end to end in hdfs – part 3: Hive

This is a series of posts aiming at explaining how and why to set up compressed avro in hdfs. It is divided into a few posts; more will come if relevant.

  1. Why avro?
  2. How to set up avro in flume
  3. How to use avro with hive (this post)
  4. Problems and solutions

Use avro in Hive

Once your table is created and data is loaded, there is nothing extra to do: you can query it as you would any other table.
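
For instance, with the table and partition column defined just below, a query is plain HiveQL (the partition value here is made up):

SELECT COUNT(*) FROM table_name WHERE key='some_key';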

Create the table

Creating the table can be done as follows, with some comments:

-- table name
CREATE EXTERNAL TABLE IF NOT EXISTS table_name

-- Partition according to the end of the path you set in the flume sink (hdfs.path option).
-- Following the example from the previous post, we would have
PARTITIONED BY (key STRING)

-- Avro!
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

-- Matches the first part of hdfs.path set up in the flume sink
-- Following the example of the previous post, we would have
LOCATION '/datain/logs'

-- Other options here are to hardcode the schema or use a file on your local filesystem instead.
TBLPROPERTIES ('avro.schema.url'='hdfs:///schemas/schema.avsc');

More information can be found in the Cloudera documentation about Hive and avro.

Load the snappy jar

To load data, you need to tell Hive that the data files are compressed, and Hive needs to know how to decompress them. For this, you need to add the snappy jar to the list of extra jars loaded by Hive, which is done by adding its path to the value of the hive.aux.jars.path property in your hive-site.xml. For instance:

<property>
  <name>hive.aux.jars.path</name>
  <value>file:////usr/lib/hive/lib/hive-contrib.jar,...,file:////usr/lib/hive/lib/auxlib/snappy-java-1.0.4.1.jar</value>
</property>

Actually load data

You need to tell Hive to use snappy, which is done the following way:

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

Then, loading data means creating a new partition whenever a new directory is created with another key. Run this after having told Hive to use snappy:

ALTER TABLE table_name
ADD IF NOT EXISTS PARTITION (key='some_new_key')
LOCATION '/datain/logs/key=some_new_key';
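
If you want to double-check that the partition was registered, SHOW PARTITIONS will list it:

SHOW PARTITIONS table_name;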

Using data with the default schema

If you use a custom schema, tailored to your data, you can then enjoy the full speed of Hive, as not much parsing will be needed by Hive to access your data.

If you use the default schema, then Hive does not know (yet) about the columns in your table. This can be worked around with the decode() function. For instance:

SELECT
  hour,
  decode(body, 'UTF-8') AS body
FROM my_table;