Snippets Collections
import org.apache.spark.SparkConf

val classes = Seq(
  getClass,                       // To get the jar with our own code.
  classOf[com.mysql.jdbc.Driver]  // To get the jar with the JDBC connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
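If the same setup is done from PySpark, the usual route is to hand the connector jar to spark.jars (or spark-submit --jars) instead of resolving it from a class; a minimal sketch, with the jar path as a placeholder:

from pyspark.sql import SparkSession

# The jar path below is a placeholder; point it at your local connector jar.
spark = (SparkSession.builder
         .appName("jdbc-connector-example")
         .config("spark.jars", "/path/to/mysql-connector-java.jar")
         .getOrCreate())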
import org.apache.spark.sql.functions.col

val columnsToSum = List(col("var1"), col("var2"), col("var3"), col("var4"), col("var5"))

val output = input.withColumn("sums", columnsToSum.reduce(_ + _))
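For reference, the same row-sum reduce in PySpark; a minimal self-contained sketch with made-up data and the var1..var5 names from the Scala snippet:

from functools import reduce
from operator import add

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("row-sums").getOrCreate()
input_df = spark.createDataFrame([(1, 2, 3, 4, 5)], ["var1", "var2", "var3", "var4", "var5"])

# Build the list of columns and fold them with + to get a single sum column.
columns_to_sum = [col(c) for c in ["var1", "var2", "var3", "var4", "var5"]]
output = input_df.withColumn("sums", reduce(add, columns_to_sum))
output.show()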
ALTER TABLE table_identifier ADD [IF NOT EXISTS]
    ( partition_spec [ partition_spec ... ] )
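A concrete invocation of that statement from PySpark; the table name, partition column, and value below are only illustrative and assume the table is partitioned by dt:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("add-partition-example").getOrCreate()

# "sales" and its dt partition column are hypothetical.
spark.sql("ALTER TABLE sales ADD IF NOT EXISTS PARTITION (dt = '2021-07-25')")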
"""
A simple example demonstrating basic Spark SQL features using fictional
data inspired by a paper on determining the optimum length of chopsticks.
https://www.ncbi.nlm.nih.gov/pubmed/15676839
Run with:
  ./bin/spark-submit OptimumChopstick.py
"""
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.storagelevel import StorageLevel
# rdd.persist(StorageLevel.MEMORY_ONLY_SER)

# Get average food-pinching efficiency by chopstick length
def AvgEfficiencyByLength(df):
    meansDf = df.groupBy('ChopstickLength').mean('FoodPinchingEfficiency') \
        .orderBy('avg(FoodPinchingEfficiency)', ascending=False)
    return meansDf

# Initialize Spark
spark = SparkSession.builder.appName("Optimum Chopstick").getOrCreate()
sc = spark.sparkContext
input_loc = "s3://llubbe-gdelt-open-data/ChopstickEffeciency/"

# Read the input line by line
lines = sc.textFile(input_loc)
parts = lines.map(lambda l: l.split(","))
parts.persist(StorageLevel.MEMORY_ONLY_SER)
# Each line is converted to a tuple.
chopstickItems = parts.map(lambda p: (str(p[0]), float(p[1]), int(p[2]), int(p[3].strip())))

# Define a schema for the parsed tuples
fields = [StructField("TestID", StringType()),
          StructField("FoodPinchingEfficiency", DoubleType()),
          StructField("Individual", IntegerType()),
          StructField("ChopstickLength", IntegerType())]
schema = StructType(fields)

# Apply the schema to the RDD to get a DataFrame
chopsticksDF = spark.createDataFrame(chopstickItems, schema)

efficiencyByLength = AvgEfficiencyByLength(chopsticksDF)
efficiencyByLength.distinct().count()

# Read the same data again, this time directly as CSV with the schema applied
moar_chopsticksDF = spark.read.load(input_loc, format="csv", schema=schema)
moar_efficiencyByLength = AvgEfficiencyByLength(moar_chopsticksDF)
moar_efficiencyByLength.distinct().count()

spark.stop()
import pyspark.sql.functions

split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
df = df.withColumn('NAME1', split_col.getItem(0))
df = df.withColumn('NAME2', split_col.getItem(1))
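Put together, a minimal runnable sketch of the split pattern above, with made-up rows in my_str_col:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("split-example").getOrCreate()
df = spark.createDataFrame([("Alice-Smith",), ("Bob-Jones",)], ["my_str_col"])

# Split once and reuse the resulting array column for both new columns.
split_col = F.split(df["my_str_col"], "-")
df = df.withColumn("NAME1", split_col.getItem(0)) \
       .withColumn("NAME2", split_col.getItem(1))
df.show()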
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()
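A slightly fuller builder sketch with common optional settings; the master, app name, and config value are placeholders rather than recommendations:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")                           # run locally on all cores
         .appName("abc")
         .config("spark.sql.shuffle.partitions", "8")  # example config knob
         .getOrCreate())

print(spark.version)
spark.stop()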

Tue Aug 10 2021 13:28:39 GMT+0000 (Coordinated Universal Time) https://stackoverflow.com/questions/24916852/how-can-i-connect-to-a-postgresql-database-into-apache-spark-using-scala

#scala #spark #postgresql

Wed Aug 04 2021 09:11:31 GMT+0000 (Coordinated Universal Time) https://stackoverflow.com/questions/37624699/adding-a-column-of-rowsums-across-a-list-of-columns-in-spark-dataframe

#scala #spark

Sun Jul 25 2021 03:19:17 GMT+0000 (Coordinated Universal Time) https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-ddl-alter-table.html

#delta #spark

Thu Apr 01 2021 17:59:16 GMT+0000 (Coordinated Universal Time)

#aws #emr #spark

Wed Feb 24 2021 17:36:03 GMT+0000 (Coordinated Universal Time) https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns

#pyspark #spark #python #etl

Fri Sep 04 2020 05:19:26 GMT+0000 (Coordinated Universal Time) https://stackoverflow.com/questions/39780792/how-to-build-a-sparksession-in-spark-2-0-using-pyspark

#python #pyspark #spark #spark-session
