NLP with LexisNexis: One man's trip into the Dark Heart of EMR pipeline builds / by Tacuma Solomon

This is a write-up of my first NLP pipeline. For context, it started as an internal write-up for work that I decided to repurpose for my blog. It covers the steps involved in building the pipeline, the challenges I overcame, and an overview of the work I did. Enjoy!

What was the aim of the NLP project?

The aim of this project was to resolve ambiguity in how the LexisNexis (LN) data is processed. The previous engineer who worked on it had recently left, leaving no documentation.

The objective was to understand how the existing processes worked, identify and resolve problems, and begin building out the pipeline so that we could at least reach a milestone where downstream stakeholders could obtain processed data.

Hopefully.

How were things when we started?

There was a partially written Python script that we could potentially use to spin up an EMR (Elastic MapReduce) cluster, which would run some Scala scripts we had. These used Spark to process the LN XML files. Nothing was automated or deployed; it was all very touch and go.

What is the Lexis Nexis data?

The LN data consists of two dumps of publisher data, sorted by publisher ID and stored in XML format. Each XML file contains several XML objects, each one an instance of an article. These files live in an S3 bucket.

The EMR cluster would ingest and process these files, returning Parquet files to an S3 bucket.
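To make that concrete, here's a minimal sketch of what the XML-to-Parquet conversion could look like. I've written it in PySpark for brevity (the real job code is Scala), and the row tag, bucket names, and paths are placeholders rather than the project's actual values:

```python
# Sketch of an XML-to-Parquet step (PySpark, illustrative paths only).
# Requires the spark-xml package on the classpath, e.g.
#   spark-submit --packages com.databricks:spark-xml_2.12:0.14.0 ...
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ln-xml-to-parquet").getOrCreate()

articles = (
    spark.read.format("xml")
    .option("rowTag", "article")                      # assumed tag wrapping each article object
    .load("s3://my-bucket/ln/publisher_123/*.xml")    # hypothetical input location
)

# Write the parsed articles back out as Parquet for the downstream steps.
articles.write.mode("overwrite").parquet("s3://my-bucket/ln/parquet/publisher_123/")
```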

What were the steps of solving this? What did I do?

This was quite the learning experience for me. I chose this project because of the potential for growth and experience. It was my first time working with Scala, Spark, EMR clusters, XML, Parquet files, or anything big-data related.

Here are the steps:

1. Getting the EMR cluster up and running for one publisher.

  • Making sure the Scala files were built and the fat JAR was sent to the S3 bucket

Like many programming languages, Scala has its own build process. To run Scala code on Spark, it needs to be packaged together with its dependencies into a single JAR file, also known as a 'fat JAR'. This fat JAR lives in S3, where the EMR job configuration references it.

  • Making sure sbt requirements were in order

Scala, Spark, and Hadoop form a complex web of dependencies that need to be managed properly to prevent build errors. Maven was really helpful for this. Specific versions of Spark are only compatible with specific versions of Hadoop, which can be particularly tricky in a local setup.

  • Setting up boto config

Boto is the Python library for interacting with AWS. The part of it we used was boto's EMR client. The client takes a detailed config for executing EMR jobs; each publisher's config contains a series of steps, each one representing a logical unit of work in the pipeline (there's a sketch of this at the end of this step). The steps are:

  • S3 to HDFS

  • XML to parquet

  • NLP

  • Reprocessing

  • HDFS to S3

  • Having the script run for one Publisher

This seemed like the logical first step. Getting it up and running required a proper Scala build and a working boto configuration.
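For illustration, here's a hedged sketch of what submitting those steps for a single publisher can look like, shown with boto3's EMR client (which may differ slightly from the exact client the original script used). The step names mirror the list above; the Scala class names, paths, instance types, and IAM roles are all placeholders, not the project's real config:

```python
# Hedged sketch: spin up an EMR cluster for one publisher and run the pipeline steps.
import boto3

emr = boto3.client("emr", region_name="us-east-1")

def spark_step(name, args):
    """Wrap a command (spark-submit or s3-dist-cp) as an EMR step definition."""
    return {
        "Name": name,
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }

publisher_id = "123"                                            # hypothetical
fat_jar = "s3://my-bucket/jars/ln-pipeline-assembly.jar"        # hypothetical fat JAR location

steps = [
    spark_step("S3 to HDFS", ["s3-dist-cp", "--src", f"s3://my-bucket/ln/{publisher_id}/", "--dest", "/input/"]),
    spark_step("XML to parquet", ["spark-submit", "--class", "XmlToParquet", fat_jar, "/input/", "/parquet/"]),
    spark_step("NLP", ["spark-submit", "--class", "RunNlp", fat_jar, "/parquet/", "/nlp/"]),
    spark_step("Reprocessing", ["spark-submit", "--class", "Reprocess", fat_jar, "/nlp/", "/out/"]),
    spark_step("HDFS to S3", ["s3-dist-cp", "--src", "/out/", "--dest", f"s3://my-bucket/ln/processed/{publisher_id}/"]),
]

response = emr.run_job_flow(
    Name=f"ln-pipeline-{publisher_id}",
    ReleaseLabel="emr-5.30.0",
    Applications=[{"Name": "Spark"}, {"Name": "Hadoop"}],
    Instances={
        "InstanceGroups": [
            {"InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
        "KeepJobFlowAliveWhenNoSteps": False,  # cluster tears itself down after the last step
    },
    Steps=steps,
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])
```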

2. When running multiple publisher IDs, Spark jobs weren't finishing because of an error somewhere in the pipeline

The run progressed to Kelly's LN matching code, but the script would fail due to what we initially thought was an arbitrary pandas DataFrame error. Somewhere in the script the code attempted to merge two DataFrames and failed: they were empty, and we had to work backward from there.

When individual steps in Spark jobs failed, debugging was difficult; we weren't able to observe files as they were passed between steps. One option would have been to keep the cluster up and retrieve the files, but spinning up the cluster took 5–10 minutes per run, so we decided to build a local version instead.

3. What exactly was going wrong with the Spark Run? How were we able to investigate the pipeline?

Using the local Spark setup, we were able to run the Spark jobs and note their intermediate outputs. Instead of copying the XML files to HDFS, they were now all local. We could now see the output of one step, which also served as the input for the next.
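For a sense of what that inspection looked like, here's an illustrative snippet (paths are placeholders); locally it's just a matter of pointing a local Spark session, or even pandas, at the output directory of one step:

```python
# Quick local inspection of an intermediate step's output (illustrative paths).
import pandas as pd
from pyspark.sql import SparkSession

# A local Spark session: no cluster, no HDFS, everything on disk.
spark = SparkSession.builder.master("local[*]").appName("ln-local-debug").getOrCreate()

# Read the Parquet produced by one step and eyeball it before running the next step.
step_output = spark.read.parquet("/tmp/ln/xml_to_parquet_output/")
step_output.printSchema()
step_output.show(5, truncate=80)

# Or pull it into pandas for closer inspection.
sample = pd.read_parquet("/tmp/ln/xml_to_parquet_output/")
print(sample.head())
```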

I needed to find out where the discrepancies were: why XML files for some publishers failed while others didn't. The Scala code is always the same, so the issue had to be with the format of the XML files.

I eventually found that for some publishers the body text wasn't being extracted at the parsing stage, which explained the lack of output after the NLP step: its input, the parsed XML, had no body text to work with.

But again, why the differences? I looked at the parsing more closely. Eventually we figured out that the schema used to parse the XML files was generated by inference.

Given this way of providing the schema, there was always the possibility that the inferred schema, while compatible with most publishers' XML files, would fail on a few.
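Here's a small sketch of the difference, again in PySpark rather than the pipeline's Scala. The element names are hypothetical; the point is that an explicit schema keeps every publisher on the same footing, while an inferred one quietly drops fields that don't appear in the sampled files:

```python
# Inferred vs. explicit schema with spark-xml (illustrative field names and paths).
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("ln-schema-demo").getOrCreate()

# Schema inference: spark-xml scans the files and guesses the structure.
# If the scanned sample happens to lack <bodyText>, that column never appears.
inferred = (
    spark.read.format("xml")
    .option("rowTag", "article")
    .load("s3://my-bucket/ln/publisher_123/*.xml")
)

# Explicit schema: the column exists for every publisher; missing elements
# simply become nulls instead of silently dropping the field.
article_schema = StructType([
    StructField("id", StringType(), True),
    StructField("headline", StringType(), True),
    StructField("bodyText", StringType(), True),   # hypothetical element name
])

explicit = (
    spark.read.format("xml")
    .option("rowTag", "article")
    .schema(article_schema)
    .load("s3://my-bucket/ln/publisher_123/*.xml")
)
```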

Now that we knew what the problem was, we put a pin in it for the time being and moved on to the next step.

4. Running ln_code part 1 as an Airflow task

This step involved running Kelly's code in our workflow management system, Airflow. This part was relatively simple: create a new Airflow DAG and, using a KubernetesPodOperator, run the code in Airflow. The code ingested Parquet files from S3, converted them to pandas DataFrames using PyArrow, and applied a number of functions to transform the data, which was then written to a Redshift table.
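A minimal sketch of what such a task can look like, assuming Airflow 2.x provider import paths (older versions use airflow.contrib.* instead); the DAG id, namespace, image, and arguments are placeholders:

```python
# Sketch of an Airflow DAG that runs the matching code in a Kubernetes pod.
from datetime import datetime
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG(
    dag_id="ln_matching",                                    # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    run_matching = KubernetesPodOperator(
        task_id="run_ln_matching",
        name="ln-matching",
        namespace="data-pipelines",                          # hypothetical namespace
        image="registry.example.com/ln-matching:latest",     # hypothetical image
        cmds=["python", "ln_matching.py"],
        arguments=["--publisher-id", "123"],
        get_logs=True,                                       # stream pod logs into Airflow
    )
```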

Things got hairy fast here though. Running the script in Kubernetes often failed for large publishers. We had to find a way around this.

One method we tried was increasing the memory of the Kubernetes pod to its maximum. It failed; the memory requirements were still too large.

So we tried another trick: code cleanup and manual garbage collection of pandas DataFrames.

5. Code reorganization, functionalization, DataFrame mapping, and garbage collection

The premise here was fairly simple: if we reorganized the code into functions, the flow would be easier to read, and functionalization would let us see how data is passed between functions and between DataFrames. Knowing this, we could decide which DataFrames were no longer used and were suitable for garbage collection.

We monitored memory consumption with the hpy library, which let us validate the assumptions we were making about memory use.
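hpy comes from the guppy/guppy3 package; used roughly like this, it gives a breakdown of live objects by type and size, which is enough to check whether a DataFrame you believe is gone actually is gone (the placement of the snapshots below is purely illustrative):

```python
# Spot-checking memory use with hpy (from the guppy3 package).
from guppy import hpy

heap = hpy()
heap.setrelheap()      # measure growth relative to this point

# ... build some DataFrames, run a transformation step ...

print(heap.heap())     # prints live objects grouped by type, with sizes
```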

I rewrote the script for a more streamlined flow via functionalization, creating a DataFrame dict as a class attribute that stored all of the DataFrames in the script. This provided one central data structure for them.

All DataFrame references were then added to this dict, with their original names used as the dict keys.

Next was mapping. For this step, I wrote out, in a text file, all of the DataFrames referenced by each function; from there it was simple to see which DataFrames were re-referenced in later functions and which were not.

Finally, I dropped the no-longer-used DataFrames with the `del` statement and garbage collected them with `gc.collect()`.
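Put together, the pattern looked roughly like this. The function and column names below are made up; what matters is the central dict, the mapping-driven decision about what to drop, and the explicit `del` plus `gc.collect()`:

```python
# Condensed sketch of the refactor: central DataFrame dict + explicit cleanup.
import gc
import pandas as pd

class MatchingPipeline:
    def __init__(self):
        # One central place that owns every DataFrame in the script.
        self.dataframes = {}

    def load_articles(self):
        self.dataframes["articles"] = pd.DataFrame(
            {"id": [1, 2], "body": ["text one", "text two"]}
        )

    def tokenize(self):
        articles = self.dataframes["articles"]
        self.dataframes["tokens"] = articles.assign(tokens=articles["body"].str.split())

    def drop(self, *names):
        """Delete DataFrames that no later function references, then collect."""
        for name in names:
            del self.dataframes[name]
        gc.collect()

pipeline = MatchingPipeline()
pipeline.load_articles()
pipeline.tokenize()
pipeline.drop("articles")   # the mapping showed 'articles' is never re-referenced downstream
```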

In the end, these steps worked. Our largest publisher, the New York Times, was successfully pushed through this task without memory failures.

Slack messaging and logging features were then added.

6. Adding another script to the pipeline

We later found out we needed another script added to the pipeline. No particularly big deal here; all we needed to do was rejig how arguments were passed into the script and add logging.
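Nothing fancy; the change amounted to standard argument-parsing and logging boilerplate along these lines (the argument names are hypothetical):

```python
# Sketch of the small changes: argparse for argument passing, plus logging.
import argparse
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ln_extra_step")

def main():
    parser = argparse.ArgumentParser(description="Extra LN processing step")
    parser.add_argument("--publisher-id", required=True)
    parser.add_argument("--input-path", required=True)
    args = parser.parse_args()

    logger.info("Starting run for publisher %s", args.publisher_id)
    # ... the actual processing would go here ...
    logger.info("Finished run for publisher %s", args.publisher_id)

if __name__ == "__main__":
    main()
```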

7. Automating Scala sbt builds using CircleCI

Here I wrote a CircleCI config, with a little help from their templates and tutorials, so that whenever a branch was merged into master in the repo where the Scala project lived, the Scala code would be built into a fat JAR and moved into an S3 bucket. I set the AWS permissions using CircleCI's built-in UI for environment variables.

8. Automating EMR operations in Airflow

Again, this was a new one for me. Previously, I ran the script that set up the jobs and spun up the EMR cluster locally. To cap this project off, I wanted this portion of the pipeline to be automated. We decided to add these tasks to Airflow using the following operators:

  • EmrCreateJobFlowOperator

  • EmrAddStepsOperator

  • EmrJobFlowSensor

  • EmrTerminateJobFlowOperator

These four operators served to spin up the cluster, add jobs in the form of steps, monitor the cluster as those steps executed, and terminate it when the job was done.
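Wired together, the DAG looks roughly like this. The import paths assume the Airflow Amazon provider (older Airflow versions use airflow.contrib.* paths instead), and JOB_FLOW_OVERRIDES / SPARK_STEPS stand in for the real per-publisher configs:

```python
# Sketch of the EMR portion of the DAG (placeholder configs, hedged import paths).
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor

JOB_FLOW_OVERRIDES = {"Name": "ln-pipeline"}    # placeholder cluster config
SPARK_STEPS = []                                # placeholder list of EMR steps

with DAG(
    dag_id="ln_emr_pipeline",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )

    add_steps = EmrAddStepsOperator(
        task_id="add_steps",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
        steps=SPARK_STEPS,
    )

    watch_cluster = EmrJobFlowSensor(
        task_id="watch_cluster",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
    )

    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_cluster",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster') }}",
        trigger_rule="all_done",   # clean up even if an upstream task failed
    )

    create_cluster >> add_steps >> watch_cluster >> terminate_cluster
```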

I wanted to add Slack messaging during cluster monitoring, as well as the ability to parse environment variables from Airflow. I created a subclass of EmrJobFlowSensor called RippleEmrJobFlowSensor, naming it after our product, and wrote logging handler and adapter classes that would output messages both to the console and to Slack.

This new logger is set up in the subclass, RippleEmrJobFlowSensor. Cluster updates are now sent to Slack!
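In outline, the Slack piece looks something like this. The webhook URL and message format are placeholders, and the adapter class is omitted for brevity; only the general shape (a logging.Handler that posts to Slack, attached to a subclassed EmrJobFlowSensor) reflects what's described above:

```python
# Sketch of a Slack-mirroring logger attached to a subclassed EMR sensor.
import logging
import requests
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"   # placeholder

class SlackHandler(logging.Handler):
    """Send log records to a Slack channel via an incoming webhook."""
    def emit(self, record):
        try:
            requests.post(SLACK_WEBHOOK_URL, json={"text": self.format(record)})
        except requests.RequestException:
            pass   # never let a Slack hiccup break the pipeline

class RippleEmrJobFlowSensor(EmrJobFlowSensor):
    """EmrJobFlowSensor that mirrors its status updates to Slack."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        slack_handler = SlackHandler()
        slack_handler.setFormatter(logging.Formatter("[EMR] %(message)s"))
        self.log.addHandler(slack_handler)   # console logging is kept; Slack is added on top
```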

Once these were all set up, we were good to go!

Finishing this project was pretty gratifying. I took what was previously a bunch of Scala scripts and half of a Python script, and transformed the project into a pipeline that anyone can use. Hooray!




