Implementing a Pig streaming extension in C#

The next step after playing with basic Pig on HDInsight was to look at what it takes to bring C# into the picture.

Pig allows creating User Defined Functions (UDFs) in various languages but unfortunately .NET isn’t currently supported.

On the other hand, Pig also supports streaming, and we already know that .NET plays very nicely in this context, so – how does that work and what can I do with it?

Looking back at my METAR processing sample, let’s imagine that we wanted to use Pig to query the results and then wanted to produce the average temperature per airfield in a given result set. Sounds good?

Well – using a standard Pig script we can produce the result set we want and then, given a variable RESULT, we can invoke a custom C# program on it using the following syntax (note: pay attention to the direction of the ‘tick’ – it must be a backtick; it caught me!):

grunt> AVERAGE = stream RESULT  through `c:\\test\\pigStreamSample.exe`;

grunt> dump AVERAGE;

In real life I would probably use DEFINE to alias the definition of the extension and use SHIP to point at the location of the executable  –

grunt> define average `pigStreamSample.exe` ship('c:\\test\\pigStreamSample.exe');

grunt> B = stream A through average;

grunt> dump B;

The output of this would look as follows –

[screenshot: output of the dump command]

My initial result set contained 144 METAR records for 3 airfields; these have been averaged to produce 3 records using a simple C# program, but this program could have been doing anything. Nice!

What does pigStreamSample.exe look like? Well – it’s not pretty, but it’s good enough to demonstrate the work required –

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;

    class Program
    {
        static void Main(string[] args)
        {
            Stream stdin = Console.OpenStandardInput();
            Stream stdout = Console.OpenStandardOutput();
            StreamWriter outputWriter = new StreamWriter(stdout);
            StreamReader inputReader = new StreamReader(stdin);

            List<Result> results = new List<Result>();

            string line = inputReader.ReadLine();
            while (line != null)
            {
                //parse the input as received
                METAR metar = parseLine(line);
                //find out whether the ICAO code is already 'known'
                Result result = results.FirstOrDefault(r => r.ICAO == metar.ICAO);
                if (result == null) //if the ICAO code is not already known, create a new entry
                {
                    result = new Result(metar.ICAO);
                    results.Add(result);
                }
                result.count++; //increment the record count
                result.sum += metar.Temperature; //add the temperature to the running sum

                line = inputReader.ReadLine();
            }

            //output the average per ICAO code, tab delimited, one line per airfield
            foreach (Result r in results)
                outputWriter.WriteLine(string.Format("{0}\t{1}", r.ICAO, r.sum / r.count));

            inputReader.Dispose();
            outputWriter.Dispose();
        }

        private static METAR parseLine(string line)
        {
            string[] parts = line.Split('\t');
            METAR metar = new METAR();
            metar.ICAO = parts[0];
            metar.Date = parts[1];
            metar.Cloudbase = int.Parse(parts[2]) * 100;
            metar.Temperature = int.Parse(parts[3]);
            return metar;
        }
    }

with METAR being a simple class

    class METAR
    {
        public string ICAO { get; set; }
        public string Date { get; set; }
        public int Cloudbase { get; set; }
        public int Temperature { get; set; }
    }

and Result even simpler –

    class Result
    {
        public Result(string ICAO)
        {
            this.ICAO = ICAO;
            this.count = 0;
            this.sum = 0;
        }
        public string ICAO { get; set; }
        public int count { get; set; }
        public int sum { get; set; }
    }
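Since Pig simply pipes the relation to the program’s stdin and reads its stdout, the aggregation logic can be exercised locally without a cluster. Here’s a minimal sketch – the StreamHarness class and AverageByIcao helper are mine, for illustration only, not part of the sample above – that reproduces the averaging on in-memory lines in the same tab-delimited shape Pig would stream:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class StreamHarness
{
    // Reproduces the aggregation done by pigStreamSample.exe on an
    // in-memory set of lines - exactly what Pig feeds the program via stdin.
    public static IEnumerable<string> AverageByIcao(IEnumerable<string> lines)
    {
        return lines
            .Select(l => l.Split('\t'))
            .GroupBy(parts => parts[0]) // the ICAO code is the first field
            .Select(g => string.Format("{0}\t{1}",
                g.Key,
                g.Sum(p => int.Parse(p[3])) / g.Count())); // integer average, as in the sample
    }

    static void Main()
    {
        var sample = new[]
        {
            "EGLL\t28/10/2012 09:20\t33\t4",
            "EGLL\t28/10/2012 11:20\t15\t8",
            "EGKK\t28/10/2012 09:20\t30\t5"
        };
        foreach (string line in AverageByIcao(sample))
            Console.WriteLine(line); // EGLL 6, EGKK 5 (tab delimited)
    }
}
```

Piping a saved result file through the compiled executable on the command line exercises exactly the same contract Pig uses at run time.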

Reflections on HDInsight

After a few days of ‘playing’ with HDInsight, both server and service, it was time to think back and take stock.

I had no exposure to Hadoop before I started playing with it on Azure a few months back, and I am still, by all accounts, a complete novice, but having spent the last few days on HDInsight there are a few interesting observations I thought I’d share –

Lowering the barrier of entry

Personally, and this is a very subjective viewpoint, this is perhaps the greatest wow factor of all – until now, if I wanted to “get into the game” with Hadoop, I had to get comfortable with Java (nothing against it, but I’m not), I had to have Eclipse, and everything had to be just right.
If I wanted to run anything locally, I had to get comfortable with running Hadoop in Cygwin and all sorts of things (but frankly – test clusters on Azure have been a great experience, more on that shortly).

Now – with HDInsight Server I can install Hadoop on Windows with a click from the web platform installer; I can get the latest of our distribution of Hadoop (currently in preview) on my laptop, in minutes, with zero config.

I can then use Visual Studio and .NET, both of which I’m very familiar with, to do pretty much all the development I need; I no longer need Eclipse and I don’t really need to use Java.

This is bound to significantly lower the barrier of entry to handling big data for a lot of people. Is this the beginning of Hadoop for the masses? Winking smile

Enabling reuse

The other thing that became apparent very quickly as I was building various scenarios in my tests is that, now that I’m developing in .NET, I can not only build on all the knowledge and experience I’ve accumulated over the years, I can actually build on a lot of code I already have.
When I processed the METAR data I already had the parser I had developed for my Windows 8 application; I did not need to rewrite anything, it just slotted in.

Speaking to a few architects and developers in the last week or so, these two points resonate very well – so many people want to get into the thick of things but are somewhat intimidated (as I was) or, quite simply, find the cost of implementation too high.

Choice between cloud and on-premises

When I started, I used hadooponazure.com exclusively, because I did not want to run Hadoop on Cygwin and I did not have access to our server deployment. Now pretty much everyone has access to both. This choice is quite powerful – I can see people like me running development instances locally to benefit from quick coding iterations, but running production clusters in the cloud, benefitting from the cost effectiveness of scaling in the cloud, not to mention on-demand clusters that can be removed when the processing is done.

I could also easily imagine the reverse – teams using the cloud as a test bed, on test data, before running on a production cluster, avoiding the need to maintain several environments on-premises.

Sticking with the community

Hadoop has a large ecosystem, and it’s great that we can remain part of that. Many projects are being worked on to stabilise and improve on Windows, with others to come in the future, but it seems that many ‘just work’ even now. It is simply great that the decision was taken not to re-invent the wheel here.

So far I’ve been using HDFS and Map/Reduce, but also Hive quite significantly and a bit of Mahout, and I know others have been trying out Oozie and other projects on it too.

I’ve been browsing the Apache repositories a little bit and it seems to me that the contributions made are very well received and go beyond benefiting just those who chose to run Hadoop on Windows, and that’s great too! Smile

Connecting with the rest of the stack

But of course, technology is here to serve a purpose, and when it comes to data, there are already protocols and interfaces that systems and users use very effectively.
People, fundamentally, don’t want to change the way they consume data – big or small – they want to be able to leverage Hadoop but remain in familiar tools and technologies such as SQL Server with Analysis Services, Reporting Services and Power View, and tools such as Excel with pivot tables, data mining plug-ins etc.

It was great to see how easy it is to get from Hadoop to PowerPivot and Power View (even better told by Denny Lee), as well as to read a white paper discussing how to leverage Hadoop from SSIS.

Of course there are many other benefits; I’m being really selfish here and putting down the four that struck me most for my immediate needs. I suspect many organisations will value the ability to run on Windows with all the management story that comes with it (whilst others, I’m sure, won’t care), and there are some very important capabilities that still need covering – Active Directory integration, for example, for better security, or System Center integration for better monitoring. But these are for another day Smile

From Map/Reduce to Hive (and Power View) using HDInsight

Whilst Map/Reduce is very powerful for processing unstructured data, users (and most applications) still prefer to deal with structured data in familiar ways, and this is where the Hive support and the HDInsight ODBC driver come in very handy.

One could use Map/Reduce to process un/semi-structured data into structured data in files, which can then be exposed, through Hive, as tables to external systems.

I wanted to demonstrate an end-to-end scenario, but one that is simple enough not to cloud the principles, and with my love for all things aviation I thought I’d look at aviation weather reports – METARs.

As input I’ve downloaded a bunch of current reports for London’s Heathrow (EGLL), Gatwick (EGKK) and Stansted (EGSS) airports; these come in as strings that look like  –

EGLL 280820Z 21006KT 180V260 9999 SCT033 04/02 Q1017 NOSIG

Given the nature of the beast – the METAR data format does have rules, but a) they are quite flexible and b) they are not always followed to the letter – Map/Reduce is very useful for extracting the relevant information from this fairly flexible input format. Thankfully I had already built (for a Windows 8 app I’m working on) a library that parses METARs, so I could use that in my mapper (oh! the benefits of being able to use .NET for M/R jobs!)

As an example I’ve decided to create a report to demonstrate the change of cloud base and temperature over a particular airport over time (in this example there’s one layer of scattered clouds at 3,300 feet, represented by the SCT033 string, and a temperature of 04 in 04/02), but of course this can get as complicated as one wants it to be…
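The parsing library itself isn’t shown in this post, but to give a flavour of what decoding involves, here is a minimal, hypothetical sketch (not the actual Aviator.METAR code – the MetarSketch class and its regexes are mine) that pulls the ICAO code, cloud base and temperature out of the example report above:

```csharp
using System;
using System.Text.RegularExpressions;

class MetarSketch
{
    // Extracts (icao, cloudbaseFeet, temperature) from a report such as
    // "EGLL 280820Z 21006KT 180V260 9999 SCT033 04/02 Q1017 NOSIG".
    // Real METARs are far messier; this only covers the common cases.
    public static void Decode(string raw, out string icao, out int cloudbaseFeet, out int temperature)
    {
        icao = raw.Split(' ')[0]; // the ICAO code leads the report

        // Cloud groups look like FEW/SCT/BKN/OVC followed by the height
        // in hundreds of feet, e.g. SCT033 = scattered at 3,300 ft.
        Match cloud = Regex.Match(raw, @"\b(FEW|SCT|BKN|OVC)(\d{3})\b");
        cloudbaseFeet = cloud.Success ? int.Parse(cloud.Groups[2].Value) * 100 : 0;

        // Temperature/dewpoint group, e.g. 04/02 or M01/M03 (M = minus).
        Match temp = Regex.Match(raw, @"\b(M?\d{2})/(M?\d{2})\b");
        if (!temp.Success) { temperature = 0; return; }
        string t = temp.Groups[1].Value;
        temperature = t.StartsWith("M") ? -int.Parse(t.Substring(1)) : int.Parse(t);
    }

    static void Main()
    {
        string icao; int cloudbase, temp;
        Decode("EGLL 280820Z 21006KT 180V260 9999 SCT033 04/02 Q1017 NOSIG",
               out icao, out cloudbase, out temp);
        Console.WriteLine("{0} {1}ft {2}C", icao, cloudbase, temp); // EGLL 3300ft 4C
    }
}
```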

The idea is to use a mapper to convert this semi-structured format to a known format, say –

[ICAO Code] \t [observation date/time] \t [cloudbase in feet] \t [temperature]\r\n

With this more structured format I can create a Hive definition on top of it and consume that from, for example, Excel via the ODBC driver.

Let’s see what it takes –

The first step is the M/R layer – in this case I do not really need a reducer-combiner as I have no aggregation to do; I simply want to convert the source data to a more structured format, and that’s what the mapper is all about.

In .NET I’ll create the following Mapper class –

    public class METARMap : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            context.Log("Processing " + inputLine);
            //my METAR files each have two lines - the first is a date in the format 2012/10/28 12:20,
            //the second starts with the ICAO code; I need to ignore the lines with the date,
            //and this check will do for the next 988 years or so
            if (!inputLine.Trim().StartsWith("2"))
            {
                Aviator.METAR.DecodedMETAR metar = Aviator.METAR.DecodedMETAR.decodeMETAR(inputLine);
                context.EmitLine(string.Format("{0}\t{1}\t{2}\t{3}",
                    metar.ICAO,
                    calcObservationDateTime(metar),
                    metar.Cloud.Count > 0 ? metar.Cloud[0].Height.ToString() : null,
                    metar.Temprature));
            }
        }
    }

The METAR decoding logic is not really relevant here; the important piece is that in the .NET SDK, alongside the EmitKeyValue function of the context, you can also find EmitLine, which gives you full control over the structure of the emitted line. In this case I chose to stick to the tab-delimited approach, but added additional values after the ICAO code key. (calcObservationDateTime is a function that returns a date/time value based on the first portion of the METAR – 280820Z means the 28th day of the current month and year, at 08:20 UTC.)
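calcObservationDateTime isn’t shown in the post; one possible implementation – a sketch of mine, not the actual code, assuming the day/time group has already been isolated and that the “current month and year” come from a supplied clock – might look like this:

```csharp
using System;

class ObservationTime
{
    // "280820Z" = day 28 of the current month, 08:20 UTC.
    // Hypothetical reconstruction of the helper mentioned in the post;
    // utcNow supplies the current month and year.
    public static DateTime CalcObservationDateTime(string dayTimeGroup, DateTime utcNow)
    {
        int day = int.Parse(dayTimeGroup.Substring(0, 2));
        int hour = int.Parse(dayTimeGroup.Substring(2, 2));
        int minute = int.Parse(dayTimeGroup.Substring(4, 2));
        return new DateTime(utcNow.Year, utcNow.Month, day, hour, minute, 0, DateTimeKind.Utc);
    }

    static void Main()
    {
        DateTime obs = CalcObservationDateTime("280820Z", new DateTime(2012, 10, 28));
        Console.WriteLine(obs.ToString("dd/MM/yyyy HH:mm")); // 28/10/2012 08:20
    }
}
```

A real implementation would also have to handle the month boundary – a report dated the 31st processed on the 1st of the next month belongs to the previous month.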

The result of the map for the input I’ve provided above is –

EGLL 28/10/2012 09:20 33 02

Now – I did say I did not really need a reducer-combiner, and that is true, but as my input comes in many small files, with just a map the output will also be created as many small files, so I created a simple combiner to bring them together. It doesn’t really do anything – it gets the ICAO code as a key and the output from the map (all the fields, tab delimited) as a single value in the array, so it loops over the array emitting the key and each value separately, but now to a single file –

    public class MetarReducer : Microsoft.Hadoop.MapReduce.ReducerCombinerBase
    {
        public override void Reduce(string key, IEnumerable<string> values, Microsoft.Hadoop.MapReduce.ReducerCombinerContext context)
        {
            foreach (string value in values)
                context.EmitKeyValue(key, value);
        }
    }

Either way – a single file for all METARs or many files in a folder – the results are in a consistent format, with only the data I need, so I can now create a Hive external table using the following statement in the Hive interactive console –

create external table metars(icao string, obs_datetime string, cloudbase int, temperature smallint) row format delimited fields terminated by '\t' stored as textfile location '/user/yossidah/metarsoutput'

which in turn allows me to query the table –

select * from metars

and get  –

EGKK 28/10/2012 09:20 30 5

EGLL 28/10/2012 11:20 15 8

Now I can use the Hive add-in for Excel to read that data –

[screenshot: the METAR data loaded into Excel via the Hive add-in]

…and if it’s in Excel it can be in any other data-based system, including PowerPivot and Power View; here’s one with a bit more data (3 airfields, 6 METARs for each) –

[screenshot: Power View report over the METAR data]

And so there you go – from unstructured data to Power View, all on Windows and in .NET Smile
