Azure Data Factory: using an FTP server as input

A customer I'm working with is looking to put in place a process to reconcile large invoices (several million line items each) from various suppliers.

The process requires obtaining the suppliers' invoices (typically from an FTP server) and matching them against the customer's own data, flagging any exceptions.

The approach I wanted to look into is how to use Azure Data Factory (ADF) to obtain both the invoices and the customer data, store a snapshot of both data sets in blob storage, use HDInsight to perform the reconciliation and push the outcome (invoices to pay and list of line item exceptions) back to a local database.

Reading and writing customer data is easily done using an on-premises data gateway; however, at this point in time ADF does not have a built-in capability to obtain data from an FTP server.

It does, however, support custom .NET activities, so I sat down to create one. Before I go into the details of the (very simple!) implementation, it would be useful to explain the context, and for simplicity I'll describe a basic set-up that simply uses the activity to download a file from an FTP server to blob storage.

The input table

A pipeline in ADF requires at least one input table. In this scenario that input table is largely irrelevant, as the real input data is what I'll obtain from the FTP server.

To satisfy ADF, I've created a table that points at a blob I've created in my storage account. The blob's contents aren't actually important (mine just contains the digit '1'), and as I expect my process to run daily I set the table's availability to one day. A crucial configuration here, though, is to set the table's waitOnExternal to an object (it can be empty), which indicates to ADF that the blob is created outside ADF.

{
    "name": "1DayTrigger",
    "properties": {
        "structure": [
            {
                "position": 0,
                "name": "dummy",
                "type": "Int"
            }
        ],
        "published": false,
        "location": {
            "type": "AzureBlobLocation",
            "fileName": "trigger",
            "folderPath": "trigger",
            "format": {
                "type": "TextFormat"
            },
            "linkedServiceName": "adftstorage"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1,
            "waitOnExternal": {}
        }
    }
}

The output table

I've then created my output table; in my example it has a couple of properties and is stored in the same blob storage account, but in a folder structure that uses the supplier name and the date the invoice was processed. Crucially, I've set waitOnExternal to null and the availability to daily; this availability will drive the pipeline execution and, with it, my custom activity.

{
    "name": "DailyDownloadedInvoiceSupplier1",
    "properties": {
        "structure": [
            {
                "position": 0,
                "name": "ItemNumber",
                "type": "Int"
            },
            {
                "position": 0,
                "name": "Price",
                "type": "Decimal"
            }
        ],
        "published": false,
        "location": {
            "type": "AzureBlobLocation",
            "fileName": "invoiceDetailSmall.csv",
            "folderPath": "invoices/supplier1/{Slice}",
            "format": {
                "type": "TextFormat",
                "columnDelimiter": ","
            },
            "partitionedBy": [
                {
                    "name": "Slice",
                    "value": {
                        "type": "DateTime",
                        "date": "SliceStart",
                        "format": "yyyyMMddHH"
                    }
                }
            ],
            "linkedServiceName": "adftstorage"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1,
            "waitOnExternal": null
        }
    }
}

The pipeline

Finally, I created a pipeline that uses these tables as input and output and contains my custom activity –

{
    "name": "GetInvoiceSupplier1",
    "properties": {
        "description": "GetInvoiceData",
        "activities": [
            {
                "type": "DotNetActivity",
                "transformation": {
                    "assemblyName": "ADF.FTP.dll",
                    "entryPoint": "ADF.FTP.FtpGet",
                    "packageLinkedService": "adftstorage",
                    "packageFile": "packages/ADF.FTP.zip",
                    "extendedProperties": {
                        "SliceStart": "$$Text.Format('{0:yyyyMMddHH-mm}', Time.AddMinutes(SliceStart, 0))",
                        "Supplier": "Supplier1",
                        "FtpServer": "yossi.cloudapp.net",
                        "FtpPath": "invoicedetailtiny.csv",
                        "FtpUser": "yossidahan",
                        "FtpPassword": "SomePassword"
                    }
                },
                "inputs": [
                    {
                        "name": "1DayTrigger"
                    }
                ],
                "outputs": [
                    {
                        "name": "DailyDownloadedInvoiceSupplier1"
                    }
                ],
                "policy": {
                    "timeout": "00:30:00",
                    "concurrency": 1,
                    "retry": 3
                },
                "name": "FtpGet",
                "linkedServiceName": "OnDemandHadoop"
            }
        ],
        "start": "2015-03-18T00:00:00Z",
        "end": "2015-03-28T00:00:00Z",
        "isPaused": false,
        "hubName": "datafactoryyd_hub"
    }
}

The input and output for the pipeline are configured to the tables created in the previous steps. I've added a policy specifying a concurrency of 1, up to 3 retries and a 30-minute timeout.

The diagram view for this pipeline looks like this –

[image: pipeline diagram view]

Back to the pipeline itself, the most interesting thing in this simple pipeline is, of course, the configuration of the custom activity –

The custom activity is developed as a .NET class library which is then zipped and uploaded to a linked storage account. The transformation section of the activity configuration provides the details of where to find the package (a ZIP file containing the contents of the Debug or Release folder of my built project) and how to run it: the name of the storage linked service and the path to the package file within it, the name of the actual assembly within the package and the entry point to call (the fully qualified name of the class).

The extended properties section includes a set of 'custom' properties I've defined which I need within my code, namely the connection details of the FTP server and the path to the file to download.

The only other important thing to note is that this custom activity is going to run on an HDInsight cluster, provided by the second linked service configuration, outside the transformation object.

To be clear, the custom activity has two linked services configured: the one within the transformation configuration is the storage account in which to look for the package, while the one at the end of the activity configuration is a link to an HDInsight cluster, which can be either an already-provisioned cluster or an on-demand one.

In the latter case ADF will provision a cluster on schedule, deploy the activity to it and schedule its execution. The cluster will be destroyed when there is no more activity on it but, crucially, this means that if I have further processing steps (as I will in the real implementation) these can still run on the same provisioned cluster, and ADF will only de-provision it once there are no more steps to execute on it.

The custom activity

So – what actually goes into the ADF.FTP package?

Well – in this sample case this is a simple class library produced more or less by following the tutorial.

I've created a class that implements IDotNetActivity's only method, Execute, which accepts four parameters (two lists for the input and output tables, a dictionary for the extended properties and a logger) and returns a dictionary.
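For illustration, a minimal skeleton of such a class might look like this; it is a sketch that assumes the early ADF runtime types referenced throughout this post (IDotNetActivity, ResolvedTable, IActivityLogger), and the exact namespaces and collection types may differ between versions of the SDK:

using System.Collections.Generic;
using System.Diagnostics;

namespace ADF.FTP
{
    //sketch of the activity class; the entry point configured in the pipeline is ADF.FTP.FtpGet
    public class FtpGet : IDotNetActivity
    {
        public IDictionary<string, string> Execute(
            IEnumerable<ResolvedTable> inputTables,          //pipeline inputs (here, the dummy trigger table)
            IEnumerable<ResolvedTable> outputTables,         //pipeline outputs (here, the blob table to write to)
            IDictionary<string, string> extendedProperties,  //the custom properties defined in the pipeline JSON
            IActivityLogger logger)                          //writes to the slice's user-0.log file
        {
            logger.Write(TraceEventType.Information, "FtpGet starting");

            //...read the extended properties, download from FTP and upload to blob (shown below)...

            return new Dictionary<string, string>();
        }
    }
}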

The input table list is the collection of inputs to the pipeline; in my case it is a single table pointing at the dummy blob and can pretty much be ignored.

Similarly, the output table list is a collection of the outputs of the pipeline. For simplicity I'm assuming a single blob output; more on this shortly.

The extended properties dictionary is a set of key value pairs providing the items I’ve configured in the pipeline.

The logger is straightforward: it allows writing log entries whose contents can, for each run, be accessed via the run details (in the diagram view, double-click on the output table, select a completed (or failed) time slice, select the relevant run activity and then the user-0.log file to view its contents).

[image: run details with the user-0.log file selected]

(the file is actually stored in the blob storage account linked to, in my case, the on-demand HDInsight cluster, in a container called adfjobs)

The actual FTP code I wanted to get to looks like this  –

//create FTP request
FtpWebRequest request = (FtpWebRequest)WebRequest.Create(ftpfileUrl);
request.Method = WebRequestMethods.Ftp.DownloadFile;
request.Credentials = new NetworkCredential(username, password);

//get FTP stream
using (FtpWebResponse ftpResponse = (FtpWebResponse)request.GetResponse())
{
    Stream responseStream = ftpResponse.GetResponseStream();

    //upload FTP stream to blob
    outputBlob.UploadFromStream(responseStream);
}

In this example I’m getting the FTP input stream and using that directly to upload to the blob stream.

This is quite efficient but it also drives the restriction of having only one output table. In any real code I'd check whether I have more than one output and, if so, I'd have to read the contents of the FTP file into memory (or potentially onto local disk) before uploading to multiple destinations.
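If multiple outputs were to be supported, one possible approach (just a sketch, not part of this sample, and assuming outputBlobs is a collection of CloudBlockBlob references resolved from the output tables) would be to buffer the FTP response and upload the buffer to each blob in turn:

//sketch only: buffer the FTP response so it can be written to several blobs;
//for very large files a temporary local file would be a better buffer than memory
using (FtpWebResponse ftpResponse = (FtpWebResponse)request.GetResponse())
using (Stream responseStream = ftpResponse.GetResponseStream())
using (MemoryStream buffer = new MemoryStream())
{
    responseStream.CopyTo(buffer);

    foreach (CloudBlockBlob outputBlob in outputBlobs)
    {
        //rewind the buffer before each upload
        buffer.Position = 0;
        outputBlob.UploadFromStream(buffer);
    }
}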

To get to the FTP stream I needed the properties I’ve configured in the pipeline, so this code was prefixed with

//read the expected custom properties from the extendedProperties dictionary
string ftpfileUrl;
string username;
string password;
readExtendedProperties(extendedProperties, logger, out ftpfileUrl, out username, out password);

with readExtendedProperties simply being –

private static void readExtendedProperties(IDictionary<string, string> extendedProperties, 
    IActivityLogger logger, out string ftpfileUrl, out string username, out string password)
{
    ftpfileUrl = string.Format("ftp://{0}/{1}", extendedProperties["FtpServer"], extendedProperties["FtpPath"]);
    logger.Write(TraceEventType.Information, "Ftp downloading from url {0}", ftpfileUrl);

    username = extendedProperties["FtpUser"];
    password = extendedProperties["FtpPassword"];
    logger.Write(TraceEventType.Information, "using credentials: {0}/{1}", username, password);
}

The other piece needed before running the FTP code above is the ability to connect to the output blob storage.

The activity receives the output tables as a list of ResolvedTable. As discussed, in my case I check to confirm there's only one and extract the first item in the list. I then use the same method outlined in the tutorial to extract the storage connection string from the ResolvedTable's linked service.

In my main function I call

//as we only have one output table, extract it from the list
ResolvedTable outputTable = outputTables.First();
//get the blob storage connection string from the linked service configuration
outputStorageConnectionString = GetConnectionString(outputTable.LinkedService);

In GetConnectionString, if the output table's linked service is indeed an AzureStorageLinkedService, it will have a ConnectionString I can return –

private static string GetConnectionString(LinkedService asset)
{
    AzureStorageLinkedService storageAsset;
    if (asset == null)
    {
        return null;
    }

    storageAsset = asset.Properties as AzureStorageLinkedService;
    if (storageAsset == null)
    {
        return null;
    }

    return storageAsset.ConnectionString;
}

I then follow the same approach to extract the output path from the outputTable’s Table configuration

folderPath = GetFolderPath(outputTable.Table);

with the implementation being

private static string GetFolderPath(Table dataArtifact)
{
    AzureBlobLocation blobLocation;
    if (dataArtifact == null || dataArtifact.Properties == null)
    {
        return null;
    }

    blobLocation = dataArtifact.Properties.Location as AzureBlobLocation;
    if (blobLocation == null)
    {
        return null;
    }

    return blobLocation.FolderPath;
}
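The snippets above don't show how the connection string and folder path are turned into the outputBlob reference used by the FTP code. With the Azure storage client library it would look something along these lines; this is a sketch which assumes the first segment of the folder path is the container name, that the {Slice} token is resolved using the SliceStart value passed in the extended properties, and that the file name is taken from the output table's configuration:

//sketch: resolve the blob to upload to from the connection string and the folder path
CloudStorageAccount account = CloudStorageAccount.Parse(outputStorageConnectionString);
CloudBlobClient blobClient = account.CreateCloudBlobClient();

//replace the {Slice} partition token with the slice value passed in as an extended property
string resolvedPath = folderPath.Replace("{Slice}", extendedProperties["SliceStart"]);

//assume the first segment ("invoices") is the container and the rest is the blob's folder prefix
string containerName = resolvedPath.Split('/')[0];
string blobPrefix = resolvedPath.Substring(containerName.Length).TrimStart('/');

CloudBlobContainer container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();

//"invoiceDetailSmall.csv" is the file name configured on the output table's location
CloudBlockBlob outputBlob = container.GetBlockBlobReference(blobPrefix + "/invoiceDetailSmall.csv");

Note that for the paths to line up, the SliceStart value passed in the extended properties would need to use the same format as the table's partitionedBy configuration.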

And that is pretty much it. Putting it all together, and adding a few validation rules and log lines here and there, I've got a rudimentary FTP download activity for ADF.

After building the project I zipped up the contents of the Debug folder and uploaded the ZIP to the container configured in the pipeline; when the pipeline ran I could see the contents of the file on the FTP server copied into my blob storage as requested.

Where to go from here?

There are a few things I'd add in a real implementation. I've already discussed supporting multiple outputs, which I think would be quite important. In theory one could also support multiple output types, but actually I think that moving the data to blob storage and then using built-in ADF activities, such as the copy activity, makes most sense.

I also think that adding a property indicating whether the file on the FTP server should be deleted after the transfer completes could be useful if the source system cannot adhere to time-based paths (otherwise the time slice, which I do include in the extended properties, needs to be used when resolving the FTP path).
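Deleting the source file would only need a second FtpWebRequest once the upload has succeeded; a sketch, guarded by a hypothetical DeleteAfterDownload extended property (not part of the sample above):

//sketch: optionally remove the source file once the blob upload has succeeded;
//"DeleteAfterDownload" is a hypothetical extended property, not used in the sample above
string deleteSetting;
bool deleteAfterDownload;
if (extendedProperties.TryGetValue("DeleteAfterDownload", out deleteSetting)
    && bool.TryParse(deleteSetting, out deleteAfterDownload)
    && deleteAfterDownload)
{
    FtpWebRequest deleteRequest = (FtpWebRequest)WebRequest.Create(ftpfileUrl);
    deleteRequest.Method = WebRequestMethods.Ftp.DeleteFile;
    deleteRequest.Credentials = new NetworkCredential(username, password);

    using (FtpWebResponse deleteResponse = (FtpWebResponse)deleteRequest.GetResponse())
    {
        logger.Write(TraceEventType.Information, "deleted {0}: {1}", ftpfileUrl, deleteResponse.StatusDescription);
    }
}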

It might also be important to put in place something to confirm the file on the FTP server has been fully written; many implementations that rely on FTP use a second flag file to indicate that the main file is ready.
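With FtpWebRequest, one way to approximate that check is to probe for the flag file and treat a "file unavailable" response as "not ready yet"; a sketch, assuming a hypothetical ".done" file sitting alongside the data file:

//sketch: probe for a companion flag file (e.g. "invoicedetailtiny.csv.done")
//before downloading the data file itself; the ".done" convention is hypothetical
private static bool FlagFileExists(string flagFileUrl, string username, string password)
{
    FtpWebRequest probe = (FtpWebRequest)WebRequest.Create(flagFileUrl);
    probe.Method = WebRequestMethods.Ftp.GetDateTimestamp;
    probe.Credentials = new NetworkCredential(username, password);

    try
    {
        //if the server returns a timestamp, the flag file exists
        using (probe.GetResponse())
        {
            return true;
        }
    }
    catch (WebException ex)
    {
        FtpWebResponse ftpResponse = ex.Response as FtpWebResponse;
        if (ftpResponse != null && ftpResponse.StatusCode == FtpStatusCode.ActionNotTakenFileUnavailable)
        {
            //550 - the flag file isn't there yet
            return false;
        }

        throw;
    }
}

The activity could then fail the slice (letting the retry policy have another go later) or poll for the flag file within the configured timeout.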