init
This commit is contained in:
commit
c67fca4f25
29
main.py
Normal file
29
main.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
from pyspark.sql import SparkSession
|
||||||
|
from pyspark.sql.functions import col
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
def transform_data(data_source: str, output_uri: str) -> None:
    """Count rows per product in a CSV dataset and write the result as Parquet.

    Args:
        data_source: Location of the input CSV; its first row is read as the
            header.
        output_uri: Destination for the Parquet output. Anything already at
            this location is overwritten.
    """
    # The session is used as a context manager so it is stopped on exit,
    # including when a step below raises.
    with SparkSession.builder.appName("Big Basket Analysis").getOrCreate() as spark:
        df = spark.read.option("header", "true").csv(data_source)

        # Register the DataFrame under a name so Spark SQL can query it.
        df.createOrReplaceTempView("bigbasket_products")

        # NOTE(review): the count(*) column has no alias. Some Spark versions
        # reject column names containing parentheses when writing Parquet --
        # confirm against the target runtime and add an alias if needed.
        sql_query = """
            select product, count(*)
            from bigbasket_products
            group by product
        """
        transformed_df = spark.sql(sql_query)

        print(f"Number of rows: {transformed_df.count()}")

        # Write the aggregated frame. The previous code referenced the
        # undefined name `transform_df` here, raising NameError before any
        # output was produced.
        transformed_df.write.mode("overwrite").parquet(output_uri)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Command-line entry point: parse the input and output locations and run
    # the transformation.
    parser = argparse.ArgumentParser()
    # Both options are required: without them argparse would yield None,
    # which would only fail later inside Spark with a much less helpful error.
    parser.add_argument(
        '--data_source',
        required=True,
        help="Location of the input CSV data (read with a header row).",
    )
    parser.add_argument(
        '--output_uri',
        required=True,
        help="Location where the Parquet output is written (overwritten).",
    )
    args = parser.parse_args()

    transform_data(args.data_source, args.output_uri)
|
Reference in New Issue
Block a user