Issue
I am new to Python and DataFrames. I am writing Python code to run an ETL job in AWS Glue; the relevant snippet is below.
test_DyF = glueContext.create_dynamic_frame.from_catalog(database="teststoragedb", table_name="testtestfile_csv")
test_dataframe = test_DyF.select_fields(['empid','name']).toDF()
The resulting test_dataframe is of type pyspark.sql.dataframe.DataFrame.
Now I need to loop through this test_dataframe. As far as I can tell, the only options are collect() or toLocalIterator(). Here is a sample using collect():
for row_val in test_dataframe.collect():
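The toLocalIterator() variant would look essentially the same (just a sketch, reusing the names from the snippet above):
for row_val in test_dataframe.toLocalIterator():
    # toLocalIterator() streams rows to the driver one partition at a time
    # instead of materialising everything at once like collect(), but it is
    # still a sequential driver-side loop.
    empid = row_val['empid']
    name = row_val['name']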
But both of these methods are slow and inefficient, and I cannot use pandas because it is not supported by AWS Glue.
Here are the steps I am following:
source information:
productid|matchval|similar product|similar product matchval
product A|100|product X|100
product A|101|product Y|101
product B|100|product X|100
product C|102|product Z|102
expected result:
product |similar products
product A|product X, product Y
product B|product X
product C|product Z
This is the logic I am implementing:
- Get a distinct DataFrame of productids from the source.
- Loop through this distinct set and, for each product:
a) get the list of matchval values for the product from the source
b) identify the similar products based on matchval filters
c) loop through to build the concatenated string --> this loop using rdd.collect is what hurts performance
A rough sketch of this loop is shown below.
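Roughly, the loop looks like this (a sketch only; the column names are taken from the sample data above, and source_df stands for the full source DataFrame):
# Sketch of the current (slow) approach, driven from the driver via collect()
distinct_products = source_df.select('productid').distinct()

for product_row in distinct_products.collect():  # driver-side loop
    product_id = product_row['productid']
    # a) matchvals for this product
    matchvals = [r['matchval'] for r in
                 source_df.filter(source_df['productid'] == product_id)
                          .select('matchval').collect()]
    # b) similar products whose matchval is in that list
    similar_rows = source_df.filter(source_df['matchval'].isin(matchvals)) \
                            .select('similar product').distinct().collect()
    # c) build the concatenated string
    similar_products = ', '.join(r['similar product'] for r in similar_rows)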
Can you please suggest a better approach?
Solution
Please elaborate on the logic you want to implement. Looping over a DataFrame can be done with a SQL approach, or you can follow the RDD approach below:
def my_function(each_record):
    # my_logic goes here: each_record is a pyspark.sql.Row, processed once per record.
    # Note: this runs on the executors, so results are not collected back to the driver.
    pass

df.rdd.foreach(my_function)
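For the SQL approach, register the DataFrame as a temporary view and push the grouping into Spark SQL. This is only a sketch; the view and column names here are illustrative, based on the sample data in the question:
# SQL approach: no driver-side loop, the aggregation runs on the executors
df.createOrReplaceTempView("products")

result = spark.sql("""
    SELECT productid,
           concat_ws(', ', collect_list(`similar product`)) AS similar_products
    FROM products
    GROUP BY productid
""")
result.show()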
I added the following code based on your further input:
df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|")
seq = ['product X', 'product Y', 'product Z']
df2 = df.groupBy("productid").pivot("similar_product", seq).count()
df2.show()
+---------+---------+---------+---------+
|productid|product X|product Y|product Z|
+---------+---------+---------+---------+
|product B| 1| null| null|
|product A| 1| 1| null|
|product C| null| null| 1|
+---------+---------+---------+---------+
The final approach, which matches your requirement:
df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|")
>>> df.printSchema()
root
|-- id: string (nullable = true)
|-- matchval1: integer (nullable = true)
|-- similar: string (nullable = true)
|-- matchval3: integer (nullable = true)
from pyspark.sql.functions import col, concat_ws, collect_list

dfx = (df.groupBy("id")
         .agg(concat_ws(",", collect_list("similar")).alias("Similar_Items"))
         .select(col("id"), col("Similar_Items")))
dfx.show()
+---------+-------------------+
| id| Similar_Items|
+---------+-------------------+
|product B| product X|
|product A|product X,product Y|
|product C| product Z|
+---------+-------------------+
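Since this runs inside an AWS Glue job, the aggregated DataFrame can be wrapped back into a DynamicFrame if you need to continue with Glue writers. A minimal sketch, assuming the same glueContext from your question:
from awsglue.dynamicframe import DynamicFrame

# Wrap the aggregated DataFrame so it can be passed to Glue sinks again
result_dyf = DynamicFrame.fromDF(dfx, glueContext, "result_dyf")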
Answered By - Data Engineering Simplified