Implementing a Pig streaming extension in C#

The next step after playing with basic Pig on HDInsight was to look at what it takes to bring C# into the picture.

Pig allows creating User Defined Functions (UDFs) in various languages, but unfortunately .NET isn’t currently supported.

On the other hand, Pig also supports streaming, and we already know that .NET plays very nicely in this context. So how does that work, and what can I do with it?

Looking back at my METAR processing sample, let’s imagine that we wanted to use Pig to query the results and then produce the average temperature per airfield in a given result set. Sounds good?

Well, using a standard Pig script we can produce the result set we want and then, given a relation RESULT, invoke a custom C# program on it using the following syntax (note: the command must be enclosed in backticks, not single quotes; that one caught me out!):

grunt> AVERAGE = stream RESULT through `c:\\test\\pigStreamSample.exe`;

grunt> dump AVERAGE;
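STREAM ... THROUGH launches the executable and, with Pig’s default streaming serializer (PigStreaming), writes each tuple of RESULT to the program’s standard input as one line of tab-separated fields. It then parses every line the program writes to standard output back into a tuple. The sketch below is a reusable skeleton built on that assumption. PigStream and Run are hypothetical names, not part of Pig or .NET. Taking a TextReader and TextWriter instead of opening the console streams directly lets the same logic run against Console.In/Console.Out under Pig and against a StringReader in a local test.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: adapts a tuple-transforming function to Pig's default
// streaming format (one tuple per line, fields separated by tabs).
static class PigStream
{
    // Reads tuples from `input`, applies `transform`, and writes each resulting
    // tuple to `output` as a tab-delimited line.
    public static void Run(TextReader input, TextWriter output,
                           Func<IEnumerable<string[]>, IEnumerable<string[]>> transform)
    {
        foreach (string[] tuple in transform(ReadTuples(input)))
            output.WriteLine(string.Join("\t", tuple));
        output.Flush(); // make sure everything is written before the process exits
    }

    // Lazily yields the fields of each input line until the end of the stream.
    private static IEnumerable<string[]> ReadTuples(TextReader input)
    {
        string line;
        while ((line = input.ReadLine()) != null)
            yield return line.Split('\t');
    }
}
```

Under Pig, a Main method could call `PigStream.Run(Console.In, Console.Out, tuples => tuples)` to echo every tuple back unchanged. That makes a handy first check that the STREAM wiring works before adding real logic.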

In real life I would probably use DEFINE to alias the command, and SHIP to have Pig copy the executable from the given local path to the nodes that run the job:

grunt> define average `pigStreamSample.exe` ship ('c:\\test\\pigStreamSample.exe');

grunt> B = stream A through average;

grunt> dump B;

The output of this would look as follows:

My initial result set contained 144 METAR records for 3 airfields. These have been averaged into 3 records by a simple C# program, but the program could have been doing anything. Nice!

What does pigStreamSample.exe look like? Well, it’s not pretty, but it’s good enough to demonstrate the work required:

    using System;
    using System.Collections.Generic;
    using System.Globalization;
    using System.IO;
    using System.Linq;

    class Program
    {
        static void Main(string[] args)
        {
            // Pig writes each input tuple to stdin as a tab-delimited line and reads
            // each line written to stdout back as an output tuple.
            // The using blocks dispose (and therefore flush and close) both streams.
            using (StreamReader inputReader = new StreamReader(Console.OpenStandardInput()))
            using (StreamWriter outputWriter = new StreamWriter(Console.OpenStandardOutput()))
            {
                List<Result> results = new List<Result>();

                string line = inputReader.ReadLine();
                while (line != null)
                {
                    //parse the input as received
                    METAR metar = parseLine(line);
                    //find if ICAO code already 'known'
                    Result result = results.FirstOrDefault(r => r.ICAO == metar.ICAO);
                    if (result == null) //if ICAO code not already known create a new one
                    {
                        result = new Result(metar.ICAO);
                        results.Add(result);
                    }
                    result.count++; //increment count
                    result.sum += metar.Temperature; //add sum

                    line = inputReader.ReadLine();
                }

                //output the average per ICAO; casting to double avoids integer division
                //truncating the average, and the invariant culture keeps '.' as the decimal separator
                foreach (Result r in results)
                    outputWriter.WriteLine(string.Format(CultureInfo.InvariantCulture, "{0}\t{1}", r.ICAO, (double)r.sum / r.count));
            }
        }

        private static METAR parseLine(string line)
        {
            string[] parts = line.Split('\t');
            METAR metar = new METAR();
            metar.ICAO = parts[0];
            metar.Date = parts[1];
            metar.Cloudbase = int.Parse(parts[2]) * 100;
            metar.Temperature = int.Parse(parts[3]);
            return metar;
        }
    }

with METAR being a simple class

    class METAR
    {
        public string ICAO { get; set; }
        public string Date { get; set; }
        public int Cloudbase { get; set; }
        public int Temperature { get; set; }
    }

and Result even simpler –

    class Result
    {
        public Result(string ICAO)
        {
            this.ICAO = ICAO;
            this.count = 0;
            this.sum = 0;
        }
        public string ICAO { get; set; }
        public int count { get; set; }
        public int sum { get; set; }
    }
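The List plus FirstOrDefault lookup scans the whole list of results for every input line. Dividing two int values would also truncate the average. The sketch below is one possible alternative, assuming the same four tab-separated fields that parseLine expects. It keys a Dictionary by ICAO code, uses int.TryParse to skip malformed lines rather than letting a FormatException stop the stream, and computes a floating-point average. TemperatureAverager, AverageByIcao and Totals are hypothetical names, not part of the original program.

```csharp
using System.Collections.Generic;

// Hypothetical alternative aggregation: Dictionary lookup per ICAO code,
// malformed lines skipped, and a floating-point average.
static class TemperatureAverager
{
    // Running totals for one airfield.
    class Totals
    {
        public long Sum;
        public int Count;
    }

    // Each line is expected to be: ICAO \t date \t cloudbase \t temperature.
    // Lines with fewer than four fields or a non-integer temperature are skipped.
    public static Dictionary<string, double> AverageByIcao(IEnumerable<string> lines)
    {
        var totals = new Dictionary<string, Totals>();
        foreach (string line in lines)
        {
            string[] parts = line.Split('\t');
            int temperature;
            if (parts.Length < 4 || !int.TryParse(parts[3], out temperature))
                continue; // malformed record: skip it instead of throwing

            Totals t;
            if (!totals.TryGetValue(parts[0], out t))
            {
                t = new Totals();
                totals.Add(parts[0], t);
            }
            t.Sum += temperature;
            t.Count++;
        }

        // Cast before dividing so that, e.g., a sum of 7 over 2 readings gives 3.5, not 3.
        var averages = new Dictionary<string, double>();
        foreach (KeyValuePair<string, Totals> entry in totals)
            averages[entry.Key] = (double)entry.Value.Sum / entry.Value.Count;
        return averages;
    }
}
```

A Main method could feed it the lines read from standard input and write each entry as the ICAO code, a tab, and the average. Dictionary enumeration order is not guaranteed, so the output order may differ from the input order. That does not matter to Pig, because relations are unordered.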

Pig on HDInsight Server

@Slodge had prompted me to look in more detail into running Pig on HDInsight.

I’ve played with it in the past using the interactive JavaScript console and the word count sample, but haven’t gone into much more detail, so I thought it would be nice to do something on the back of my ‘fancy’ aviation weather report using Pig and Hadoop on Windows.

In that previous post I described how I took semi-structured METAR reports, ran a MapReduce program on them to extract the cloudbase and temperature, and then created a Hive table on top. In this post I’ll use some basic Pig to examine the data in the Hive table and extract the 10 reports with the highest cloudbase.

To get started I open a Hadoop command shell and browse to c:\Hadoop\pig-0.9.3-SNAPSHOT\bin.

I then run pig.cmd, which takes me to the grunt> prompt.

To start with, I’ll simply read the contents of the table (as it’s not too big at this point):

grunt> everything = LOAD 'metarsoutput';

grunt> dump everything;

and I get a bunch of results. Here’s an extract:

To work with the results better I could provide details about the schema –

grunt> everything = LOAD 'metarsoutput' as (icao, datetime, cloudbase: int, temperature: int);

grunt> describe everything;

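prints the relation’s schema: icao and datetime come out as bytearray (Pig’s default when no type is declared), and cloudbase and temperature as int.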

whilst

grunt> dump everything;

still produces the same results as before, but now I can ask for the records to be sorted:

grunt> sorted = order everything by cloudbase desc;

grunt> dump sorted;

which produces nicely ordered results:

I can also limit the number of records I want to get back –

grunt> top = limit sorted 10;

grunt> dump top;

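For readers coming from C#, ORDER ... DESC followed by LIMIT is conceptually the same as OrderByDescending followed by Take in LINQ. The difference is that Pig compiles the statements into MapReduce jobs over the cluster’s data. Below is a sketch of the in-memory equivalent. TopReports and TopByCloudbase are hypothetical names, and each record is assumed to be a string array in the (icao, datetime, cloudbase, temperature) order declared in the LOAD schema.

```csharp
using System.Collections.Generic;
using System.Linq;

// Illustrative LINQ analogue of the two Pig statements
//   sorted = order everything by cloudbase desc;
//   top = limit sorted 10;
// run over in-memory records of (icao, datetime, cloudbase, temperature).
static class TopReports
{
    public static List<string[]> TopByCloudbase(IEnumerable<string[]> records, int n)
    {
        return records
            .OrderByDescending(r => int.Parse(r[2])) // field 2 is cloudbase
            .Take(n)                                  // like LIMIT n
            .ToList();
    }
}
```

LINQ’s OrderByDescending is a stable sort. Pig’s ORDER makes no promise about the relative order of records with equal cloudbase values, so ties can come back in a different order from one run to the next.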

OK, so all of these are pretty basic examples, but they do show the basic operation of Pig on HDInsight.

To find out more about what’s possible with Pig, take a look here.

@Slodge actually asked me about being able to run custom functions (UDFs) for Pig in C#. That is not currently possible, but Pig does support streaming, which should provide a handy way ‘in’ and which I’ll try to look at next.