From Map/Reduce to Hive (and Power View) using HDInsight

Whilst Map/Reduce is very powerful for processing unstructured data, users (and most applications) still prefer to handle structured data in familiar ways, and this is where the Hive support and the HDInsight ODBC driver come in very handy.

One could use Map/Reduce to process un/semi-structured data into structured data in files, which can then be exposed, through Hive, as tables to external systems.

I wanted to demonstrate an end-to-end scenario, but one simple enough not to cloud the principles, and with my love for all things aviation I thought I’d look at aviation weather reports – METARs.

As input I’ve downloaded a bunch of current reports for London’s Heathrow (EGLL), Gatwick (EGKK) and Stansted (EGSS) airports; these come in as strings that look like –

EGLL 280820Z 21006KT 180V260 9999 SCT033 04/02 Q1017 NOSIG

Given the nature of the beast – the METAR data format does have rules, but a) they are quite flexible and b) they are not always followed to the letter – Map/Reduce is very useful for extracting the relevant information from this fairly loose input format. Thankfully I had already built (for a Windows 8 app I’m working on) a library that parses METARs, so I could use that in my mapper (oh, the benefits of being able to use .NET for M/R jobs!).

As an example I’ve decided to create a report showing the change of cloud base and temperature over a particular airport over time (in this example there’s one layer of scattered clouds at 3,300 feet, represented by the SCT033 group, and a temperature of 4°C, the first half of 04/02), but of course this can get as complicated as one wants it to be…
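The real decoding lives in my library, but as a rough illustration of just the two fields this report cares about – the first cloud layer and the temperature – the logic boils down to something like this (sketched in Python for brevity, and very much not a full METAR parser):

```python
import re

def extract_cloud_and_temp(metar):
    """Pull the first cloud layer height (feet) and temperature (deg C)
    from a raw METAR string. Illustrative only - a real parser must also
    handle CAVOK, vertical visibility, multiple layers and so on."""
    # Cloud group, e.g. SCT033: scattered clouds; height is in hundreds of feet
    cloud = re.search(r'\b(?:FEW|SCT|BKN|OVC)(\d{3})\b', metar)
    cloud_base = int(cloud.group(1)) * 100 if cloud else None
    # Temperature/dewpoint group, e.g. 04/02: temperature 4, dewpoint 2
    # (an 'M' prefix means a negative value)
    temp = re.search(r'\b(M?\d{2})/(M?\d{2})\b', metar)
    temperature = None
    if temp:
        t = temp.group(1)
        temperature = -int(t[1:]) if t.startswith('M') else int(t)
    return cloud_base, temperature

print(extract_cloud_and_temp(
    "EGLL 280820Z 21006KT 180V260 9999 SCT033 04/02 Q1017 NOSIG"))
# (3300, 4)
```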

The idea is to use a mapper to convert this semi-structured format to a known format, say –

[ICAO Code] \t [observation date/time] \t [cloudbase in feet] \t [temperature]\r\n

With this more structured format I can create a Hive definition on top of it and consume it from, for example, Excel via the ODBC driver.

Let’s see what it takes –

The first step is the M/R layer – in this case I do not really need a reducer/combiner, as I have no aggregation to do; I simply want to convert the source data to a more structured format, and that is what the mapper is all about.

In .NET I’ll create the following Mapper class –

    public class METARMap : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            context.Log("Processing " + inputLine);
            //my metar files each have two lines - the first is a date in the format 2012/10/28 12:20,
            //the second starts with the ICAO code; I need to ignore the lines with the date,
            //and this check will do for the next 988 years or so
            if (!inputLine.Trim().StartsWith("2"))
            {
                Aviator.METAR.DecodedMETAR metar = Aviator.METAR.DecodedMETAR.decodeMETAR(inputLine);
                context.EmitLine(string.Format("{0}\t{1}\t{2}\t{3}",
                    metar.ICAO,
                    calcObservationDateTime(metar),
                    metar.Cloud.Count > 0 ? metar.Cloud[0].Height.ToString() : null,
                    metar.Temprature));
            }
        }
    }

The METAR decoding logic is not really relevant here; the important piece is that in the .NET SDK, alongside the EmitKeyValue function on the context, you can also find EmitLine, which gives you full control over the structure of the emitted line. In this case I’ve chosen to stick with the tab-delimited approach, but added additional values after the ICAO code key. (calcObservationDateTime is a function that returns a date/time value based on the first time group of the METAR – 280820Z means the 28th day of the current month and year, at 08:20 UTC.)
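calcObservationDateTime itself isn’t shown in this post, but its logic is roughly this – a simplified sketch in Python, assuming the observation falls in the current month and year (no handling of the month-rollover edge case around midnight UTC):

```python
from datetime import datetime, timezone

def calc_observation_datetime(metar_time, now=None):
    """Turn a METAR time group like '280820Z' (day 28, 08:20 UTC) into a
    full date/time, borrowing month and year from 'now'. Simplified sketch."""
    now = now or datetime.now(timezone.utc)
    day = int(metar_time[0:2])
    hour = int(metar_time[2:4])
    minute = int(metar_time[4:6])
    return datetime(now.year, now.month, day, hour, minute, tzinfo=timezone.utc)

print(calc_observation_datetime(
    "280820Z", datetime(2012, 10, 28, tzinfo=timezone.utc)))
# 2012-10-28 08:20:00+00:00
```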

The result of the map for the input I’ve provided above is –

EGLL 28/10/2012 09:20 33 02

Now – I did say I do not really need a reducer/combiner, and that is true, but as my input comes in many small files, with just a map the output would also be created as many small files, so I created a simple reducer to bring them together. It doesn’t really do anything – it gets the ICAO code as the key and the output from the map (all the fields, tab-delimited) as a single value in the array, so it loops over the array emitting the key and each value separately, but now to a single file –

    public class MetarReducer : Microsoft.Hadoop.MapReduce.ReducerCombinerBase
    {
        public override void Reduce(string key, IEnumerable<string> values, Microsoft.Hadoop.MapReduce.ReducerCombinerContext context)
        {
            foreach (string value in values)
                context.EmitKeyValue(key, value);
        }
    }

Either way – a single file for all METARs or many files in a folder – the results are in a consistent format, with only the data I need, so I can now create a Hive external table using the following statement in the Hive interactive console –

create external table metars(icao string, obs_datetime string, cloudbase int, temperature smallint) row format delimited fields terminated by '\t' stored as textfile location '/user/yossidah/metarsoutput'

which in turn allows me to query the table –

select * from metars

and get –

EGKK 28/10/2012 09:20 30 5

EGLL 28/10/2012 11:20 15 8
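And Hive being Hive, the report query itself can run server-side too; for example, a (hypothetical) per-airfield trend query over the table above might look like –

```sql
-- METARs for one airfield, oldest first (note obs_datetime is a string,
-- so with dd/mm/yyyy values the ordering is lexical - fine for a small demo,
-- but a real schema would store a proper timestamp)
select obs_datetime, cloudbase, temperature
from metars
where icao = 'EGLL'
order by obs_datetime;
```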

Now I can use the Hive add-in for Excel and read that data –

[image: the METAR data loaded into Excel via the Hive add-in]

…and if it’s in Excel it can be in any other data-driven system, including PowerPivot and Power View; here’s one with a bit more data (3 airfields, 6 METARs for each) –

[image: a Power View report over the METAR data]

And so there you go – from unstructured data to Power View, all on Windows and in .NET :)

Enhancing the word count sample a little bit

Since starting to play with Hadoop on Windows/Azure (now HDInsight) I’ve wanted to improve the word count sample slightly so that it ignores punctuation and very common words, but as that involved Eclipse and Java it never quite made it to the top of the list; now that it’s Visual Studio and .NET I really had no excuses, so here are the two changes I’ve made to what I started with in my previous post.

Firstly – to remove all punctuation, I’ve added the following function –

        private string removePuncuation(string word)
        {
            var sb = new StringBuilder();
            foreach (char c in word.Trim())
            {
                if (!char.IsPunctuation(c))
                    sb.Append(c);
            }

            return sb.ToString();
        }

I then added it to my map function as you can see below –

        public override void Map(string inputLine, Microsoft.Hadoop.MapReduce.MapperContext context)
        {
            string[] words = inputLine.Split(' ');

            foreach (string word in words)
            {
                string newWord = removePuncuation(word);
                context.EmitKeyValue(newWord, "1");
            }
        }

simples.

To support ignoring common words I wanted to keep the list of words outside the code, as an HDFS file, so firstly I added an initialize method override to load that list –

        private List<string> ignoreList = new List<string>();
        public override void Initialize(MapperContext context)
        {
            const string IGNORE_LIST_FILENAME = "/user/yossidah/input/ignoreList.txt";
            base.Initialize(context);
            context.Log("WordCountMapper Initialize called");
            context.Log("looking for file " + IGNORE_LIST_FILENAME);
            if (HdfsFile.Exists(IGNORE_LIST_FILENAME))
            {
                context.Log("ignore list file found");
                string[] lines = Microsoft.Hadoop.MapReduce.HdfsFile.ReadAllLines(IGNORE_LIST_FILENAME);
                foreach (string line in lines)
                {
                    context.Log("ignore list line: " + line);
                    string[] words = line.Split(' ');
                    ignoreList.AddRange(words);
                    foreach (string word in words)
                    {
                        context.Log(string.Format("Adding {0} to ignore list", word));
                    }
                }
            }
            else
                context.Log("ignore list file not found");
        }

(I’ve added a bunch of logging I can track from the job log file)

I then added a check to the map function that consults the list of words to ignore to determine whether to emit or not; here’s the complete map function again –

        public override void Map(string inputLine, Microsoft.Hadoop.MapReduce.MapperContext context)
        {
            string[] words = inputLine.Split(' ');

            foreach (string word in words)
            {
                string newWord = removePuncuation(word);
                if (!ignoreList.Contains(newWord))
                    context.EmitKeyValue(newWord, "1");
            }
        }

Simples. Not the most elaborate program in the world, but slightly better than my starting point.

I’ve got another, potentially more interesting, program in mind that I could use for demos, but I need to grab some (big) data first; watch this space :)

And again – a note: initially I ran all this as my domain user, and I had issues accessing the ignoreList file; I’ve reported this and it’s being looked at, but basically Hadoop has a problem (at the moment?) validating domain users’ permissions.

There were two ways around it – I uploaded the file from the web interactive console (using fs.put()) and then changed the path in my Initialize method (in my case to /user/hadoop/input/ignoreList.txt); alternatively, I’m pretty sure that if I had done everything from a non-domain-joined account I would not have faced this problem at all.
