Large amounts of daily delivered raw data lead to constantly increasing load and transformation times in enterprise data warehouse environments built on classical databases. In a current client project (telco) we face the problem that the daily load already exceeds its processing window and can only be kept in check by dropping older data every day. In addition, ad hoc queries take hours even on an optimized R-OLAP model with daily partitioning and bitmap indexing (Oracle in this case), which prevents users from doing self-service analysis. Our assumption is that replacing the classical database with a Hadoop cluster will provide adequate load and query times for a large, growing (potentially unbounded) data set.
The Use Case
- raw data from multiple delivery systems as CSV files
- time slots (granularity: 15 minutes) as partition criterion
- long-term analysis of a big data set (daily amount from one source: 400 million rows, 120 GB volume, 96 partitions)
- possibility to create 'free' ad hoc queries through a well-known query language (SQL)
- JDBC drivers to use the raw data with third-party tools like SQuirreL or QlikView
- extraction of data sets with more than 1 million rows for further in-memory analysis
Our Decision for Hadoop and Hive / Impala
- Hadoop HDFS can store a practically unlimited amount of data
- massively parallel processing across a freely scalable number of nodes
- Hive query language (HQL) as an SQL dialect with a low learning curve
- Hive metastore for schema definitions, shared with Impala
- improved query performance through Impala (it omits the MapReduce overhead and job execution sequence)
- CapEx and OpEx (this would fill a long thread indeed..)
Pitfalls and Experiences
Transforming an existing table into a partitioned one may be complicated by the limits of Hive partition handling and memory space. We tried to import 3 days (96 slices of 15 minutes per day) with time slot and device id as partition and sub-partition criteria, which would have created about 14,400,000 partitions (3 * 96 * 50,000 devices).
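For illustration, a partition layout along these lines (table and column names are hypothetical, not our actual schema) shows how the numbers explode:

    -- hypothetical schema: time slot and device id as partition columns
    CREATE TABLE events (payload STRING)
    PARTITIONED BY (time_slot BIGINT, device_id BIGINT)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

    -- dynamic partition insert from a staging table: every distinct
    -- (time_slot, device_id) pair becomes its own HDFS directory,
    -- i.e. 3 days * 96 slots * ~50,000 devices = ~14,400,000 partitions
    SET hive.exec.dynamic.partition=true;
    SET hive.exec.dynamic.partition.mode=nonstrict;
    INSERT OVERWRITE TABLE events PARTITION (time_slot, device_id)
    SELECT payload, time_slot, device_id FROM events_staging;

Hive's own safeguard hive.exec.max.dynamic.partitions (1,000 by default) already hints that partition counts in the millions are beyond the intended design envelope; the sketches below therefore assume a table partitioned by time_slot only, with the device id as a regular column.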
If a partition criterion corresponds to a column in the CSV file, you must either transform the file during import to remove this column or be careful to filter on the partition column (not the data column) in your SELECT, otherwise Impala will scan the whole table.
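A minimal sketch (hypothetical names): the CSV still carries its own time column, here event_time, which duplicates the partition column time_slot; only a filter on the partition column lets Impala prune.

    -- good: filter on the partition column, only one slot is read
    SELECT device_id, payload FROM events
    WHERE time_slot = 1363086000;

    -- bad: the same logical filter on the duplicated CSV column
    -- forces a scan of the whole table
    SELECT device_id, payload FROM events
    WHERE event_time = 1363086000;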
As Impala 0.6 does not support timestamp or date types, you have to store the time as a big integer (e.g. Unix epoch). There is also no way to write a WHERE clause with human-readable (literal) date and time values. You would have to use the Hive function unix_timestamp, but this prevents the use of partitions.
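Our workaround is to resolve the date on the client side and send the epoch value as a literal; a sketch (1363086000 is 2013-03-12 11:00:00 UTC):

    -- no partition pruning: the partition column is wrapped
    -- in a function call
    SELECT count(*) FROM events
    WHERE time_slot = unix_timestamp('2013-03-12 11:00:00');

    -- pruning works: the client precomputes the epoch value
    -- and passes it as a literal
    SELECT count(*) FROM events
    WHERE time_slot = 1363086000;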
Also be careful when querying multiple partitions with an OR condition. Partition pruning is no longer applied; you have to rewrite your query into the form 'SELECT … WHERE condition1 UNION ALL SELECT … WHERE condition2'.
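The rewrite looks like this (sketch with hypothetical names; the two literals are adjacent 15-minute slots):

    -- OR over partition values: pruning is not applied, full scan
    SELECT device_id, payload FROM events
    WHERE time_slot = 1363086000 OR time_slot = 1363086900;

    -- UNION ALL form: each branch is pruned to its single partition
    SELECT device_id, payload FROM events WHERE time_slot = 1363086000
    UNION ALL
    SELECT device_id, payload FROM events WHERE time_slot = 1363086900;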
We have set up an 8-node Hadoop cluster with the Cloudera distribution CDH 4.2, managed via Cloudera Manager 4.5, and Impala 0.6. The hardware configuration per node:
- 4 x 3.3 GHz Intel Xeon E3-1230v2 (Ivy Bridge)
- 16 GB RAM
- 2 x 1,000 GB RAID 1
We have loaded three days of raw data from one source:
- 20,832 files
- 1.2 billion records
- 370 GB file size in total
- 289 partitions
Typical analysis query:
- extract a raw data slice of 3 hrs per day (across the given 3 days) for one dimension value (a device), possibly with additional filter conditions; see the sketch below
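In HQL such a query looks roughly like the following sketch (hypothetical names, device id 4711, and the 09:00-12:00 UTC window of the three loaded days; the UNION ALL pattern from above keeps each branch prunable, assuming range predicates on the partition column are pruned like equality predicates):

    -- day 1: 2013-03-12, 09:00-12:00 UTC
    SELECT * FROM events
    WHERE device_id = 4711
      AND time_slot >= 1363078800 AND time_slot < 1363089600
    UNION ALL
    -- day 2: 2013-03-13
    SELECT * FROM events
    WHERE device_id = 4711
      AND time_slot >= 1363165200 AND time_slot < 1363176000
    UNION ALL
    -- day 3: 2013-03-14
    SELECT * FROM events
    WHERE device_id = 4711
      AND time_slot >= 1363251600 AND time_slot < 1363262400;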
Query execution time, including transport of the result set to the client application via JDBC:
- classical database: several hours
- Cloudera Impala: < 2 min
These results are based on Impala 0.6; Impala 0.7 contains several improvements which we will look at in our next steps..
One more thing..
As we noticed, Apache Thrift, which Hive and Impala use for the communication between client and server, has deficits when transporting big data volumes. We are currently working on a solution (e.g. a JDBC driver) that bypasses the protocol for large result sets.