Server log parsing with Spark and schema extraction from query string parameters

I needed to parse server logs and create Spark DataFrames to query information from the query string parameters. My naive version kept throwing errors about a mismatch between the number of fields in the schema and the number in the row being queried.

It turns out I was dealing with over 350 different query string params across the logs. This set could change over time, and there was no way I was going to add them all by hand.

The technique I used was to parse out all the various parameters in a first pass and then, as a second job, make sure that every row had all the fields that were present anywhere in the logs. I am not sure if there is a more idiomatic Spark way of doing this.
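
Stripped of Spark, the two-pass idea can be sketched in plain Python. This is only an illustration under assumptions: the sample rows and the helper names collect_keys and fill_missing are made up here, and missing values are filled with an empty string.

```python
def collect_keys(rows):
    """First pass: gather every parameter name seen in any row."""
    keys = set()
    for row in rows:
        keys.update(row)
    return keys


def fill_missing(row, keys, default=""):
    """Second pass: return a copy of row that has every key in keys."""
    filled = {key: default for key in keys}
    filled.update(row)  # values present in the row win over the default
    return filled


# Hypothetical parsed rows with different sets of query string parameters.
rows = [
    {"endpoint": "/a", "utm_source": "mail"},
    {"endpoint": "/b", "page": "2"},
]

all_keys = collect_keys(rows)
uniform_rows = [fill_missing(row, all_keys) for row in rows]
```

After the second pass every row has the same set of keys, which is what a fixed schema needs.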

Here is what worked in my particular case.

Parse the logs using a regex to split out the fields and parameters.

from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark import SparkContext

sc = SparkContext("local", "Parse Logs")
sqlContext = SQLContext(sc)

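def parseSchema(line):
    """Return the field names present in one parsed Row."""
    # keys() works on both Python 2 and 3; viewkeys() is Python 2 only
    return list(line.asDict().keys())
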
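def parseLogs():
    """ Read and parse log file """
    # NOTE: this listing is truncated in the original. The chain that starts
    # with `parsed_logs = (sc` reads the log file, parses each line into a Row
    # and caches the result; access_logs and failed_logs are split off from it.
    schema = (access_logs
                .flatMap(parseSchema)  # assumed step: one record per field name
                .map(lambda s: (s,1))
                .reduceByKey(lambda a, b: a + b))
    return parsed_logs, access_logs, failed_logs, schema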
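
The listing above does not show how a single line is turned into fields and parameters. A minimal sketch of that step in Python 3 follows. It assumes the Apache Common Log Format (suggested by field names such as client_identd and endpoint in the query further down); the pattern, the name parse_line and the sample line are illustrative and not taken from the original code.

```python
import re
from urllib.parse import urlsplit, parse_qsl

# Assumed format: Apache Common Log Format. The real logs may differ.
LOG_PATTERN = re.compile(
    r'^(\S+) (\S+) (\S+) \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+)'
)


def parse_line(line):
    """Return a dict of fixed fields plus query string parameters, or None."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None  # the caller can route these lines to a "failed" bucket
    url = urlsplit(match.group(6))
    fields = {
        "host": match.group(1),
        "client_identd": match.group(2),
        "user_id": match.group(3),
        "date_time": match.group(4),
        "method": match.group(5),
        "endpoint": url.path,
        "protocol": match.group(7),
        "response_code": match.group(8),
        "content_size": match.group(9),
    }
    # Each query string parameter becomes its own field, so different rows
    # end up with different sets of keys.
    fields.update(parse_qsl(url.query, keep_blank_values=True))
    return fields


# Made-up sample line in the assumed format.
sample = '127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /search?q=spark&page=2 HTTP/1.0" 200 1839'
parsed = parse_line(sample)
```

In a Spark job the returned dict could be wrapped as Row(**fields). Note that in this sketch a query parameter that shares a name with a fixed field would overwrite it.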

Make sure the parsed logs are cached by calling cache() on the RDD, since they are read more than once. The parseSchema function takes each Row in the parsed_logs RDD and returns the names of its fields.

The schema RDD accumulates all possible parameter keys as it processes each line of the log file; the reduceByKey transformation builds the final list with a count of occurrences of each parameter.
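
As a rough plain-Python picture of what the counting step produces, the snippet below mimics the flatten, pair-with-1 and sum-per-key sequence on a few hypothetical key lists. It assumes the per-row key lists are flattened before counting; the data is made up.

```python
from collections import Counter

# Hypothetical per-row key lists, as parseSchema would return them.
keys_per_row = [
    ["endpoint", "q"],
    ["endpoint", "page", "q"],
    ["endpoint"],
]

# Flatten the per-row lists into one stream of keys.
flat_keys = [key for keys in keys_per_row for key in keys]

# Equivalent of pairing every key with 1 and summing the 1s per key.
counts = Counter()
for key in flat_keys:
    counts[key] += 1

schema_pairs = sorted(counts.items())
```

Each pair holds a parameter name and the number of rows it appeared in; only the names are needed to build the schema dictionary.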

As a next step – once we have all the possible keys parsed out, we create a dictionary of all possible parameters via dictionary comprehension.

parsed_logs, access_logs, failed_logs, schema = parseLogs()
schemaDict = {pair[0]: '' for pair in schema.collect()}

The next step is to modify the log Rows so that each row has all the possible elements, albeit empty for ones that don’t exist in that particular row.

By this point we have (1) all the rows parsed, where rows have a variable number of parameters, and (2) the entire schema in a dictionary.
The tricky bit here is to realize that since Spark splits the work across multiple workers on different Hadoop nodes, the schema needs to be shared somehow. The Spark way of doing this is via broadcast variables. These are read-only serializable values that are cached on each worker node. To do this we create a new broadcast variable with the schema dictionary we created above.

broadcastVar = sc.broadcast(schemaDict)

The final job is to enrich the log Rows with all the missing parameters so that the DataFrame can be created.

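def enrichRow(row):
    """Add missing fields for DataFrame to work"""
    rowdata = row.asDict()
    newRow = broadcastVar.value.copy()
    # overlay this row's own values on the empty defaults (step missing from
    # the original listing, inferred from the surrounding text)
    newRow.update(rowdata)
    return (Row(**newRow))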

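def finalLogs():
    # the original listing is truncated after access_logs; mapping enrichRow
    # over every row is inferred from the text
    final_logs = access_logs.map(enrichRow)
    return final_logs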

final_logs = finalLogs()

At this point every parsed row has all the possible parameters, so we are ready to create a DataFrame.

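schemaAccess = sqlContext.createDataFrame(final_logs)
# register the DataFrame under the table name used in the query below
# (this step is not in the original listing)
schemaAccess.registerTempTable("access")

res = sqlContext.sql("select client_identd, endpoint, count(*) from access group by client_identd, endpoint order by count(*) desc")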

You will notice that we are letting Spark infer the schema itself, which it typically does by inspecting the first row. If some fields need specific types, you will need to declare the schema explicitly with StructType and StructField constructs.

Note that this is not production-ready code; it mostly explains the concept behind creating DataFrames from data that may not be complete.


LogParser – Log extraction and visualizing

Or How to use the Log Parser to create a message sequence chart from protocol logs.

My call processing application generates copious amounts of logs that are irreplaceable in debugging problems with the system, but it’s a pain to read and sift through thousands of lines to track the logs for a particular call.
Log Parser is a great but overlooked tool that can help. Although there is a wide variety of built-in input log formats that LogParser knows about, I found the CSV format to be the most convenient for my purpose.
I also found the Log Parser Toolkit book to be a great resource for real world examples, diving right in.

An excerpt of my logfile shows comma-separated fields of information; I have added the header row so that Log Parser can parse out the fields.

Time, Type, Port, Direction, Message, Extra
05:28.2,PROTO,0x410113fa , Recv, INVITE, From 4152484012 To 8886938437 IIDigits 0 CallId 5a87905440af7d8d2d677b8f6a4999ab@ User2 is 0
05:28.2,PROTO,0x410113fa , Sent, 200 OK for INVITE, User2 is 0
05:28.4,FYI,0x410113fa ,, Playing prompt(s) c:\hello\Ivr\Prompts\NonWorkingNumber.pcm^,
05:28.4,PROTO,0x410113fa , Recv, ACK, User2 is 0
05:29.7,PROTO,0x410113fa , Recv, BYE, No Contents
05:29.7,PROTO,0x410113fa , Sent, 200 OK, for BYE
05:29.8,FYI,0x410113fa ,, Prompt play canceled,
08:14.6,FYI,0xffffffff , Initiating call to route 7, IP Address, Port 5060
08:14.6,PROTO,0x43f6e1a4 , Sent, INVITE, to User To – 14152484155 From – 8009751379 UserInfo – none CallId – 127bd67abc760140@REVWQURNSU4wMi5kZXYua2Vlbi5jb20.. User2 is 0
08:14.6,PROTO,0x43f6e1a4 , Recv,100, for INVITE
08:21.1,PROTO,0x43f6e1a4 , Recv,183, for INVITE
08:21.1,FYI,0x43f6e1a4 ,, Send Port Change of State. User2 is 0 ,
08:29.4,PROTO,0x43f6e1a4 , Recv,200, for INVITE
08:29.4,PROTO,0x43f6e1a4 , Sent, ACK, User2 is 0

The application dumps PROTO as part of the log statement where the SIP message is received or sent. We’ll use this field to identify the interesting lines.
Logparser lets you search log files by writing a SQL type query and the one I use looks something like this.

case Direction
when 'Sent' then ''
else ' '
end as Direction,
from input.csv to msc.out
where Type = 'PROTO'
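
For readers without Log Parser at hand, the same filter-and-label idea can be sketched in Python with the csv module. This is an illustration, not a replacement for the query: the arrow strings are an assumption (the ones in the query did not survive, so they are guessed from the sample output further down), and the function name proto_messages is made up. The rows are copied from the excerpt above.

```python
import csv
import io

# Header plus four rows copied from the log excerpt.
LOG_TEXT = """Time, Type, Port, Direction, Message, Extra
05:28.2,PROTO,0x410113fa , Sent, 200 OK for INVITE, User2 is 0
05:29.7,PROTO,0x410113fa , Recv, BYE, No Contents
05:29.7,PROTO,0x410113fa , Sent, 200 OK, for BYE
05:29.8,FYI,0x410113fa ,, Prompt play canceled,
"""


def proto_messages(text):
    """Yield (arrow, message, extra) for every PROTO row in the CSV text."""
    # skipinitialspace drops the blank that follows each comma in the log
    reader = csv.DictReader(io.StringIO(text), skipinitialspace=True)
    for row in reader:
        if row["Type"] != "PROTO":
            continue
        # Mirrors the CASE expression: one arrow for Sent, another otherwise.
        # The arrow strings themselves are assumed.
        arrow = "<-----" if row["Direction"].strip() == "Sent" else "----->"
        yield arrow, row["Message"].strip(), row["Extra"].strip()


messages = list(proto_messages(LOG_TEXT))
```

Each tuple corresponds to one line of the message sequence chart; the FYI row is dropped by the Type filter.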

To generate a nice looking message sequence chart, I use the TPL output type and use this template file.


<TH align="left">Carrier</TH>
<TH align="left"><B>Switch</B></TH>

<TD align="center" colspan="2"><TT>%Message%<BR>%Direction%</TT></TD>



Now all we have to do is run C:\> logparser file:input.sql -i:CSV -o:TPL -tpl:html.tpl and here is what the output looks like.

—————–> From 4152484012 To 8886938437 IIDigits 0 CallId 5a87905440af7d8d2d677b8f6a4999ab@ User2 is 0
200 OK for INVITE
User2 is 0
—————–> No Contents
200 OK
<—————– for BYE
—————–> for INVITE
—————–> for INVITE
<—————– User2 is 0


Finding with Find

Find is a really versatile utility that can be used to enumerate files and narrow the list by file type, date, size, access time and a whole list of other expressions. The output can be formatted with various switches, for example as CSV.

My goal was to list the sizes and access times for all video files in the system. I knew that there were over 3 TB of files but not how recently they had been accessed/played. I also needed to know at what rate these files were being added to the system.

Here is a simple expression to list all these.

find . -iname '*.mp4' -print

This will print one name per line. A good starting point for my list.

The -exec switch will let you execute another utility on each file that you enumerate with find. The ‘{}’ is substituted with the filenames that are found.

find . -iname '*.mp4' -exec stat --printf="%n, %s, %y, %x\n" '{}' \; | gawk '{ split($0, a, "_"); print $0,",", a[4] }'

The --printf option allows me to format the output: %n prints the name, %s prints the size in bytes, and %y and %x print the last modification and last access times.
The gawk lets me further split the name since the files are named a certain way to identify different types of videos.
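
Where GNU find and stat are not available, roughly the same listing can be produced from Python. The sketch below is an assumption-laden equivalent, not the command above: the function names video_rows and write_csv are made up, the timestamps are printed in ISO format rather than stat's format, and the gawk split on the file name is left out.

```python
import csv
import os
import sys
from datetime import datetime


def video_rows(root, suffix=".mp4"):
    """Yield (path, size, modified, accessed) for matching files under root."""
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if not name.lower().endswith(suffix):  # case-insensitive, like -iname
                continue
            path = os.path.join(dirpath, name)
            info = os.stat(path)
            yield (
                path,
                info.st_size,
                datetime.fromtimestamp(info.st_mtime).isoformat(),
                datetime.fromtimestamp(info.st_atime).isoformat(),
            )


def write_csv(root, out=sys.stdout):
    """Write one CSV line per video file found under root."""
    writer = csv.writer(out)
    writer.writerow(["name", "size", "modified", "accessed"])
    writer.writerows(video_rows(root))
```

Calling write_csv(".") prints the listing for the current directory. Using csv.writer also keeps the output valid when a file name contains a comma, which the plain printf format does not.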

Pretty powerful stuff – all in one line.
Gotta know your shell utilities.

Once I had the information in a CSV file, I opened it up in Excel and added a pivot table to see the rate at which these files were added, grouping by month and quarter.
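
The month and quarter grouping can also be done without a spreadsheet. The sketch below counts files per month and per quarter from timestamp strings shaped like the ones stat's %y prints; the function names and the sample timestamps are made up for illustration.

```python
from collections import Counter
from datetime import datetime


def quarter_label(moment):
    """Return a label such as '2015-Q1' for a datetime."""
    return "{}-Q{}".format(moment.year, (moment.month - 1) // 3 + 1)


def files_added(timestamps):
    """Count files per month and per quarter from 'YYYY-MM-DD ...' strings."""
    per_month = Counter()
    per_quarter = Counter()
    for text in timestamps:
        moment = datetime.strptime(text[:10], "%Y-%m-%d")  # date part only
        per_month[moment.strftime("%Y-%m")] += 1
        per_quarter[quarter_label(moment)] += 1
    return per_month, per_quarter


# Made-up timestamps in the shape that stat's %y prints.
sample = [
    "2015-01-14 10:22:31.000000000 -0800",
    "2015-01-30 08:01:02.000000000 -0800",
    "2015-04-02 17:45:00.000000000 -0700",
]
per_month, per_quarter = files_added(sample)
```

This treats the modification time as the date a file was added, which is the same proxy the CSV columns offer.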