Hadoop and Map Reduce – A Small Introduction with Python : Part I

So, you want to know what Hadoop is and how Python can be used to perform a simple task using Hadoop Streaming?

Hadoop is a software framework used to leverage distributed systems intelligently and to perform operations efficiently on big datasets, without requiring the user to worry about node failures (the failure of one among the 'n' machines performing your task). Hadoop has several components:

  • Hadoop Common – contains libraries and utilities needed by other Hadoop modules
  • Hadoop Distributed File System (HDFS) – a distributed file-system that stores data on commodity machines, providing very high aggregate bandwidth across the cluster
  • Hadoop YARN – a resource-management platform responsible for managing compute resources in clusters and using them for scheduling of users’ applications
  • Hadoop MapReduce – a programming model for large scale data processing.

I will walk you through the Hadoop MapReduce component. For further information on MapReduce you can always search online; for now, here is a brief introduction.



What is MapReduce?

To understand it properly you first have to understand your problem. So, what kinds of problems can MapReduce solve?

  • Counting occurrences of digits in a list of numbers
  • Counting prime numbers in a list
  • Counting the number of sentences in a text
  • Computing the average of 10 million numbers in a database
  • Listing the names of all people belonging to a particular sales region

Do these look like trivial problems? They may appear so, but what if you have millions of records and processing time really matters to you? Think again, and you'll get your answer.
And it is not just time: a task has multiple dimensions, and MapReduce, if implemented efficiently, can help you overcome the risks associated with processing that much data.

Okay, enough of the what and why! Now ask me how!


A MapReduce 'system' consists of two major kinds of functions, namely the Map function and the Reduce function (not necessarily with those exact names, but with the pre-decided intention of acting as the Map and Reduce functions). So, how do we quickly count how many times each number occurs in a list of, say, a million numbers and display the counts? Simple as it sounds, this is a very long operation (for a more involved MapReduce program that uses the mincemeat module rather than Hadoop, see the MD5 password cracking program further down in this post).

In this particular example the Map function(s) go through the list of numbers and emit a key-value pair of the form {m,1} for every number m encountered during the scan. The Reduce function takes the pairs from the Map function(s) and groups them into key-value pairs of the form {m,list(1+)}, where 1+ means one or more occurrences of 1.

The expression above is nothing complicated: the key is just the number m encountered during the scan(s) by the Map function(s), and the 1's in the value appear as many times as that number was encountered. In other words, it is {m, number of times m was encountered in the Map phase}. For instance, if the list is [5, 3, 5], the Map phase emits {5,1}, {3,1}, {5,1}, and the Reduce phase groups them into {5, [1,1]} and {3, [1]}.

The next step is to aggregate the 1's in the value for every m, which gives {m, sum(1's)}. The task is almost done now; all we have to do is display each number and the corresponding sum of 1's as its count. But wait, you still don't see why this is a big deal, right? Anybody can do this. But here's the thing: a Map function isn't there to take all your load and process it alone. There are in fact many instances of your Map function working in parallel on different machines in your cluster (if you have one; otherwise you can multithread your program to create multiple instances of Map, but why should you when you have distributed systems?). Every Map function running simultaneously works on a different chunk of your big list, sharing the task and reducing processing time.

So what if you have a big cluster with many machines running multiple instances of your Map function to process your list? Simple: your work gets done in no time! Similarly, the Reduce functions can also run on multiple machines, generally after a sort, where your mincemeat or Hadoop program first sorts the m's and distributes distinct m's to different Reduce functions on different machines. So even the aggregation gets quicker, and you are ready with your output to impress your boss!

A brief outline of what happened to the list of numbers is as follows:

    1. Map functions counted every occurrence of every number m
    2. Map functions stored every number m in the form {m,1} - as many pairs as there were occurrences of m
    3. Reduce functions collected all such {m,1} pairs
    4. Reduce functions converted all such pairs into {m,sum(1's)} - only one pair per number m
    5. Reduce functions finally displayed the pairs or passed them to the main function to display or process
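
To make the outline above concrete, here is a minimal single-machine sketch in plain Python. The names map_phase and reduce_phase are just illustrative; in Hadoop or mincemeat these would be the map and reduce functions that the framework runs for you, in parallel, on separate chunks of the data.

from collections import defaultdict

def map_phase(numbers):
    # Emit a {m, 1} pair for every number m in the chunk
    for m in numbers:
        yield m, 1

def reduce_phase(pairs):
    # Group the 1's by key and sum them: {m, sum(1's)}
    grouped = defaultdict(list)
    for m, one in pairs:
        grouped[m].append(one)
    return dict((m, sum(ones)) for m, ones in grouped.items())

numbers = [5, 3, 5, 7, 3, 5]
print reduce_phase(map_phase(numbers))   # e.g. {3: 2, 5: 3, 7: 1}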

In part two of this tutorial I will explain how to install Hadoop and write the same program in Python using the Hadoop framework.


For a similar program using mincemeat (it computes the count, sum, and standard deviation of the numbers in a file), go through the following:

import mincemeat
import sys

# The data source can be any dictionary-like object
file = open(sys.argv[1], "r")
data = list(file)
file.close()
datasource = dict(enumerate(data))

def mapfn(k, v):
    # Emit a pair for every whole number found on the line
    for num in v.split():
        if num.isdigit():
            yield 'sum', int(num)
            yield 'sumsquares', int(num)**2
            yield 'count', 1

def reducefn(k, vs):
    # Aggregate all the values emitted under a key
    return sum(vs)

s = mincemeat.Server()
s.datasource = datasource
s.mapfn = mapfn
s.reducefn = reducefn

results = s.run_server(password="changeme")
sumn = results["sum"]
sumn2 = results["sumsquares"]
n = results["count"]
variance = (n*sumn2 - sumn**2)/float(n**2)
stdev = variance**0.5
print "Count is : %d" % n
print "Sum is : %s" % sumn
print "Stdev : %0.2f" % stdev
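
To run this, start the script above with a text file of numbers as its argument; it acts as the mincemeat server. Then, from one or more other terminals or machines, start mincemeat workers pointed at it, for example with python mincemeat.py -p changeme localhost (the standard mincemeat client invocation as I recall it; check the mincemeatpy README for your version). The results are printed once all chunks have been processed.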


A Simple MD5 Password Cracking Program using Python


While on my way to completing my program in Information Systems at the University of Cincinnati, I stumbled upon this very interesting assignment in my Cloud Computing course, offered by the computer science department. It was a simple password breaker for strings of 4 or fewer characters: it attacks a given hex string of 32 or fewer characters and returns the strings that fall in its VALUE BUCKET. For example, here is a sample execution:

Attacking d077f…

{'found': ['cat', 'gkf9']}

In the aforementioned example we are attacking only the first 5 characters of the 32-character hashed hex string, which is why multiple values can collide on the same prefix. That's another topic of interest that I will discuss later.
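
To see why 'cat' shows up in the result above, you can check its MD5 prefix directly with hashlib:

import hashlib

# "cat" hashes to a digest whose first five hex characters are "d077f",
# which is exactly the prefix being attacked in the sample run above.
print hashlib.md5("cat").hexdigest()[:5]   # d077f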

The program uses the mincemeat.py module from https://github.com/bigsandy/mincemeatpy. This is a Python 2.7.x MapReduce library that can hand out map and reduce tasks to distributed clients to finish jobs faster. In my upcoming posts I will write more about MapReduce and Hadoop.

Logic

  • Generate all possible strings of sizes 1 to 4 using (0-9) and (a-z).
  • This can be done in various ways, either with pre-built libraries or with some fresh logic: first generate all the two-character strings, then loop over them and append the same two-character strings to them, which gives the four-character strings. From that list we can take any series starting with a fixed first character, say 0000 to 0zzz, and keep only the last three characters of each entry to add the three-character strings to our main list. Finally, we append the two-character strings and then the one-character strings. This way we have one list containing every possible string of 1 to 4 characters drawn from {0-9, a-z} (a simpler way to build the same list with itertools is sketched after this list).

  • Build grains using a modulus technique and send them to the map function.
  • We take the length of the full candidate list 'bigdata' with len(bigdata) and find all of its factors. Based on the number of clients expected to run the map functions, we split the list into chunks accordingly and build a datasource dictionary, say {0: 'list-chunk1', 1: 'list-chunk2', …}, which the mincemeat server hands out to the clients.

  • Since the map function and the reduce function cannot use global variables from the parent program, we have to pass the input hashed hex string in the datasource itself, using the simple technique of a dictionary within a dictionary. So instead of {0: 'list-chunk1', 1: 'list-chunk2', …}, the datasource looks like {0: {'d077f': 'list-chunk1'}, 1: {'d077f': 'list-chunk2'}, …}, where every key-value pair is handed to a separate map function, possibly on a different client. The map function unwraps this to obtain the hashed hex string 'd077f' and the chunk of strings, each of which is hashed to check whether the first five characters of its digest match 'd077f' (in this example). The nested-dictionary datasource is also shown in the sketch after this list.

  • Send output from the map function to the reduce function.
  • If a match occurs, send the attacked query string 'd077f' (in this example) and the values that hash to it to the reduce function.

  • Send output from the reduce function to the parent program.
  • If the map phase reports a match, capture the results and aggregate all such results into a single list, for example {'d077f': ['cat', 'wtf']}, and send it to the parent program.

  • Capture the reduce functions' output.
  • Once the parent program receives the data from the reduce function, it can be displayed.
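
For reference, here is a rough sketch of how the same candidate list and chunked datasource could be built with itertools.product instead of the hand-rolled looping described above (assuming Python 2.7 like the rest of the post; the chunk size of 5188 mirrors the value used in the program below):

import itertools
import string

# All strings of length 1 to 4 over 0-9 and a-z
alphabet = string.digits + string.lowercase
candidates = []
for length in range(1, 5):
    for combo in itertools.product(alphabet, repeat=length):
        candidates.append(''.join(combo))

# Wrap each chunk in a {target_prefix: chunk} dict so every map function
# receives the attacked prefix along with its share of the candidates.
target = 'd077f'          # the prefix being attacked in the sample run
chunk_size = 5188
bigdata = []
for i in range(0, len(candidates), chunk_size):
    bigdata.append({target: candidates[i:i + chunk_size]})

datasource = dict(enumerate(bigdata))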

Program : 

import hashlib
import string
import itertools
import sys
import mincemeat

inputx = sys.argv[1]
deadlist=[]           # all two-character strings
deadlist1=[]          # temporary holder for the current character pair
deadlist2=[]          # final list of all candidate strings
deadlist1string = []  # single characters: 0-9 and a-z
deadlist2string = []  # copy of the two-character strings
print "Attacking %s..."%sys.argv[1]
m = range(0,10)
for num in m:
	deadlist1string.append(str(num))		
for char in list(string.lowercase):
	deadlist1string.append(char)
for char in deadlist1string:
	for inchar in deadlist1string:
		#First two chars
		deadlist1.append(char)
		deadlist1.append(inchar)
		deadlist.append(''.join(deadlist1))
		deadlist1=[]

#second two chars
for stringx in deadlist:
	deadlist2string.append(stringx)
	for stringx2 in deadlist:
		deadlist2.append(stringx+stringx2)

# The first len/36 entries of deadlist2 are the four-character strings starting
# with '0'; dropping their first character yields every three-character string.
length3char = len(deadlist2)/(36)
listFor3Digits = deadlist2[:length3char]

#print listFor3Digits
for stringx in listFor3Digits:
	singlestring = stringx
	deadlist2.append(''.join(list(singlestring)[-3:]))
deadlist2+=deadlist2string+deadlist1string

'''listx = []
haha = len(deadlist2)
for i in xrange(1,haha+1):
	if (haha%i == 0):
		listx.append(i)'''

bigdata = []
#print deadlist2[::5188]
# Split the candidate list into 333 chunks of 5188 strings each (333*5188 =
# 1727604 = total candidates), each wrapped in a dict keyed by the target prefix.
for i in xrange(0,333):
	loldict = {}
	loldict[inputx] = deadlist2[(5188*i):(5188*(i+1))]
	bigdata.append(loldict)
	loldict = {}

datasource = dict(enumerate(bigdata)) #333 key-value pairs where the values are {prefix: chunk} dicts
#chunkData = list(itertools.islice(datasource.items(), 1,2)) 


def mapfn(k, v):
	# import inside mapfn so the name is available when the function body
	# is shipped to and run on a remote mincemeat client
	import hashlib
	for key in v.keys():
		for w in v[key]:
			digest = hashlib.md5(w).hexdigest()
			if digest[:len(key)] == key:
				yield key, w

def reducefn(k, vs):
	# vs is the list of all candidate strings whose digest starts with the prefix k
	return vs

s = mincemeat.Server()
s.datasource = datasource
s.mapfn = mapfn
s.reducefn = reducefn

results = s.run_server(password="changeme")
# Every key in results is the attacked prefix; its value is the list of matches
for prefix in results:
	print {"found": results[prefix]}
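
As with the earlier example, this script acts as the mincemeat server: start it with the target hex prefix as its argument, then start one or more mincemeat workers against it (for example python mincemeat.py -p changeme localhost, the usual mincemeat client invocation; check the mincemeatpy README for your version). The {'found': [...]} line is printed once every chunk has been processed.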