This is a series of posts explaining how and why to set up compressed Avro in HDFS. The series is split into a few posts, and more will follow if relevant.
- Why avro?
- How to set up avro in flume
- How to use avro with hive (this post)
- Problems and solutions
Use avro in Hive
Once your table is created and data is loaded, there is nothing extra to do: you can query it as you would any other table.
Create the table
The table can be created as follows, with some comments:
-- table name
CREATE EXTERNAL TABLE IF NOT EXISTS table_name
-- Partition according to the end of the path you set in the flume sink (hdfs.path option).
-- Following the example from the previous post, we would have
PARTITIONED BY (key STRING)
-- Avro!
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
-- Matches the first part of hdfs.path set up in the flume sink
-- Following the example of the previous post, we would have
LOCATION '/datain/logs'
-- Other options here are to hardcode the schema or use a file on your local filesystem instead.
TBLPROPERTIES ('avro.schema.url'='hdfs:///schemas/schema.avsc');
More information can be found in the Cloudera documentation about Hive and Avro.
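The comment about hardcoding the schema can be illustrated with the avro.schema.literal table property. The following is only a sketch: it assumes your files were written with Flume's default event schema (a headers map and a bytes body), so replace the JSON with your own schema if you use a custom one.

```sql
-- Sketch: same table as above, with the schema embedded in the table definition
-- instead of being read from a file on HDFS.
-- ASSUMPTION: the JSON below is Flume's default event schema; use your own if it differs.
CREATE EXTERNAL TABLE IF NOT EXISTS table_name
PARTITIONED BY (key STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/datain/logs'
TBLPROPERTIES ('avro.schema.literal'='{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "headers", "type": {"type": "map", "values": "string"}},
    {"name": "body", "type": "bytes"}
  ]
}');
```

Embedding the schema keeps the table definition self-contained, while the avro.schema.url variant lets you update the schema file without touching the table.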
Load the snappy jar
To load data, you need to tell Hive that the data files are compressed, and Hive needs to know how to decompress them. For this, add the snappy jar to the list of extra jars loaded by Hive, by appending its path to the value of the hive.aux.jars.path property in your hive-site.xml. For instance:
<property>
  <name>hive.aux.jars.path</name>
  <value>file:////usr/lib/hive/lib/hive-contrib.jar,...,file:////usr/lib/hive/lib/auxlib/snappy-java-1.0.4.1.jar</value>
</property>
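If you cannot edit hive-site.xml, Hive's ADD JAR command may be enough for a quick test within a single session. This is a sketch only: it assumes the jar sits at the same local path as in the example above, and it is not guaranteed to behave exactly like hive.aux.jars.path in every setup.

```sql
-- Register the snappy jar for the current Hive session only.
-- ASSUMPTION: the path is taken from the hive-site.xml example; adjust it to your installation.
ADD JAR /usr/lib/hive/lib/auxlib/snappy-java-1.0.4.1.jar;

-- List the jars registered in this session to check that it was picked up.
LIST JARS;
```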
Actually load data
You need to tell Hive to use snappy, which is done as follows:
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;
Loading data then means adding a new partition whenever a new directory is created for another key. Run this after having told Hive to use snappy:
ALTER TABLE table_name ADD IF NOT EXISTS PARTITION (key='some_new_key') LOCATION '/datain/logs/key=some_new_key';
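To check that the partition was registered and that Hive can read the compressed files, something like the following can be run. It is a minimal sketch that reuses the placeholder names table_name and some_new_key from this post.

```sql
-- List the partitions Hive knows about; the new key should appear here.
SHOW PARTITIONS table_name;

-- Read a few rows from the new partition to confirm the files can be decompressed and parsed.
SELECT *
FROM table_name
WHERE key = 'some_new_key'
LIMIT 10;
```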
Using data with the default schema
If you use a custom schema, tailored to your data, you can then enjoy the full speed of Hive, as not much parsing will be needed by Hive to access your data.
If you use the default schema, then Hive does not know (yet) about the columns in your data, and the body of each event is exposed as raw bytes. The decode() function turns it into a readable string. For instance:
SELECT hour, decode(body,'UTF-8') as body FROM my_table
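With the default schema, the event headers should also be available as a map column. The sketch below assumes the schema exposes a column named headers of type map<string,string>, and that your Flume source or interceptors set a timestamp header; both are assumptions to verify against your own schema file.

```sql
-- ASSUMPTION: the default schema exposes headers (map<string,string>) and body (binary).
-- ASSUMPTION: a 'timestamp' header exists; it is NULL for events that do not carry it.
SELECT
  headers['timestamp'] AS event_timestamp,
  decode(body, 'UTF-8') AS body
FROM my_table
LIMIT 10;
```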
I am getting the following error after following the guide. Can you please help?
Error: java.lang.RuntimeException: java.lang.IllegalStateException: Ambiguous input path hdfs://centosvm:9000/user/hive/warehouse/test_yaseen/key=test_part/FlumeData.1501758333514.avro
at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:169)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.IllegalStateException: Ambiguous input path hdfs://centosvm:9000/user/hive/warehouse/test_yaseen/key=test_part/FlumeData.1501758333514.avro
at org.apache.hadoop.hive.ql.exec.AbstractMapOperator.getNominalPath(AbstractMapOperator.java:114)
at org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:452)
at org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1106)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:482)
at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:160)
… 8 more