"""
A simple example demonstrating basic Spark SQL features using fictional
data inspired by a paper on determining the optimum length of chopsticks.
https://www.ncbi.nlm.nih.gov/pubmed/15676839
Run with:
./bin/spark-submit OptimumChopstick.py
"""
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.storagelevel import StorageLevel
# Compute the average food-pinching efficiency for each chopstick length,
# sorted from most to least efficient
def AvgEfficiencyByLength(df):
    meansDf = df.groupBy('ChopstickLength') \
                .mean('FoodPinchingEfficiency') \
                .orderBy('avg(FoodPinchingEfficiency)', ascending=False)
    return meansDf
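# A minimal alternative sketch (not used below): aggregating with an explicit
# alias avoids depending on Spark's generated 'avg(...)' column name. It
# assumes the same DataFrame shape as above.
#
#   from pyspark.sql import functions as F
#   meansDf = (df.groupBy('ChopstickLength')
#                .agg(F.avg('FoodPinchingEfficiency').alias('AvgEfficiency'))
#                .orderBy(F.desc('AvgEfficiency')))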
# init
spark = SparkSession.builder.appName("Optimum Chopstick").getOrCreate()
sc = spark.sparkContext
input_loc = "s3://llubbe-gdelt-open-data/ChopstickEffeciency/"
# Read the input as lines of text and split each line on commas
lines = sc.textFile(input_loc)
parts = lines.map(lambda l: l.split(","))
# PySpark stores persisted data in serialized form, so MEMORY_ONLY replaces
# the MEMORY_ONLY_SER level that older (1.x) releases exposed
parts.persist(StorageLevel.MEMORY_ONLY)
# Convert each line to a (TestID, FoodPinchingEfficiency, Individual, ChopstickLength) tuple
chopstickItems = parts.map(lambda p: (str(p[0]), float(p[1]), int(p[2]), int(p[3].strip())))
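# e.g. a hypothetical raw line "T042,21.3,7,240" becomes ("T042", 21.3, 7, 240)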
# Define a schema matching the tuple fields
fields = [StructField("TestID", StringType()),
          StructField("FoodPinchingEfficiency", DoubleType()),
          StructField("Individual", IntegerType()),
          StructField("ChopstickLength", IntegerType())]
schema = StructType(fields)
# Apply the schema to the RDD to get a DataFrame
chopsticksDF = spark.createDataFrame(chopstickItems, schema)
efficiencyByLength = AvgEfficiencyByLength(chopsticksDF)
print(efficiencyByLength.distinct().count())
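# Peek at the per-length averages (an optional sanity check, assuming the
# schema above)
efficiencyByLength.show()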
# The same files can be read directly as CSV with the schema applied;
# both pipelines load identical data, so the distinct counts should match
moar_chopsticksDF = spark.read.load(input_loc, format="csv", schema=schema)
moar_efficiencyByLength = AvgEfficiencyByLength(moar_chopsticksDF)
print(moar_efficiencyByLength.distinct().count())
spark.stop()