Issue
Let's say i have a pandas dataframe of the following format which i already converted to string, since i dont want to define a schema for it, in order to be able to convert to pyspark df. Therefore I converted the dataframe like this:
train_pd = X_train_f.astype('string')
train_pd.info(verbose=True)
# Column Dtype
--- ------ -----
0 col1 string
1 col2 string
2 col3 string
3 col4 string
When I now run the following code i get the following error message.
training = spark.createDataFrame(train_pd)
TypeError: field col15: Can not merge type <class 'pyspark.sql.types.StructType'> and <class 'pyspark.sql.types.StringType'>
Why is that, I thought that by converting everything to string I would bypass the schema inference.
Sample Data
col0 col1 col2 col3 col4 col5 col6 col7 col8 col9 col10 col11 col12 col13 col14 col15 col16 col17 col18 col19 col20 col21 col22 col23 col24 col25 col26 col27 col28 col29 col30 col31 col32 col33 col34 col35 col36 col37 col38 col39 ... col355 col356 col357 col358 col359 col360 col361 col362 col363 col364 col365 col366 col367 col368 col369 col370 col371 col372 col373 col374 col375 col376 col377 col378 col379 col380 col381 col382 col383 col384 col385 col386 col387 col388 col389 col390 col391 col392 col393 col394
DUMMY DUMMY DUMMY DUMMY DUMMY DUMMY DUMMY DUMMY 1144418 0 1908 0 DUMMY DUMMY 50 <NA> <NA> 0 0 0001 DUMMY 2021-11-03 16:51:25 2021-11-03 17:23:13 04 <NA> <NA> <NA> DUMMY DUMMY DUMMY DUMMY DUMMY 7 DUMMY <NA> DUMMY DUMMY <NA> 30 4315 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0
Solution
Run this script from Converting Pandas dataframe into Spark dataframe error before you run training = pandas_to_spark(train_pd)
:
from pyspark.sql.types import *
# Auxiliar functions
def equivalent_type(f):
if f == 'datetime64[ns]': return TimestampType()
elif f == 'int64': return LongType()
elif f == 'int32': return IntegerType()
elif f == 'float64': return FloatType()
else: return StringType()
def define_structure(string, format_type):
try: typo = equivalent_type(format_type)
except: typo = StringType()
return StructField(string, typo)
# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
columns = list(pandas_df.columns)
types = list(pandas_df.dtypes)
struct_list = []
for column, typo in zip(columns, types):
struct_list.append(define_structure(column, typo))
p_schema = StructType(struct_list)
return sqlContext.createDataFrame(pandas_df, p_schema)
Answered By - Luiz Viola
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.