Let’s see how we can model some commonly used SQL queries with MapReduce.

  • select … from … where …
Take the following example:
select f1, f2 from relation where f1 > 500

For this example, let’s assume that a suitable InputFormat (in the case of Hadoop) reads the records from the database and emits key-value pairs of the form (k, rec), where k is the primary key and rec is the entire record. Pseudocode for the MapReduce job is given below.

map (k, rec) {
   // Emit the projected record only when the where predicate holds;
   // the primary key k is passed through unchanged.
   if (rec.f1 > 500) {
      rec1 = <rec.f1, rec.f2>
      collect (k, rec1)
   }
}

As can be seen, this query is implemented with a map function alone; a record is emitted only when the predicate is satisfied, so no reduce step is needed. A runnable sketch is given below.
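
To make this concrete, here is a minimal, runnable Python sketch of the same map-only job. The in-memory relation, the sample values, and the name map_fn are invented for illustration; on Hadoop the framework would deliver the (k, rec) pairs to the mapper through the InputFormat mentioned above.

# Hypothetical in-memory relation: primary key -> record.
relation = {
    1: {"f1": 700, "f2": "a", "f3": "x"},
    2: {"f1": 300, "f2": "b", "f3": "y"},   # dropped by the where filter
    3: {"f1": 900, "f2": "c", "f3": "x"},
}

def map_fn(k, rec):
    # Emit (k, projected record) only when the where predicate holds.
    if rec["f1"] > 500:
        yield k, (rec["f1"], rec["f2"])

for k, rec in relation.items():
    for pair in map_fn(k, rec):
        print(pair)   # (1, (700, 'a')) and (3, (900, 'c'))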

  • select aggregate_func() from … where … group by …
Let’s take the following example.
select f3, sum(f1), avg(f2) from relation where f1 > 500 group by f3

The pseudocode below describes how this is achieved with MapReduce.

map (k, rec) {
   if (rec.f1 > 500) {
      // Key the output by f3 so that the shuffle phase groups
      // records with the same f3 value, implementing the group by.
      rec1 = <rec.f1, rec.f2, rec.f3>
      collect (rec.f3, rec1)
   }
}

reduce(v, list<rec1>) {
   sum := 0
   avg := 0
   for each rec1 in list {
      sum += rec1.f1
      avg += rec1.f2   // accumulates f2; divided by the group size below
   }
   avg := avg / size(list)
   // v is the group key, i.e. the unique f3 value for this group
   rec2 = <v, sum, avg>
   store (v, rec2)
}

Here each key v that reduce receives corresponds to a unique value of the f3 field. The group by is performed implicitly by the shuffle phase that sits between the map and reduce functions. A runnable sketch of the whole job follows.
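
Below is a minimal, runnable Python sketch of this group-by job, again with an invented relation and sample values. The shuffle phase is simulated with a plain dictionary that collects map output per key; on Hadoop that grouping is done by the framework.

from collections import defaultdict

# Hypothetical relation; f2 is numeric here so avg(f2) makes sense.
relation = {
    1: {"f1": 700, "f2": 60, "f3": "x"},
    2: {"f1": 300, "f2": 20, "f3": "y"},   # dropped by the where filter
    3: {"f1": 900, "f2": 80, "f3": "x"},
    4: {"f1": 600, "f2": 10, "f3": "y"},
}

def map_fn(k, rec):
    # Key the output by f3 so the shuffle groups rows per f3 value.
    if rec["f1"] > 500:
        yield rec["f3"], (rec["f1"], rec["f2"])

def reduce_fn(v, recs):
    total = sum(r[0] for r in recs)             # sum(f1)
    avg = sum(r[1] for r in recs) / len(recs)   # avg(f2)
    return v, total, avg

# Simulated shuffle phase: group the map output by key.
groups = defaultdict(list)
for k, rec in relation.items():
    for key, value in map_fn(k, rec):
        groups[key].append(value)

for v, recs in groups.items():
    print(reduce_fn(v, recs))   # ('x', 1600, 70.0) and ('y', 600, 10.0)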

  • select aggregate_func() from … where … group by … having …

Here an additional having clause is used to filter the grouped results. Let’s take an extended version of the earlier example.

select f3, sum(f1), avg(f2) from relation where f1 > 500 group by f3 having avg(f2) > 50

No change is required in the map function. The modified reduce function is shown below, and a runnable sketch of the new reducer follows it.

reduce(v, list<rec1>) {
   sum := 0
   avg := 0
   for each rec1 in list {
      sum += rec1.f1
      avg += rec1.f2
   }
   avg := avg / size(list)
   rec2 = <v, sum, avg>
   // The having clause becomes a filter on the aggregated values.
   if (avg > 50) {
      store (v, rec2)
   }
}
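
Here is a small runnable Python sketch of the modified reducer. The grouped input is hard-coded in the shape the simulated shuffle of the previous sketch would produce; the keys and values are invented.

# Grouped map output, keyed by f3: lists of (f1, f2) pairs.
groups = {
    "x": [(700, 60), (900, 80)],   # avg(f2) = 70 -> kept
    "y": [(600, 10)],              # avg(f2) = 10 -> dropped by having
}

def reduce_fn(v, recs):
    total = sum(r[0] for r in recs)             # sum(f1)
    avg = sum(r[1] for r in recs) / len(recs)   # avg(f2)
    if avg > 50:                                # having avg(f2) > 50
        yield v, (v, total, avg)

for v, recs in groups.items():
    for out in reduce_fn(v, recs):
        print(out)   # only ('x', ('x', 1600, 70.0)) is emitted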
