Analyzing the Amazon data set

Calculating the average rating for every item, and then the mean of those per-item averages across all items.
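
Both versions below assume the same record layout, read off the field indices used in the code: fields separated by the control character \016, the item id in field 0 and the numeric rating in field 6. A minimal sketch of that (assumed, not documented) layout:

// hypothetical record: fields joined by \016; item id at index 0, rating at index 6
val sample = Seq("B0001", "user42", "f2", "f3", "f4", "f5", "4.0").mkString("\016")
val fields = sample.split("\016")
println(fields(0))            // B0001
println(fields(6).toDouble)   // 4.0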

Scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object AmazonAVG {
  def main(args: Array[String]) {

    val sc = new SparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"))

    if(args.length < 1){
      println("No input file provided")
      System.exit(1)
    }

    val lines = sc.textFile(args(0))

    // split each line on the \016 delimiter and key every row by its item id (field 0)
    val rows = lines.map(line => line.split("\016"))
    val mappi = rows.map(row => (row(0), row))

    // aggregate (ratingSum, ratingCount) per item, then divide for the per-item average
    val ratings = mappi.combineByKey(
      (row: Array[String]) => (row(6).toDouble, 1),
      (acc: (Double, Int), row: Array[String]) => (acc._1 + row(6).toDouble, acc._2 + 1),
      (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)
    ).mapValues(x => x._1 / x._2)

    // format the per-item averages and compute the mean over all item averages
    val avgRating = ratings.mapValues("%1.2f".format(_))
    val totalMean = ratings.map(_._2).mean()

    println(totalMean)
  }
}

Runtime for 5M entries: 30.66s
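
The same per-item average can also be written with reduceByKey over (sum, count) pairs instead of the three-argument combineByKey; a sketch, reusing rows from above:

// map every row to (itemId, (rating, 1)), sum componentwise, then divide
val sums = rows.map(r => (r(0), (r(6).toDouble, 1)))
               .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
val avgByItem = sums.mapValues { case (sum, n) => sum / n }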

Python

import sys
from pyspark import SparkContext

sc = SparkContext("local")

if len(sys.argv) < 2:
    print("no input file specified")
    sys.exit(1)

inputFile = sys.argv[1]

print("input %s" % inputFile)

lines = sc.textFile(inputFile)
rows = lines.map(lambda x: x.split("\016"))
mappi = rows.map(lambda x: (x[0], x))

# aggregate (ratingSum, ratingCount) per item, then divide for the per-item average
ratings = mappi.combineByKey(
    lambda x: (float(x[6]), 1),
    lambda acc, x: (acc[0] + float(x[6]), acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1])
).mapValues(lambda x: x[0] / x[1])

avgRating = ratings.mapValues(lambda x: "{:1.2f}".format(x))
totalMean = ratings.map(lambda x: x[1]).mean()
print(totalMean)

Runtime for 5M entries: 56.96s

Analyzing Uniprot

Calculating the amino acid distribution.
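
Both versions assume each exported line holds the entry id and the amino acid sequence separated by the character Ä (inferred from the split on "Ä" and the use of field 1):

// hypothetical line layout: "<entry id>Ä<sequence>"
val line = "P12345" + "Ä" + "MKTAYIAK"
val cells = line.split("Ä")
println(cells(1).split("").mkString(","))   // M,K,T,A,Y,I,A,K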

Scala

import scala.collection.mutable.ListMap

// sc and inputFile as in the Amazon example above
val lines = sc.textFile(inputFile)

// split every sequence (field 1) into single letters, keyed by amino acid
val counts = lines.flatMap(line => { val cells = line.split("Ä"); cells(1).split("").zipWithIndex })
val stat = counts.countByKey
val total = stat.foldLeft(0L)(_ + _._2)

// relative frequency of every amino acid
val relStat = stat.mapValues(v => v.toDouble / total)

val relStatSorted = new ListMap[String, Double]() ++ relStat.toList.sortBy(_._2)
val relStatFormatted = relStatSorted.mapValues(v => "%1.2f".format(v))
println(relStatFormatted.mkString(";"))

Runtime: 33.53s (547,085 entries)
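
countByKey collects the per-letter counts to the driver; with only about 20 distinct amino acids that is harmless, but the counting can equally stay distributed via reduceByKey. A sketch under the same assumptions:

// distributed variant: count every letter with reduceByKey, normalize on the driver
val letterCounts = lines.map(line => line.split("Ä")(1))
                        .flatMap(seq => seq.split("").map(c => (c, 1L)))
                        .reduceByKey(_ + _)
                        .collect()
val totalLetters = letterCounts.map(_._2).sum
val relative = letterCounts.map { case (c, n) => (c, n.toDouble / totalLetters) }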

Python

import sys
from pyspark import SparkContext
from collections import OrderedDict

sc = SparkContext("local")

if len(sys.argv) < 2:
    print("no input file specified")
    sys.exit(1)

inputFile = sys.argv[1]

print("input %s" % inputFile)

lines = sc.textFile(inputFile)


def countFlatter(line):
    # emit one (aminoAcid, position) pair per letter of the sequence (field 1)
    cells = line.split(u"Ä")
    return [(c, i) for i, c in enumerate(cells[1])]

counts = lines.flatMap(countFlatter)
stat = OrderedDict(sorted(counts.countByKey().items()))
total = sum(stat.values())
relativeStat = {k: v / 1.0 / total for k, v in stat.items()}

relativeStatForm = {k: "%.2f" % v for k, v in relativeStat.items()}
print(relativeStatForm)

Runtime: 65.28s (547,085 entries)

Total mean in the Amazon data set

Calculating the mean over all individual ratings directly, with every rating weighted equally.

Scala

val rows = sc.textFile(inputFile).map(x => x.split("\016"))
val totalMean = rows.map(x => x(6).toDouble).mean()

Python

rows = sc.textFile(inputFile).map(lambda x: x.split("\016"))
totalMean = rows.map(lambda x: float(x[6])).mean()

Java

import org.apache.spark.api.java.*;
import java.util.Arrays;
import java.util.List;

public class AmazonMean 
{
    public static void main(String[] args)
    {
        if (args.length < 1)
        {
            System.err.println("No input file provided");
            System.exit(1);
        }
        System.out.println(args[0]);

        JavaSparkContext sc = new JavaSparkContext();
        JavaRDD<String> file = sc.textFile(args[0]);
        JavaRDD<List<String>> lines = file.map(s -> Arrays.asList(s.split("\016")));

        // pull the rating (field 6) out of every row
        JavaDoubleRDD ratings = lines.mapToDouble(s -> Double.parseDouble(s.get(6)));

        Double totalMean = ratings.mean();
        System.out.printf("mean %.2f\n", totalMean); 
    }
}
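
Note that this total mean weights every single rating equally, while totalMean in the first section averaged the per-item averages; in general the two differ. A small worked example in Scala with hypothetical ratings:

// item A rated 5, 5, 5; item B rated 1
val byItem = Map("A" -> Seq(5.0, 5.0, 5.0), "B" -> Seq(1.0))

val all = byItem.values.flatten
println(all.sum / all.size)               // 4.0 -> mean over all ratings

val itemAvgs = byItem.values.map(v => v.sum / v.size)
println(itemAvgs.sum / itemAvgs.size)     // 3.0 -> mean of per-item averages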