Azure Data Lake Custom Extractor

Posted by Alan Barr on Sun 19 August 2018

Recently someone asked me if I could help them query data out of Azure Data Lake using U-SQL. ADL serves as cloud file/database storage with a way to query massive amounts of that data. ADL is based on YARN, the resource management layer of Hadoop. ADL offers multiple ways to query the data, primarily its own analytics language, U-SQL. It can also be queried with Apache Spark. The purpose of a big data query language is to facilitate running MapReduce-style jobs across many files on many machines, using as much parallelism as possible. One weakness of ADL is that it is cloud only and seems to depend on Cosmos DB, so you cannot spin up your own local cluster the way you can with Hadoop.

U-SQL

According to the getting started documentation, "U-SQL is a language that combines declarative SQL with imperative C#". In practice it feels more like C# with some SQL semantics, and if you are accustomed to any level of database utilities you will likely be disappointed. U-SQL also supports Python and R extensions, though with limitations. This appears to be an alternative to using Spark (Scala/Java/Python) scripts.

Built Ins

Most of the built-in tooling that comes with Azure Data Lake will accomplish what we are trying to achieve for typical log files. Using the Getting Started example of a tweet log file with four columns, we can extract the data, select what we want to output, and use some functions to transform it.

@t = EXTRACT date string
           , time string
           , author string
           , tweet string
     FROM "/input/MyTwitterHistory.csv"
     USING Extractors.Csv();

@res = SELECT author
            , COUNT(*) AS tweetcount
       FROM @t
       GROUP BY author;

OUTPUT @res TO "/output/MyTwitterAnalysis.csv"
ORDER BY tweetcount DESC
USING Outputters.Csv();

For the most part the built-in tools solve the common challenges. The exception is data that does not conform to typical log structures, such as a JSON object stuffed into a set of logs. The imperative C# tooling also comes in handy when cleaning up data, for example when the datetime format is non-standard.
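
As an illustration of the datetime cleanup mentioned above, here is a minimal sketch of a C# helper that parses a non-standard timestamp. The class name LogDates and the format "dd/MM/yyyy HH-mm-ss" are made up for this example; substitute whatever your logs actually contain.

```csharp
using System;
using System.Globalization;

public static class LogDates
{
    // Hypothetical format for illustration, e.g. "19/08/2018 14-05-09".
    private const string Format = "dd/MM/yyyy HH-mm-ss";

    // Returns null instead of throwing, so one malformed value
    // does not have to fail the whole row.
    public static DateTime? Parse(string raw)
    {
        DateTime parsed;
        bool ok = DateTime.TryParseExact(
            raw == null ? null : raw.Trim(),
            Format,
            CultureInfo.InvariantCulture,
            DateTimeStyles.None,
            out parsed);
        return ok ? parsed : (DateTime?)null;
    }
}
```

Compiled into a referenced assembly, a helper like this could be called from a SELECT expression. Values that do not match the format come back as null, which leaves the decision about bad rows to the script.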

The Challenge

For the most part U-SQL did not seem like an extreme amount to learn. It definitely has its own concepts baked in, but most log files do not necessitate extra complication. The challenge we ran into came from a log file with an embedded JSON object that itself contained a giant embedded XML string. My pain points with much of the Azure tooling and cloud offerings come from design decisions around the C# ecosystem: strings are UTF-16, which caused memory issues in ADL because of their size, and then there are binding redirects and how DLLs are handled internally. I'm not sure how avoidable these things are, but it's pretty tedious to deal with.
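
To make the UTF-16 point concrete, here is a small sketch that compares how many bytes a piece of text takes as UTF-8 (a common on-disk encoding) and as UTF-16 (how a .NET string holds its characters). The class and method names are only for illustration, and the numbers cover character data only, not object overhead.

```csharp
using System;
using System.Text;

public static class StringSizes
{
    // Bytes the text occupies when encoded as UTF-8.
    public static int Utf8Bytes(string s)
    {
        return Encoding.UTF8.GetByteCount(s);
    }

    // Bytes of character data when the same text is held as UTF-16.
    public static int Utf16Bytes(string s)
    {
        return Encoding.Unicode.GetByteCount(s);
    }
}
```

For pure ASCII text, which is typical of XML markup, the UTF-16 figure is exactly twice the UTF-8 figure, so a large embedded XML string roughly doubles in size once it is loaded into a C# string.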

Our original strategy for a log file with an embedded JSON object was to use the TextExtractor to split on pipes and then use the Microsoft.Analytics.Samples.Formats library to grab and parse the JSON directly. It worked flawlessly until we ran into our giant objects. The provided JSON parsing library expects to parse the entire object, and luckily in our case we only want a section of the metadata. We found a Stack Overflow post that had an answer to our problem. After implementing a solution, it is clearer now that the only option available is updating the extractor to return a different kind of data; there does not appear to be a U-SQL-specific command for this.
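
One way to keep only a section of the JSON is to deserialize into a type that declares just the fields you want, since Json.NET ignores undeclared members by default. The sketch below assumes the type names RootObject and Header and the property names used in the extractor code in this post; the all-string property types and the HeaderParser helper are my assumptions, not something taken from the original project.

```csharp
using Newtonsoft.Json;

namespace Utilities
{
    // Only the header fields we care about. Property types are assumed to be strings.
    public class Header
    {
        public string appName { get; set; }
        public string currentContext { get; set; }
        public string documentID { get; set; }
        public string endTime { get; set; }
        public string errorCode { get; set; }
        public string message { get; set; }
        public string rootContext { get; set; }
        public string server { get; set; }
        public string startTime { get; set; }
    }

    // Everything else in the log object (such as the XML payload) is
    // deliberately left undeclared, so it is not kept on the result.
    public class RootObject
    {
        public Header header { get; set; }
    }

    // Hypothetical helper for illustration.
    public static class HeaderParser
    {
        public static Header Parse(string json)
        {
            RootObject log = JsonConvert.DeserializeObject<RootObject>(json);
            return log == null ? null : log.header;
        }
    }
}
```

The parser still has to scan the full text, but the large members never end up as fields on the object handed back to U-SQL. Header fields missing from a given log line simply come back as null.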

The Extractor

The main thing any extractor requires is deriving from IExtractor (an abstract base class, despite the interface-style name), which means overriding its Extract method.

    using Microsoft.Analytics.Interfaces;
    using Microsoft.Analytics.Interfaces.Streaming;
    using Microsoft.Analytics.Types.Sql;
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;
    using Utilities;

    namespace Utilities
    {
        //Use Atomic if we want to operate on the whole file vs line by line
        [SqlUserDefinedExtractor(AtomicFileProcessing = false)]
        public class CustomTextExtractor : IExtractor
        {
            //Encoding and delimiters used to split the input into rows and columns
            private readonly Encoding _encoding;
            private readonly byte[] _row_delim;
            private readonly char _col_delim;

            public CustomTextExtractor(Encoding encoding = null, string row_delim = "\r\n", char col_delim = '|')
            {
                //Honor the caller's encoding; fall back to UTF-8 when none is given
                _encoding = encoding ?? Encoding.UTF8;
                _row_delim = _encoding.GetBytes(row_delim);
                _col_delim = col_delim;
            }

            public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
            {

                //Read the input line by line
                foreach (Stream current in input.Split(_row_delim))
                {
                    using (StreamReader streamReader = new StreamReader(current, _encoding))
                    {
                        string[] parts = streamReader.ReadToEnd()
                            .Trim().Split(new char[] { _col_delim }, StringSplitOptions.None);

                        int count = 0; // start with first column
                        foreach (string part in parts)
                        {
                            if (count == 3)
                            {
                                string json = part;
                                var log = JsonConvert.DeserializeObject<RootObject>(json);
                                var items = new SqlArray<string>(new string[] 
                                {
                                    log.header.appName,
                                    log.header.currentContext,
                                    log.header.documentID,
                                    log.header.endTime,
                                    log.header.errorCode,
                                    log.header.message,
                                    log.header.rootContext,
                                    log.header.server,
                                    log.header.startTime,
                                });

                                output.Set<SqlArray<string>>(count, items);
                                count += 1;
                            }
                            else
                            {
                                //Keep the rest of the columns as-is
                                output.Set(count, part);
                                count += 1;
                            }

                        }
                    }
                    yield return output.AsReadOnly();
                }
                yield break;
            }

        }

    }

In our Extract method we use .Set on output to set the value of each column. In our logs we know the fourth column is always our JSON object. U-SQL can only send back specifically supported data types, and two special ones are SqlArray and SqlMap (key/value pairs). Once this code is done we can compile it, grab the .dll, and upload it to Azure Data Lake. Another option is to use a code-behind, essentially a file named like our U-SQL script but ending in .cs; however, this uploads the code on every run.
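
Because the JSON sits in the last of the four columns, one possible hardening of the column split is to cap the number of pieces, so that any pipe characters inside the JSON stay part of the final column. This is a sketch of that idea rather than what the extractor above does; the PipeSplitter name is hypothetical.

```csharp
using System;

public static class PipeSplitter
{
    // Splits a log line into at most columnCount pieces. Whatever follows
    // the last allowed delimiter, including further delimiters, is kept
    // together in the final element.
    public static string[] SplitColumns(string line, char delimiter, int columnCount)
    {
        return line.Trim().Split(new[] { delimiter }, columnCount, StringSplitOptions.None);
    }
}
```

With a column count of 4, a line such as `2018-08-19|OK|none|{"msg":"a|b"}` yields four parts, and the fourth is the complete JSON text. This only works when the free-form column is the last one.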

U-SQL Script

Below is our U-SQL script. I recommend downloading the Azure Data Lake tools and running the jobs locally at first, as each run in the cloud has a cost.

// A. CREATE ASSEMBLY: Register assemblies (if they do not already exist).
CREATE ASSEMBLY IF NOT EXISTS [Newtonsoft.Json] FROM @"adl://alandatalakeadls.azuredatalakestore.net/Newtonsoft.Json.dll";
CREATE ASSEMBLY IF NOT EXISTS [Microsoft.Analytics.Samples.Formats] FROM  @"adl://alandatalakeadls.azuredatalakestore.net/Microsoft.Analytics.Samples.Formats.dll";
DROP ASSEMBLY IF EXISTS [Utilities];
CREATE ASSEMBLY IF NOT EXISTS [Utilities] FROM @"adl://alandatalakeadls.azuredatalakestore.net/Utilities.dll";
// B. REFERENCE ASSEMBLY: Load assemblies for compile time and execution.
REFERENCE ASSEMBLY [Newtonsoft.Json];
REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];
REFERENCE ASSEMBLY [Utilities];

// C. USING: Specify namespace to shorten function names (e.g. Microsoft.Analytics.Samples.Formats.Json.JsonExtractor)
USING Microsoft.Analytics.Samples.Formats.Json;

// 1. Initialise variables for Input (e.g. JSON) and Output (e.g. CSV).
DECLARE @InputFile string = @"adl://alandatalakeadls.azuredatalakestore.net/BreakDownOfCode.txt";
DECLARE @OutputFile string = @"adl://alandatalakeadls.azuredatalakestore.net/output.csv";

@RawExtract  = EXTRACT 
        Date string,              
        Status  string,           
        PriceNotification  string,
        header SqlArray<string>
FROM @InputFile 
USING new Utilities.CustomTextExtractor(col_delim:'|',row_delim:"\n");

// Change the data here if needed, e.g. coalesce null items to an empty string or call C# methods.
@IntermediaryData =
SELECT
    Date,
    Status,
    PriceNotification,
    header
FROM @RawExtract;

@OutputData =
SELECT
    Date,
    Status,
    PriceNotification,
    header[0] AS appName,
    header[1] AS currentContext,
    header[2] AS documentID,
    header[3] AS endTime,
    header[4] AS errorCode,
    header[5] AS message,
    header[6] AS rootContext,
    header[7] AS server,
    header[8] AS startTime 
FROM @IntermediaryData;

OUTPUT @OutputData
TO @OutputFile
USING Outputters.Csv(outputHeader:true, quoting : false);

The built-in tools and the JSON user-defined functions provided in Samples.Formats, especially JsonTuple, can do most of the heavy lifting. Writing a custom extractor is not much extra work and may be necessary depending on how funky your logs are.