
Technical Architect and the challenge of Big Data Architecture March 23, 2015

Posted by Mich Talebzadeh in Big Data.

In the previous articles I gave an introduction to Big Data and Big Data Architecture. What makes Big Data Architecture challenging is the fact that there are multiple sources of data (Big Data Sources) and one storage pool, namely the Apache Hadoop Distributed File System (HDFS), as part of the data conversion and storage layer, as shown in diagram 1 below.

Diagram 1: Big Data ingestion

This makes the job of the Technical Architect very challenging. I believe Technical Architect is an appropriate combined term for someone who deals with Data Architecture and Technology Architecture simultaneously. I combine these two domains because I personally believe that in almost all cases there is a strong correlation between them. TOGAF defines Data Architecture as “the structure of an organization’s logical and physical data assets and data management resources”. It further defines Technology Architecture as “the software and hardware capabilities that are required to support the deployment of business, data and application services, including IT infrastructure, middleware, networks, communications, processing, and standards”. Putting semantics aside, I have never come across a Data Architect who is indifferent to the underlying technology, nor a technologist who does not care about the type of data and the network bandwidth required to deliver that data. Sure, there are plenty of people who seem to spend most of their time creating nice-looking diagrams which, to borrow a phrase from George Orwell’s Animal Farm, were burnt in the furnace as soon as they were ready.

Anyway, back to the subject: the sources of data have been around for a long time. The new kid in class is Hadoop. To be sure, Hadoop has not been around for long. There are those who are of the opinion that Big Data is still more hype than reality, but who agree that Hadoop might be a keeper. It would only be the second example in the IT industry, though, of a standard promoted by a giant industry player becoming universal, the first being Java of course, as Sun Microsystems was a giant in its time, amazing though that now seems.

Big Data Architecture is all about developing reliable and scalable data pipelines with almost complete automation. That is no small feat, as the Technical Architect requires deep knowledge of every layer in the stack, beginning with identifying the sources and quality of data, the data ingestion and conversion processes, the volume of data and the requirements for designing the Hadoop cluster to cater for that data. Think of it as working out, for each data input, what needs to be done to factorize the raw data into relevant information; information which in all probability has to conform to a Master Data Model, has to be tailored to the individual consumption layer, and has to carry all the access and security controls needed for it.

It would be appropriate here to define broadly what Hadoop is all about.

Hadoop

Think about a relational database management system (RDBMS) first, as it is more familiar; examples are Oracle or SAP Sybase ASE. An RDBMS manages resources much like an operating system. It is charged with three basic tasks:

  • Put data in
  • Keep that data
  • Take the data out and work with it

Unlike an RDBMS, Hadoop is not a database. For example, it has only basic APIs to put data in. It certainly keeps the data, but again has very basic APIs to take that data out. So it is an infrastructure; it is also called a framework or ecosystem. Its native API commands are similar to Linux commands. Case in point: in Linux you get the list of root directories by issuing an ls / command. In HDFS (see below) you get the same list by issuing the hdfs dfs -ls / command.
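To give a feel for these commands, below is a minimal sketch of moving a file in and out of HDFS. The directory and file names (/user/mich/staging, trades.csv) are purely illustrative:

hdfs dfs -mkdir -p /user/mich/staging                 # create a directory in HDFS
hdfs dfs -put trades.csv /user/mich/staging/          # copy a local file into HDFS
hdfs dfs -ls /user/mich/staging                       # list it, much like Linux ls
hdfs dfs -cat /user/mich/staging/trades.csv | head    # peek at the contents
hdfs dfs -get /user/mich/staging/trades.csv /tmp/     # copy it back out to the local file system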

Hadoop consists of two major components. These are:

  1. A massively scalable Hadoop Distributed File System (HDFS) that can support large amounts of data on a cluster of commodity hardware (JBOD)
  2. A massively parallel processing system called the MapReduce engine, based on the MapReduce programming model, for computing results using a parallel, distributed algorithm on a cluster.

In short, Hadoop allows applications based on MapReduce to run on large clusters of commodity hardware, scaled horizontally to speed up computations and reduce latency. Compare this architecture to a typical RDBMS, which is scaled vertically using a Symmetric Multi-Processing (SMP) architecture in a centralised environment.

Understanding Hadoop Distributed File System (HDFS)

HDFS is basically a resilient and clustered approach to managing files in a big data environment. Since data is written once and read many times thereafter, compared to transactional databases with constant reads and writes, HDFS lends itself to supporting big data analysis.

NameNode

As shown in figure 1 there is a NameNode and multiple DataNodes running on a commodity hardware cluster. The NameNode holds the metadata for the entire storage including details of where the blocks for a specific file are kept, the whereabouts of multiple copies of data blocks and access to files among other things.

It is akin to the dynamic data dictionary views in Oracle and the SYS/MDA tables in SAP ASE. It is essential that the metadata is available very fast. The NameNode behaves like an in-memory database (IMDB) and uses an on-disk file called the FsImage to load the metadata at startup. This is the only place where I see value in Solid State Disk, to make this initial load faster. For the remaining period, until HDFS is shut down or otherwise, the NameNode will use the in-memory cache to access metadata. Like an IMDB, if the amount of metadata in the NameNode exceeds the allocated memory, then it will cease to function. Hence this is where a large amount of RAM helps on the host that the NameNode is running on. With regard to sizing the NameNode to store metadata, one can use the following rule of thumb (heuristic):

  • NameNode consumes roughly 1 GB for every 1 million blocks (source Hadoop Operations, Eric Sammer, ISBN: 978-1-499-3205-7). So if you have a 128 MB block size, you can store 128 * 1E6 / (3 * 1024) = 41,666 GB of data for every 1 GB of NameNode memory. The number 3 comes from the fact that each block is replicated three times (see below). In other words, just under 42TB of data. So if you have 10 GB of NameNode cache, you can have up to 420 TB of data on your DataNodes
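On a running cluster you can sanity-check such estimates against the actual figures. A hedged sketch; the output is abbreviated and the numbers will of course differ per cluster:

hdfs dfsadmin -report     # configured capacity, DFS used/remaining and the list of live DataNodes
hdfs dfs -count -q /      # file and directory counts (and quotas) drawn from the NameNode metadata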

DataNodes

DataNodes are the workhorses of HDFS. Within the HDFS cluster, data blocks are replicated across multiple DataNodes (the default is three copies: typically two on the same rack and one on another) and access is managed by the NameNode. Before going any further, let us look at DataNode block sizes.

HDFS Block size

Block size in HDFS can be specified per file by the client application. A client writing to HDFS can pass the block size to the NameNode during its file creation request, and it (the client) may explicitly change the block size for the request, bypassing any cluster-set defaults.

The default block size, used when a client does not explicitly specify one in its API call, comes from the configuration parameter fs.block.size (in bytes); in more recent Hadoop releases the property is dfs.blocksize. By default this value is set to 128MB (it can be 64MB or 256MB, and I have seen 512MB). This block size is only applicable to DataNodes and has no bearing on the NameNode.
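As a hedged illustration, a client can override the block size for a single write from the command line; the file and directory names below are made up and 268435456 bytes is 256MB:

hdfs dfs -D dfs.blocksize=268435456 -put big_export.csv /data/staging/
hdfs dfs -stat "blocksize %o bytes, replication %r" /data/staging/big_export.csv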

It is interesting to make a passing comparison with a typical block size in an RDBMS. A standard RDBMS (Oracle, SAP ASE, MSSQL) has 8K per block or page. I am ignoring multi-block sizes or larger pool sizes for now. In the simplest operation, every single physical IO will fetch 8K worth of data. If we are talking about Linux, the OS block size is 4K. In other words, on every single physical IO we will fetch two OS blocks from the underlying disk into memory. In contrast, with HDFS a single physical IO will fetch 128*1024K/4K = 32,768 OS blocks. This shows the extremely coarse granularity of HDFS blocks, which is suited to batch reads. How about the use of SSD here? Is creating HDFS directories on SSD going to improve performance substantially? With regard to DataNodes, we are talking about few writes (say bulk inserts) and many more reads. Let us compare SSD response with magnetic spinning hard disks (HDD) for bulk inserts. Although not on HDFS, I did tests using Oracle and SAP ASE RDBMS for bulk inserts and full table scans respectively, as described below.

SSD versus HDD for Bulk Inserts

SSD fared reasonably better than HDD but not that much faster. The initial seek time for SSD was shorter, but the rest of the writes were sequential, extent after extent, which would not be much different from what an HDD does.

In summary:

  • First seek time for positioning the disk on the first available space will be much faster for SSD than HDD
  • The rest of the writes will have marginal improvements with SSD over HDD
  • The performance gain I got was roughly a factor of two for SSD compared to HDD for bulk inserts

SSD versus HDD for Full Table Scans

Solid state devices were again roughly twice as fast here. When the query results in table scans, the gain is not that great, the reason being that the HDD is reading blocks sequentially and the seek time impact is considerably less than for random reads.

In summary, Solid State Disks are best suited for random access via index scans, as seek time all but disappears and accessing any part of the disk is equally fast. When the query results in table scans, the gain is not that great. In HDFS practically all block reads are going to be sequential, so having Solid State Disks for DataNodes may not pay.

How does HDFS keep track of its data?

Let us take the example of HDFS configured with a 128MB block size. So for every 1GB worth of data we consume eight blocks. However, we will need redundancy. In an RDBMS, redundancy comes from mirroring using SAN disks and having DR resiliency in case of data centre failure. In HDFS it comes from replicating every block that makes up a file throughout the cluster, to guard against a single node becoming a point of failure. Now that blocks are replicated, how do we keep track of them? Recall that I mentioned the NameNode and metadata. That is where all this information is kept. In short, the NameNode records the following (a command-line sketch for inspecting some of this follows the list):

  • The entity life history of the files: when they were created, modified, accessed, deleted, etc.
  • The map of all blocks of the files
  • The access and security of the files
  • The number of files, the overall, used and remaining capacity of HDFS
  • The cluster, the number of nodes, functioning and failed
  • The location of logs and the transaction log for the cluster
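As a hedged sketch, much of this bookkeeping can be inspected from the command line; the file path is the illustrative one used earlier:

hdfs fsck /user/mich/staging/trades.csv -files -blocks -locations   # block map and replica locations for a file
hdfs dfs -setrep -w 3 /user/mich/staging/trades.csv                 # change the replication factor and wait for it to take effect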

Unlike the NameNode, DataNodes are very much the workers. They provide the following functions:

  • DataNode stores and retrieves the blocks in the local file system of the node
  • Stores the metadata of the block in the local file system, based on the metadata info from the NameNode
  • Does periodic validation of file checksums
  • Sends heartbeat and availability of blocks to NameNode
  • Can exchange block metadata and data directly with client application programs
  • Can forward data to the other DataNodes

Hadoop MapReduce

The second major component of Hadoop is its MapReduce engine. It is called an engine because it is an implementation of the MapReduce algorithm, designed to process large data sets in parallel across a Hadoop cluster. MapReduce, as the name suggests, is a two-step process, namely map and reduce. The job configuration supplies the map and reduce analysis functions, and the Hadoop engine provides the scheduling, distribution, and parallelisation services. During the map phase, the input data is divided into input splits for analysis by map tasks running in parallel across the Hadoop cluster. That is the key point in Massively Parallel Processing (MPP). By default, the MapReduce engine gets its input data from HDFS. The reduce phase uses the results from the map tasks as input to a set of parallel reduce tasks. The reduce tasks consolidate the data into the final results. By default, the MapReduce engine stores the results in HDFS. Although the reduce phase depends on output from the map phase, map and reduce processing is not necessarily sequential. That is, reduce tasks can begin as soon as any map task completes; it is not necessary for all map tasks to complete before any reduce task can begin. Let us see this in action.
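Before the Hive example, a quick hedged way of watching map and reduce tasks at work is the word-count job that ships with Hadoop; the jar path and the input/output directories below vary per distribution and are only illustrative:

hdfs dfs -mkdir -p /user/mich/wc_in
hdfs dfs -put /var/log/syslog /user/mich/wc_in/
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount /user/mich/wc_in /user/mich/wc_out
hdfs dfs -cat /user/mich/wc_out/part-r-00000 | head    # word counts written by the reducer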

In the example below I have a table tdash in HDFS that I imported from an Oracle database via Sqoop and Hive (I will cover them in detail later). Sqoop (SQL to Hadoop) uses JDBC drivers to access relational databases like Oracle and SAP ASE and stores the data in HDFS as text-delimited files. Hive facilitates creating, querying and managing large datasets in relational table format residing in HDFS. So with Sqoop and Hive combined, you can take a table and its data from an RDBMS and create and populate it in HDFS in relational format. Hive has a SQL-like language, similar to Transact-SQL in SAP ASE, called HiveQL. Now if you run an HQL query on Hive, you will see that it uses MapReduce to get the results out. You can actually create an index in Hive as well, but it won’t be a B-tree index (and in its current state it will not be used by the optimizer). Diagram 2 below shows the steps needed to import an RDBMS table into HDFS. I will explain this diagram in later sections.

Diagram 2: Importing an RDBMS table into HDFS
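For completeness, here is a hedged sketch of the kind of Sqoop command that sits behind diagram 2. The connection string, credentials and database name are illustrative only (-P prompts for the password):

sqoop import \
  --connect jdbc:oracle:thin:@rhes564:1521:MYDB \
  --username scratchpad -P \
  --table TDASH \
  --hive-import --hive-table oraclehadoop.tdash \
  --num-mappers 4

Each of the four map tasks writes its own output file, which would be consistent with the numFiles=4 seen in the statistics below.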

This table has 4 million rows, and the stats for this table in Hive can be obtained, believe it or not, from an ANALYZE TABLE ... COMPUTE STATISTICS command (BTW very similar to the old Oracle one):

hive> analyze table tdash compute statistics;
Hadoop job information for Stage-0: number of mappers: 121; number of reducers: 0
2015-03-24 21:10:05,834 Stage-0 map = 0%,  reduce = 0%
2015-03-24 21:10:15,366 Stage-0 map = 2%,  reduce = 0%, Cumulative CPU 6.68 sec
2015-03-24 21:10:18,522 Stage-0 map = 6%,  reduce = 0%, Cumulative CPU 24.72 sec
2015-03-24 21:10:28,896 Stage-0 map = 7%,  reduce = 0%, Cumulative CPU 34.02 sec
…………..
2015-03-24 21:19:04,265 Stage-0 map = 100%,  reduce = 0%, Cumulative CPU 326.5 sec
MapReduce Total cumulative CPU time: 5 minutes 26 seconds 500 msec
Ended Job = job_1426887201015_0016
Table oraclehadoop.tdash stats: [numFiles=4, numRows=4000000, totalSize=32568651117, rawDataSize=32564651117]
MapReduce Jobs Launched:
Stage-Stage-0: Map: 121   Cumulative CPU: 326.5 sec   HDFS Read: 32570197317 HDFS Write: 10164 SUCCESS
Total MapReduce CPU Time Spent: 5 minutes 26 seconds 500 msec

So it tells us that this table has 4 million rows (numRows=4000000) and is split into 4 files (numFiles=4). The total size is 30GB, as shown below. The total size is quoted in bytes:


0: jdbc:hive2://rhes564:10010/default> select (32568651117/1024/1024/1024) AS GB;
+--------------------+--+
|         gb         |
+--------------------+--+
| 30.33192001003772  |
+--------------------+--+

FYI, this table in Oracle is 61GB without compression. However, it is good that we have compression in HDFS. If I work this out, this table should take around 30*8 = 240 HDFS blocks, on the assumption that each block is 128MB and 1GB of data takes around 8 blocks.

Now let us run a simple query and see MapReduce in action

0: jdbc:hive2://rhes564:10010/default> select count(1) from tdash
. . . . . . . . . . . . . . . . . . .> where owner = 'PUBLIC'
. . . . . . . . . . . . . . . . . . .> and object_name = 'abc'
. . . . . . . . . . . . . . . . . . .> and object_type = 'SYNONYM'
. . . . . . . . . . . . . . . . . . .> and created = '2009-08-15 00:26:23.0';
INFO  : Hadoop job information for Stage-1: number of mappers: 121; number of reducers: 1
INFO  : 2015-03-24 21:38:40,015 Stage-1 map = 0%,  reduce = 0%
INFO  : 2015-03-24 21:39:01,523 Stage-1 map = 1%,  reduce = 0%, Cumulative CPU 18.53 sec
INFO  : 2015-03-24 21:39:03,575 Stage-1 map = 2%,  reduce = 0%, Cumulative CPU 18.74 sec
…………….
INFO  : 2015-03-24 21:40:31,761 Stage-1 map = 18%,  reduce = 0%, Cumulative CPU 79.15 sec
INFO  : 2015-03-24 21:40:37,948 Stage-1 map = 19%,  reduce = 6%, Cumulative CPU 83.21 sec
INFO  : 2015-03-24 21:40:44,129 Stage-1 map = 20%,  reduce = 6%, Cumulative CPU 88.91 sec
INFO  : 2015-03-24 21:40:47,197 Stage-1 map = 20%,  reduce = 7%, Cumulative CPU 90.13 sec
…………..
INFO  : 2015-03-24 21:47:23,923 Stage-1 map = 97%,  reduce = 32%, Cumulative CPU 401.5 sec
INFO  : 2015-03-24 21:47:31,207 Stage-1 map = 98%,  reduce = 32%, Cumulative CPU 405.54 sec
INFO  : 2015-03-24 21:47:32,237 Stage-1 map = 98%,  reduce = 33%, Cumulative CPU 405.54 sec
INFO  : 2015-03-24 21:47:37,356 Stage-1 map = 99%,  reduce = 33%, Cumulative CPU 407.87 sec
INFO  : 2015-03-24 21:47:41,452 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU 409.14 sec
INFO  : 2015-03-24 21:47:42,474 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 409.88 sec
INFO  : MapReduce Total cumulative CPU time: 6 minutes 49 seconds 880 msec
INFO  : Ended Job = job_1426887201015_0017

+------+--+
| _c0  |
+------+--+
| 0    |
+------+--+

The query started with 121 mappers and one reducer. I expected zero rows back, and got zero. However, you will notice that after the mapping had reached about 19%, the reduce process started at 6%, and finally both map and reduce processes finished at 100% after 6 minutes and 49 seconds. This shows that the reduce process does not need to wait for all mappers to complete before it starts.

Planning for Hadoop clustering

The philosophy behind big data is horizontal scaling, so this embraces dynamic addition of nodes to a cluster depending on the famous three Vs, namely the Volume, Velocity and Variety of data. Planning a Hadoop cluster is not much different from planning for Oracle RAC or SAP Shared Disk Cluster. In contrast to those two products, where the database is in one place, HDFS distributes blocks of data among DataNodes. The same resiliency and DR issues apply to Hadoop as much as to classic Oracle and SAP ASE databases.

Hadoop runs on commodity hardware. That does not necessarily mean that you can deploy any hardware. Commodity hardware is a relative term. Sure, you will not need to invest in SAN technology, as you can get away with using JBODs, so there are relative savings to be made here. For Hadoop to function properly, you will need to consider the hardware for NameNodes (AKA masters) and DataNodes (AKA workers). The master nodes require very good resiliency, as a failure of a master node can disrupt processing. In contrast, worker nodes that hold data can fail, as there is in-built redundancy of data across the workers.

Hardware for NameNodes

The master nodes hold the NameNode metadata for HDFS data plus the job scheduler, the job tracker, the worker-node heartbeat monitor and the Secondary NameNode. So it is essential that redundancy is built into the NameNode hardware. Bear in mind that the metadata is cached in memory. To this end the NameNode host must have enough RAM and pretty fast disks for the start-up load. Above, I recommended having SSD disks for NameNodes, which I think would be beneficial to performance. The NameNode will be memory hungry and light on disk storage. The OS disks need to be mirrored internally (a default configuration) in all cases. The metadata will include the filename, the distribution of blocks and the location of replicas for each block, permissions, owner and group data. Bear in mind that longer filenames will occupy more bytes in memory. In general, the more files the NameNode needs to track, the more metadata it needs to maintain and the more memory will be consumed. Depending on the size of the cluster, if we are talking about 10-20 DataNodes, and assuming that 1GB of NameNode memory can track up to 42 TB of data, then we are looking at 420 to 840 TB of data storage, so any host with 24 GB of RAM, quad-core CPUs, a 1Gbit Ethernet connection and two JBOD drives will do, in addition to the two disks that will be used for the Operating System and its mirror. Larger clusters will require a NameNode with 48, 96 or even 128 GB of RAM or more.
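As a hedged example of where that RAM figure ends up, the NameNode JVM heap is typically set in hadoop-env.sh; the 24g value simply mirrors the host described above, and the file location varies per distribution:

# etc/hadoop/hadoop-env.sh
# Size the NameNode heap so the whole block map stays in memory
export HADOOP_NAMENODE_OPTS="-Xms24g -Xmx24g ${HADOOP_NAMENODE_OPTS}"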

Data Ingestion

Once we have pinpointed the sources of data, the Technical Architect needs to classify the characteristics of the data that must be processed for each application. Data, regardless of its source, will have common characteristics:

  • Data origin – front office, middle office, back office, external providers, machine generated, email, intranet, wiki, spreadsheets, logs, audio, legacy, ftp
  • Data format – structured, unstructured
  • Data frequency – end of day, intra-day (on demand), continuous, time series
  • Data type – company data, customer reference data, historical, transactional
  • Data size – the volume of data expected from the silos and their daily growth. Your friendly DBAs, plus various other administrators, should be able to provide this information. You will need it to assess the size of the Big Data repository and its growth
  • Data delivery – How the data needs to be processed at the consumption layer

There is a term used for this manipulation of data: Data Ingestion. Data ingestion is the process of obtaining and processing data for later use or storage. The key point is processing these disparate data sources. Specifically, Data Ingestion includes the following:

  • Data Identification – identifying different data formats for structured data and default formats for unstructured data.
  • Data filtration – only interested in data relevant to master data management (MDM) needs
  • Data validation – continuous validation of data against MDM
  • Data pruning – removing unwanted data
  • Data transformation – migrating from local silo schema to MDM
  • Data Integration – integrating the transformed data into Big Data

Real time data Ingestion

Real time reporting is becoming increasingly important. The underlying technique has been around for a while in the form of continuous, real-time, log-based transactional data capture. One of the challenges is to implement it so that it delivers real-time data to Hadoop. I am currently actively involved in getting this tested and production ready for a solution, and I hope to cover it later.

Batch data Ingestion

The most straightforward case is getting data from structured data sources. Think of RDBMS data as the source. In all probability you are familiar with getting data from one RDBMS to another. For example, you can use text-delimited files to get data out of, say, SAP ASE and import it into Oracle (BCP the data out of ASE, put the data in using SQLLDR).

The only difference here is that the destination is Hadoop. Currently the most widely used tool is Sqoop, which uses JDBC to get data out of an RDBMS and import it into HDFS in different file formats. I will discuss this later. Sqoop can also do an incremental import, the so-called incremental append, by checking the last unique identifier of the rows and importing only the new ones. It will effectively use SQL like SELECT * FROM <TABLE> WHERE IDENTIFIER > SOME_VALUE. However, this will not address deletes and updates unless the full data set is overwritten, which will be time consuming if the underlying table is very large.
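A hedged sketch of such an incremental append, assuming the table has a monotonically increasing OBJECT_ID column; the column name, last value and target directory are illustrative only:

sqoop import \
  --connect jdbc:oracle:thin:@rhes564:1521:MYDB \
  --username scratchpad -P \
  --table TDASH \
  --incremental append --check-column OBJECT_ID --last-value 4000000 \
  --target-dir /user/hive/warehouse/oraclehadoop.db/tdash

On the next run Sqoop only pulls rows whose OBJECT_ID is greater than the recorded last value; rows that were deleted or updated in Oracle are untouched, which is exactly the limitation described above.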

To be continued …

Comments»

1. Owen - March 25, 2015

Mich sounds like bitcoin for data.

2. VV - March 30, 2015

This is the best I’ve ever read that would make sense to a database professional how the equivalent things are with hadoop. Thanks a ton !

