final code
This commit is contained in:
parent
c67fca4f25
commit
eef27bd373
38341
BigBasket_Products.csv
Normal file
38341
BigBasket_Products.csv
Normal file
File diff suppressed because it is too large
Load Diff
29
main.py
29
main.py
@@ -1,4 +1,3 @@
|
|||||||
from pyspark.sql import SparkSession
|
|
||||||
from pyspark.sql.functions import col
|
from pyspark.sql.functions import col
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
@@ -8,17 +7,27 @@ def transform_data(data_source: str, output_uri: str) -> None:
|
|||||||
|
|
||||||
df.createOrReplaceTempView("bigbasket_products")
|
df.createOrReplaceTempView("bigbasket_products")
|
||||||
|
|
||||||
SQL_QUERY = """
|
SQL_QUERY = ["""
|
||||||
select product, count(*)
|
select avg(market_price) - avg(sale_price)
|
||||||
from bigbasket_products
|
from bigbasket_products
|
||||||
group by product
|
""",
|
||||||
"""
|
"""
|
||||||
|
select avg(rating)
|
||||||
transformed_df = spark.sql(SQL_QUERY)
|
from bigbasket_products
|
||||||
|
""",
|
||||||
print(f"Number of rows: {transformed_df.count()}")
|
"""
|
||||||
|
select avg(market_price)
|
||||||
transform_df.write.mode("overwrite").parquet(output_uri)
|
from bigbasket_products
|
||||||
|
""",
|
||||||
|
"""
|
||||||
|
select avg(sale_price)
|
||||||
|
from bigbasket_products
|
||||||
|
"""
|
||||||
|
]
|
||||||
|
for i in SQL_QUERY:
|
||||||
|
transformed_df = spark.sql(i)
|
||||||
|
transformed_df.show()
|
||||||
|
transformed_df.write.mode("overwrite").parquet(output_uri)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
|
Reference in New Issue
Block a user