Python For Data Science Cheat Sheet
PySpark - SQL Basics
Learn Python for data science Interactively at www.DataCamp.com
Spark SQL is Apache Spark's module for working with structured data.
Initializing SparkSession
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
Stopping SparkSession
>>> spark.stop()
Write & Save to Files
>>> df.select("firstName", "city") \
    .write \
    .save("nameAndCity.parquet")
>>> df.select("firstName", "age") \
    .write \
    .save("namesAndAges.json", format="json")
Queries
>>> from pyspark.sql import functions as F
Select
>>> df.select("firstName").show()  # Show all entries in firstName column
>>> df.select("firstName", "lastName") \
    .show()
>>> df.select("firstName",
              "age",
              F.explode("phoneNumber").alias("contactInfo")) \
    .select("contactInfo.type",
            "firstName",
            "age") \
    .show()  # Show all entries in firstName, age and type
>>> df.select(df["firstName"], df["age"] + 1) \
    .show()  # Show all entries in firstName and age, adding 1 to age
>>> df.select(df['age'] > 24).show()  # Show, for every entry, whether age > 24
When
>>> df.select("firstName",
              F.when(df.age > 30, 1)
               .otherwise(0)) \
    .show()  # Show firstName and 0 or 1 depending on age > 30
>>> df[df.firstName.isin("Jane", "Boris")] \
    .collect()  # Return the rows whose firstName is one of the given values
Like
>>> df.select("firstName",
              df.lastName.like("Smith")) \
    .show()  # Show firstName, and TRUE if lastName is like Smith
Startswith - Endswith
>>> df.select("firstName",
              df.lastName.startswith("Sm")) \
    .show()  # Show firstName, and TRUE if lastName starts with Sm
>>> df.select(df.lastName.endswith("th")) \
    .show()  # Show TRUE if lastName ends with th
Substring
>>> df.select(df.firstName.substr(1, 3)
                .alias("name")) \
    .collect()  # Return the first 3 characters of firstName (positions are 1-based)
Between
>>> df.select(df.age.between(22, 24)) \
    .show()  # Show age: values are TRUE if between 22 and 24 (inclusive)
Running SQL Queries Programmatically
>>> df5 = spark.sql("SELECT * FROM customer")  # Query a temporary view
>>> df5.show()
>>> peopledf2 = spark.sql("SELECT * FROM global_temp.people")  # Query a global temporary view
>>> peopledf2.show()
Add, Update & Remove Columns
Adding Columns
>>> df = df.withColumn('city', df.address.city) \
    .withColumn('postalCode', df.address.postalCode) \
    .withColumn('state', df.address.state) \
    .withColumn('streetAddress', df.address.streetAddress) \
    .withColumn('phone', F.explode(df.phoneNumber)) \
    .withColumn('telePhoneNumber', F.col('phone.number')) \
    .withColumn('telePhoneType', F.col('phone.type')) \
    .drop('phone')  # Explode once so each number stays paired with its own type
Removing Columns
>>> df = df.drop("address", "phoneNumber")
>>> df = df.drop(df.address).drop(df.phoneNumber)  # Equivalent alternative, using Column objects
Updating Columns
>>> df = df.withColumnRenamed('telePhoneNumber', 'phoneNumber')
Duplicate Values
>>> df = df.dropDuplicates()
Creating DataFrames
From Spark Data Sources
JSON
>>> df = spark.read.json("customer.json")
>>> df.show()
+--------------------+---+---------+--------+--------------------+
|             address|age|firstName|lastName|         phoneNumber|
+--------------------+---+---------+--------+--------------------+
|[New York,10021,N...| 25|     John|   Smith|[[212 555-1234,ho...|
|[New York,10021,N...| 21|     Jane|     Doe|[[322 888-1234,ho...|
+--------------------+---+---------+--------+--------------------+
>>> df2 = spark.read.load("people.json", format="json")
Parquet files
>>> df3 = spark.read.load("users.parquet")
TXT files
>>> df4 = spark.read.text("people.txt")
A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files.
From RDDs
>>> from pyspark.sql import Row
>>> from pyspark.sql.types import *
Infer Schema
>>> sc = spark.sparkContext
>>> lines = sc.textFile("people.txt")
>>> parts = lines.map(lambda l: l.split(","))
>>> people = parts.map(lambda p: Row(name=p[0],age=int(p[1])))
>>> peopledf = spark.createDataFrame(people)
Specify Schema
>>> people = parts.map(lambda p: (p[0], p[1].strip()))
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for
              field_name in schemaString.split()]
>>> schema = StructType(fields)
>>> spark.createDataFrame(people, schema).show()
+--------+---+
|    name|age|
+--------+---+
|    Mine| 28|
|   Filip| 29|
|Jonathan| 30|
+--------+---+
Inspect Data
>>> df.describe().show()   # Compute summary statistics
>>> df.columns             # Return the columns of df
>>> df.count()             # Count the number of rows in df
>>> df.distinct().count()  # Count the number of distinct rows in df
>>> df.printSchema()       # Print the schema of df
>>> df.explain()           # Print the physical plan (df.explain(True) also prints the logical plans)
>>> df.dtypes              # Return df column names and data types
>>> df.show()              # Display the content of df
>>> df.head()              # Return the first row (df.head(n) returns the first n rows)
>>> df.first()             # Return the first row
>>> df.take(2)             # Return the first n rows, here 2
>>> df.schema              # Return the schema of df
Sort
>>> peopledf.sort(peopledf.age.desc()).collect()  # Sort by age, descending
>>> df.sort("age", ascending=False).collect()     # Same, using the ascending flag
>>> df.orderBy(["age", "city"], ascending=[0, 1]) \
    .collect()  # Sort by age descending, then city ascending
Missing & Replacing Values
>>> df.na.fill(50).show()  # Replace null values in numeric columns with 50
>>> df.na.drop().show()    # Return a new df omitting rows with null values
>>> df.na \
    .replace(10, 20) \
    .show()                # Return a new df replacing one value with another
Registering DataFrames as Views
>>> peopledf.createGlobalTempView("people")
>>> df.createTempView("customer")
>>> df.createOrReplaceTempView("customer")
GroupBy
>>> df.groupBy("age") \
    .count() \
    .show()  # Group by age, count the members in the groups
Filter
>>> df.filter(df["age"] > 24).show()  # Keep only records whose age is > 24
Output
Data Structures
>>> rdd1 = df.rdd          # Convert df into an RDD of Row objects
>>> df.toJSON().first()    # Convert df into an RDD of JSON strings and return the first one
>>> df.toPandas()          # Return the contents of df as a pandas DataFrame (collects all rows to the driver)
Repartitioning
>>> df.repartition(10) \
    .rdd \
    .getNumPartitions()    # df with 10 partitions
>>> df.coalesce(1).rdd.getNumPartitions()  # df with 1 partition