Implementing Pig streaming extension in c#

The next step after playing with basic Pig on HDInsight was to look at what it takes to bring c# to the picture.

Pig allows creating User Defined Functions (UDF) in various languages but unfortunately .net isn’t currently supported.

On the other hand, though, Pig also supports streaming, and we already know that .net plays very nicely in this context, so – how does that work and what can I do with it?

Looking back at my METAR processing sample, let’s imagine that we wanted to use Pig to query the results and the wanted to produce the average temperature per airfield in a given result set. sounds good?

Well – using standard Pig script we can produce the result set we want and then, given a variable RESULT, we could invoke a custom c# program on in using the following syntax (note: pay attention to the direction of the ‘tick’, it caught me!) :

grunt> AVERAGE = stream RESULT  through `c:\\test\\pigStreamSample.exe`;

grunt> dump AVERAGE;

In real life I would probably use DEFINE to alias the definition of the extension and use SHIP to point at the location of the executable  –

grunt> define average `pigStreamSample.exe` ship (‘c:\\test\\pigStreamSample.exe’);

grunt> B = stream A through average;

grunt> dump B;

the output of this would look as follows –


My initial result set contained 144 METAR records for 3 airfields, these have been averaged to produce 3 records using a simple c# program, but this program could have been doing anything. nice!

How does pigStreamSample.exe look like? well – it’s not pretty, but it’s good enough to demonstrate the work required –

class Program
        static void Main(string[] args)
            Stream stdin = Console.OpenStandardInput();
            Stream stdout = Console.OpenStandardOutput();
            StreamWriter outputWriter = new StreamWriter(stdout);
            StreamReader inputReader = new StreamReader(stdin);
            List<Result> results = new List<Result>();

            string line = inputReader.ReadLine();
            while (line != null)
                //parse the input as received
                METAR metar = parseLine(line);
                //find if ICAO code already 'known'
                Result result = results.FirstOrDefault(r=>r.ICAO==metar.ICAO);
                if (result == null) //if ICAO code not alreayd known create a new one
                    result = new Result(metar.ICAO);
                result.count++;//increment count
                result.sum+=metar.Temperature;//add sum

                line = inputReader.ReadLine();

            foreach (Result r in results)
                outputWriter.WriteLine(string.Format("{0}\t{1}", r.ICAO, r.sum / r.count));//output the average per ICAO


        private static METAR parseLine(string line)
            string[] parts = line.Split('\t');
            METAR metar = new METAR();
            metar.ICAO = parts[0];
            metar.Date = parts[1];
            metar.Cloudbase = int.Parse(parts[2]) * 100;
            metar.Temperature = int.Parse(parts[3]);
            return metar;

with METAR being a simple class

    class METAR
        public string ICAO { get; set; }
        public string Date { get; set; }
        public int Cloudbase { get; set; }
        public int Temperature { get; set; }

and Result even simpler –

    class Result
        public Result(string ICAO)
            this.ICAO = ICAO;
            this.count = 0;
            this.sum = 0;
        public string ICAO { get; set; }
        public int count { get; set; }
        public int sum { get; set; }

Pig on HDInsight Server

@Slodge had prompted me to look in more detail into running Pig on HDInsight.

I’ve played with it the past using the interactive javascript console and the word count sample, but haven’t gone into much more detail and so I thought it would be nice to do something at the back of my ‘fancy’ aviation weather report using Pig and Hadoop on Windows.

In that previous post I described how I took semi-structured METAR reports, ran a M/R program on them to extract the cloudbase and temperature and then create a hive table on top; in this post I’ll use some basic Pig to examine the data in the hive table and extract the 10 reports with the highest cloudbase.

To get started I open a hadoop command shell and browse to c:\Hadoop\pig-0.9.3-SNAPSHOT\bin

I then run the pig.cmd which takes me to the grunt> prompt


to start with, I’ll simply read the contents of the table (as it’s not too big at this point) –

grunt>everything = LOAD ‘metarsoutput’;

grunt>dump everything;

and I get a bunch of results, here’s an extract –


To work with the results better I could provide details about the schema –

grunt>everything = LOAD ‘metarsoutput’ as (icao, datetime, cloudbase: int, temperature: int);

grunt> describe everything;

produces –



dump everything;

still produces the same results as before, but now I can ask the records to be sorted –

grunt> sorted = order everything by cloudbase desc;

grunt> dump sorted;

which produces a nicely ordered results –


I can also limit the number of records I want to get back –

grunt> top = limit sorted 10;

grunt> dump top;



Ok –so all of these are pretty basic examples, but as such show the basic operation of Pig on HDInsight.

To find out some more about what’s possible with PIG take a look here

@Slodge actually asked me about being able to run custom functions (UDFs) for Pig in c#, which is not currently possible, but Pig does support streaming, and that should provide a handy way ‘in’, which I’ll try to look at next.

Reflection on HDInsight

After a few days of ‘playing’ with HDInsight, both server and service, it was time to think back and take stock.

I had no exposure to Hadoop before I started playing with it on Azure a few months back, and I am still, by all accounts, a complete novice, but having spent the last few days on HDInsight there are a few interesting observations I thought I’d share –

Lowering the barrier of entry

Personally, and this is a very subjective view point, this is perhaps the greatest wow factor of all – until now, if I wanted to “get into the game” with Hadoop, I had to get comfortable with Java (nothing against it, but I’m not), and I had to have Eclipse, and everything had to be just right.
Iif I wanted to run anything locally, I had to get comfortable with running Hadoop in Cygwin and all sort of things (but frankly – tests clusters on Azure have been a great experience, more on that shortly)

Now – with HDInsight Server I can install Hadoop on Windows with a click from the web platform installer; I can get the latest of our distribution of Hadoop (currently in preview) on my laptop, in minutes, with zero config.

I can then use Visual Studio and .net, both I’m very familiar with, to do pretty much all the development I need, I no longer have Eclipse and I don’t really need to use Java.

This is bound to significantly lower the barrier of entry to handling big data for a lot of people. is this the beginning of Hadoop for the masses? Winking smile

Enabling reuse

The other thing that became apparent very quickly as I was building various scenarios in my tests, is that now that I’m developing in .net I can not only build on all the knowledge and experience I’ve accumulated over the years, I can actually build on a lot of code I already have.
When I processed the METAR data – I already had the parser I developed for my Windows 8 application, I did not need to re-write anything, it just slotted in.

Speaking to a few architects and developers in the last week or so these two points resonate very well – so many people want to get into the thick of things but are somewhat intimidated (as I was) or, quite simply, the cost of implementations is too high.

Choice between cloud and on-premise

When I started, I used the exclusively, because I did not want to run Hadoop on Cygwin, and I did not have access to our server deployment. Now everyone, pretty much, have access to both. This choice is quite powerful – I can see people like me running development instances locally to benefit from quick coding iterations, but running production clusters in the cloud, benefitting from the cost effectiveness of scaling in the cloud, not to mention on-demand clusters that can be removed when the processing is done.

I could also easily imagine the reverse – teams using the cloud as a test bed, on test data, before running on a production cluster, avoiding the need to maintain several environments on-premises.

Sticking with the community

Hadoop has a large eco-system, and it’s great that we can remain part of that. many projects are being worked on to stabilize and improve on Windows, with others to come in the future, but it seems that many ‘just work’ even now. it is simply great that the decision was taken not to re-invent the wheel here.

So far I’ve been using HDFS and Map/Reduce, but also Hive quite significantly and a bit of Mahout, and I know others have been trying out Oozie and other projects on it too.

I’ve been browsing the Apache repositories a little bit and it seems to me that the contributions made are very well received and go beyond benefiting just those who chose to run Hadoop on Windows, and that’s great too! Smile

Connecting with the rest of the stack

But of course, technology is here to serve a purpose, and when it comes to data, there are already protocols and interfaces that systems and users use very effectively.
People, fundamentally, don’t want to change they consume data – big or small – they want to be able to leverage Hadoop but remain in the familiar tools and technologies such as SQL Server with Analysis Services, Reporting Services, Power View and tools such as Excel with Pivot Tables, data mining plug-ins etc.

It was great to see how easy it is to get from Hadoop to PowerPivot and Power View (even better told by Denny Lee) as well as read a white paper discussing how to leverage Hadoop from SSIS 

Of course there are many other benefits, I’m being really selfish here and put down the 4 that struck me most for my immediate needs. I suspect many organisations will value the ability to run on Windows with all the management story that comes with it (whilst others, I’m sure, won’t care), and there are some very important capabilities that still need covering – Active Directory integration, for example, for better security or System Centre integration for better monitoring. but these are for another day Smile

Moving my .net Word Count Map/Reduce to Azure

Of course the next step after running my first .net Map Reduce job on HDInsight Server and then the effort to get from unstructured to Power View must be to try it out on Azure as well so I provisioned myself a test cluster on HadoopOnAzure and initiated a remote desktop into it.

I need to get my files onto it (my WordCountSample.dll and all the files from theMRLib folder in the project), so I zipped them up, stored them in skydrive and downloaded on the other end (love SkyDrive)

After extracting the folder on the Azure node, in order to test the word count sample, it was simply a case of uploading the source (I used the interactive web console with fs.put() for that), opening the hadoop command prompt (on the desktop) and running the job as I did previously.

As expected – the two work just the same and I can easy move my jobs between them untouched.

To test the aviation weather report scenario I wanted more data than I cared to upload manually, so I wrote a small Worker role for Azure that downloaded the latest METAR reports hourly and stored in them as blobs in a contained on Azure.

I then configured my cluster with my Azure account and was able to run the job direcly on the data in Azure Storage, which is really cool in my view.

I could store lots of data cheaply, then provision a cluster, run the processing, store the result in Azure as well and get rid of the cluster.

From Map/Reduce to Hive (and Power View) using HDInsight

Whilst Map/Reduce is very powerful in processing unstructured data, users (and most applications) still prefer to handle with structured data in familiar ways, and this is where the hive support and the HDInsight ODBC provider comes in very handy.

One could use Map/Reduce to process  un/semi-structured data into structured data in files which can then be exposed, through, hive, as tables to external systems.

I wanted to demonstrate an end to end scenario, but one that is simple enough not to cloud the principles, and with my love for all things aviation I thought I’d look at aviation weather reports – METARS.

As input I’ve downloaded a bunch of current reports for London’s Heathrow (EGLL), Gatwick (EGKK) and Stansted (EGSS) airports; these come in as strings that look like  –

EGLL 280820Z 21006KT 180V260 9999 SCT033 04/02 Q1017 NOSIG

Given the nature of the beast – METAR data format does have rules but a) they are quite flexible, b) they are not always followed to the letter – Map/Reduce would be very useful to extract the relevant information from the fairly flexible input format. thankfully I already built (for a Windows 8 app I’m working on) a library that parses METARs, so I could use that in my mapper (oh! the benefits of being able to use .net for M/R jobs!)

As an example I’ve decided to create a report to demonstrate the change of the cloud base and temperature over a particular airport over time (in this example there’s one layer of scattered clouds in 3,300 feet, represented by the SCT033 string and temperature of 04 in 04/02), but of course this can get as complicated as one wants it to be…

The idea is to use a mapper to convert this semi-structured format to a know format one of, say

[ICAO Code] \t [observation date/time] \t [cloudbase in feet] \t [temperature]\n\r

With this more structured format I could create a hive definition on top of it and consume that from, for example, Excel via the ODBC driver.

Let’s see what it takes –

The first step is the M/R layer – in this case I do not really need a reducer-combiner as I have no aggregation to do, I simply want to convert the source data to a more structure format, and that’s what the mapper is all about.

In .net I’ll create the following Mapper class –

    public class METARMap : MapperBase
        public override void Map(string inputLine, MapperContext context)
            context.Log("Processing " + inputLine);
            //my metar files have each two lines - first line is date in the format 2012/10/28 12:20
            //second line starts with ICAO code; I need to ignore the lines with the date, 
            //this will do for the next 988 years or so 
            if (!inputLine.Trim().StartsWith("2"))
                Aviator.METAR.DecodedMETAR metar  = Aviator.METAR.DecodedMETAR.decodeMETAR(inputLine);
                    metar.Cloud.Count > 0 ? metar.Cloud[0].Height.ToString() : null,

The METAR decoding logic is irrelevant here really, the important piece is that in the .net SDK, alongside the EmitKeyValue function of Context, you can also find EmitLine which gives you full control on the structure of the line emitted; in this case I chosen to stick to the tab-delimited approach, but added additional values to the ICAO code key. (calcObservationDateTime is a function that returns a date/time value based on the first portion of the METAR (280820Z means 28th day of current month and year, at 08:20 UTC)

the result of the map for the input I’ve provided above is

EGLL 28/10/2012 09:20 33 02

now – I did say I did not really need a reducer-combiner, and that is true, but as my input comes in many small files, with just a map, the output will also be created as many small files, so I created  a simple combiner to bring them together  – it doesn’t really do anything – it get’s the ICAO code as a key and the output from the map (all the fields, tab delimited) as a single value in the array, so it looks over the array emitting the key and each value separately, but now to a single file

    public class MetarReducer : Microsoft.Hadoop.MapReduce.ReducerCombinerBase
        public override void Reduce(string key, IEnumerable<string> values, Microsoft.Hadoop.MapReduce.ReducerCombinerContext context)
            foreach (string value in values)
                context.EmitKeyValue(key, value);

Either way – single file for all metars or many files in a folder, the result are in a consistent format, with only the data I need,  so I can now create a hive external table using the following statement in the hive interactive console –

create external table metars(icao string, obs_datetime string,cloudbase int, temperature smallint) row format delimited fields terminated by ‘\t’ stored as textfile location ‘/user/yossidah/metarsoutput’

which in turn allows me to query the table –

select * from metars

and get  –

EGKK 28/10/2012 09:20 30 5

EGLL 28/10/2012 11:20 15 8

Now I can use the hive add-in to excel and read that data –


..and if it’s in Excel it can be in any other data-based system, including PowerPivot and Power View, here’s one with a bit more data (3 airfields, 6 METARS for each) –


And so there you go – from unstructured to Power View, all on Windows and in .net Smile

Enhancing the word count sample a little bit

Since starting to play with Hadoop on Windows/Azure (now HDInsight) I wanted to improve the word count sample slightly so that it ignores punctuation and very common list, but as it involved Eclipse and Java it never quite made it to the top of the list; now that it’s Visual Studio and .net I really had no excuses, so here are the two changed I’ve made to what I’ve started with in my previous post

Firstly – to remove all punctuation, i’ve added the following function –

private string removePuncuation(string word)
            var sb = new StringBuilder();
            foreach (char c in word.Trim())
                if (!char.IsPunctuation(c))

            return sb.ToString();

I then added it to my map function as you can see below –

        public override void Map(string inputLine, Microsoft.Hadoop.MapReduce.MapperContext context)
            string[] words = inputLine.Split(' ');

            foreach (string word in words)
                string newWord = removePuncuation(word);
                context.EmitKeyValue(newWord, "1");


To support ignoring common words I wanted to keep the list of words outside the code, as an HDFS file, so firstly I added an initialize method override to load that list –

        private List<string> ignoreList = new List<string>();
        public override void Initialize(MapperContext context)
             const string IGNORE_LIST_FILENAME = "/user/yossidah/input/ignoreList.txt";
            context.Log("WordCountMapper Initialized called");
            context.Log("looking for file " + IGNORE_LIST_FILENAME);
            if (HdfsFile.Exists(IGNORE_LIST_FILENAME))
                context.Log("ignore list file found");
                string[] lines = Microsoft.Hadoop.MapReduce.HdfsFile.ReadAllLines("ignoreList.txt");
                foreach (string line in lines)
                    context.Log("ignore list line: " + line);
                    string[] words = line.Split(' ');
                    foreach (string word in words)
                        context.Log(string.Format("Adding {0} to ignore list", word));
                context.Log("ignore list file not found");

(I’ve added a bunch of logging I can track from the job log file)

I then added a call to the map list to consult the list of words to ignore to determine whether to call emit or not, here’s the complete map function again

public override void Map(string inputLine, Microsoft.Hadoop.MapReduce.MapperContext context) { string[] words = inputLine.Split(' '); foreach (string word in words) { string newWord = removePuncuation(word); if(!ignoreList.Contains(newWord))
context.EmitKeyValue(newWord, "1"); } }

Simples. not the most elaborate program in the world, but slightly better than my starting point.

I’ve got another, potentially more interesting, program I could use for demos in mind, but I need to grab some (big) data first, watch this space Smile

And again – a note – initially I ran this all from my domain user, and I had issues with accessing the ignoreList file; I’ve reported this and it’s being looked at, but basically there’s a problem for Hadoop (at the moment?) to validate domain users’ permissions.

There were two ways around it – I have uploaded the file from the web interactive console (using fs.put()) and then changed the path in my initalize method (in my case to /user/hadoop/input/ignoreList.txt); I’’m pretty sure that if I had done everything from a non-domain joined account I would not be facing this problem.

From Zero to Map Reduce in .net on Windows in 10 minutes

Back in May I posted about what it took to create a development environment to be able to build map/reduce programs for Hadoop on Azure (HoA), now officially Azure HDInsight Service

Yesterday Microsoft published the Microsoft .net SDK for Hadoop which makes it easier to build map/reduce jobs in .net. to me (and many other developers, I suspect) this makes it all the more approachable, which is awesome!

To begin with – it means being able to use Visual Studio and not having to have (the correct version of), which to me is clearly a great advantage, but it gets better as the SDK, like many others these days, was made available through NuGet, so – to be able to develop my map/reduce program in VS I simply open it and in the package manager type

install-package Microsoft.Hadoop.MapReduce


Doing so had added a few things to my project – I can now see a reference to Microsoft.Hadoop.MapReduce and Newtonsoft.Json as well as an additioanl ‘MRLib’ folder containing several useful resources –


the added namespace provides, amongst other things, a MapperBase class I could use to define my mapper with a Map function for me to override –

 public class WordCountMapper : Microsoft.Hadoop.MapReduce.MapperBase
        public override void Map(string inputLine, MapperContext context)
            throw new NotImplementedException();

and so, if I was to implement the Hadoop’s equivalent to Hello World, the infamous WordCount, I would do something along the lines of –

public override void Map(string inputLine, MapperContext context)
            string[] words = inputLine.Split(' ');
            foreach (string word in words)
                context.EmitKeyValue(word, "1");

The reduce would then look something like –

 public class WordCountReducer : ReducerCombinerBase
public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
              context.EmitKeyValue(key, values.Count().ToString());//each value is always one, so count is as good as sum 

To run this I can create a job configuration –

 public class WordCountJob : HadoopJob<WordCountMapper,WordCountReducer>
        public override HadoopJobConfiguration Configure(ExecutorContext context)
            HadoopJobConfiguration config = new HadoopJobConfiguration();
            config.InputPath = context.Arguments[0];
            config.OutputFolder = context.Arguments[1];
            return config;

At this point I was going to try and run this on Windows Azure, but then I figured – what the heck – I’ll run this on my laptop, after all – HDInsight Server is on the web platform installer –


after installing it I can see the services running –


and I can see a couple of web sites –


So – with Hadoop now installed, I need to get some files into it –

I’ll start put uploading to HDFS a book from Gutenberg by opening the hadoop command shell (shortcut on the desktop goes to “C:\windows\system32\cmd.exe /k pushd “c:\hadoop\hadoop-1.1.0-SNAPSHOT” && “c:\hadoop\hadoop-1.1.0-SNAPSHOT\bin\hadoop.cmd””

and the use the following command to create a books folder

hadoop fs –mkdir books

and the following command to upload a file to that folder –

hadoop fs -copyFromLocal "<path>\Ulysses.txt" books/ulysses.txt

I could verify that the file has been uploaded to hdfs using

hadoop fs -ls books

and ran the job using MRRunner providing the two parameters – input folder and output filename

mrrunner –dll  WordCountSample.dll -- books output

With that done, I could verify my results using

Hadoop fs -cat output/part-00000

Admittedly this is only scratching the surface, and a very basic sample. a slightly more elaborate one to follow, my intention here was really just to show how easy it is to get started with Hadoop and .net on Windows, and I hope this point was made….

Note – for some reason, when I tried this in the office Hadoop insisted on using an unfamiliar host name when accessing log file and the results of the map phase making the job get ‘stuck’ at 100% map and 0% reduce. I could work around it initially by adding a host file entry point this unknown domain to, but this morning I learnt from a colleague that Hadoop does a reverse DNS lookup to find the name node and job tracker and it just happened to find a more responsive machine on the network than mine! rebooting my machine last night at home prevented this from happening and everything worked very smoothly

%d bloggers like this: