pyspark-java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema

I suspect your name field has a comma in it, so the split is breaking that value apart as well, and the row ends up with 7 columns instead of the 6 the schema expects.
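For example, a naive split on "," does not respect double quotes, so a quoted value containing a comma yields an extra column (a made-up line loosely modeled on your data):

# The comma inside the quoted name is treated as a delimiter by a plain split
line = '1,2,"Quest Q64, 10 FT. x 10 FT. Takedown Canopy",desc,60,http://example.com/img.jpg'
print(len(line.split(",")))   # 7 pieces, but the schema expects only 6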

There might be some malformed lines.

Please try the code below to exclude the bad records into a separate file:

val df = spark.read.format("csv").option("badRecordsPath", "/tmp/badRecordsPath").load("csvpath")

// It will read the CSV and create a DataFrame; any malformed record will be moved to the path you provided.

// Please read more here:

https://docs.databricks.com/spark/latest/spark-sql/handling-bad-records.html
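If you are using PySpark rather than Scala, the equivalent would look roughly like this (badRecordsPath is a Databricks-specific option, and "csvpath" is the same placeholder as in the Scala snippet above):

# Bad records are written to /tmp/badRecordsPath instead of failing the read
df = spark.read.format("csv").option("badRecordsPath", "/tmp/badRecordsPath").load("csvpath")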

I found the issue: it was caused by one bad record where a comma was embedded in a string. Even though the string was double quoted, Python split it into 2 columns. I worked around it with the Databricks spark-csv package:

# from command prompt
pyspark --packages com.databricks:spark-csv_2.10:1.4.0

# on pyspark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType

schema1 = StructType([
    StructField("id", IntegerType(), True),
    StructField("cat_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("desc", StringType(), True),
    StructField("price", DecimalType(), True),
    StructField("url", StringType(), True)
])

df1 = sqlContext.read.format('com.databricks.spark.csv').schema(schema1).load('/user/maria_dev/spark_data/products.csv')
df1.show()
    +---+------+--------------------+----+-----+--------------------+
    | id|cat_id|                name|desc|price|                 url|
    +---+------+--------------------+----+-----+--------------------+
    |  1|     2|Quest Q64 10 FT. ...|    |   60|http://images.acm...|
    |  2|     2|Under Armour Men'...|    |  130|http://images.acm...|
    |  3|     2|Under Armour Men'...|    |   90|http://images.acm...|
    |  4|     2|Under Armour Men'...|    |   90|http://images.acm...|
    |  5|     2|Riddell Youth Rev...|    |  200|http://images.acm...|

df1.printSchema()
    root
     |-- id: integer (nullable = true)
     |-- cat_id: integer (nullable = true)
     |-- name: string (nullable = true)
     |-- desc: string (nullable = true)
     |-- price: decimal(10,0) (nullable = true)
     |-- url: string (nullable = true)

df1.count()
     1345
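Note that on Spark 2.x and later the spark-csv functionality is built into the DataFrame reader, so the same result should be achievable without the extra package. A minimal sketch, assuming Spark 2.x+ and the same schema and path as above (not tested against your exact file):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType

schema1 = StructType([
    StructField("id", IntegerType(), True),
    StructField("cat_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("desc", StringType(), True),
    StructField("price", DecimalType(), True),
    StructField("url", StringType(), True)
])

# The built-in reader treats double quotes as the quote character by default;
# setting quote/escape explicitly keeps embedded commas inside quoted fields intact.
df1 = spark.read \
    .schema(schema1) \
    .option("quote", '"') \
    .option("escape", '"') \
    .csv('/user/maria_dev/spark_data/products.csv')
df1.show()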

Here is my take on cleaning such records; we normally encounter situations like this:

a. An anomaly in the data where, when the file was created, nobody checked whether "," was the best delimiter for the columns.

Here is my solution for this case:

Solution a: In such cases, we would like the process to identify, as part of data cleansing, whether a record is qualified. The remaining records, if routed to a bad file/collection, can then be reconciled later.

Below is the structure of my dataset (product_id,product_name,unit_price)

1,product-1,10
2,product-2,20
3,product,3,30

In the above case, product,3 was supposed to be read as product-3; the comma might have been a typo when the product was registered. In such a case, the sample below would work.

>>> tf=open("C:/users/ip2134/pyspark_practice/test_file.txt")
>>> trec=tf.read().splitlines()
>>> trec_clean=[]
>>> trec_bad=[]
>>> for rec in trec:
...   if rec.count(",") == 2:
...      trec_clean.append(rec)
...   else:
...      trec_bad.append(rec)
...
>>> trec_clean
['1,product-1,10', '2,product-2,20']
>>> trec_bad
['3,product,3,30']
>>> trec
['1,product-1,10', '2,product-2,20', '3,product,3,30']
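The same filter can be done inside Spark instead of plain Python. A rough sketch using an RDD filter (the HDFS path here is hypothetical):

# Route records by comma count: exactly 2 commas = well-formed, anything else = bad
lines = sc.textFile("/user/maria_dev/pyspark_practice/test_file.txt")
trec_clean = lines.filter(lambda rec: rec.count(",") == 2)
trec_bad = lines.filter(lambda rec: rec.count(",") != 2)

trec_clean.collect()   # ['1,product-1,10', '2,product-2,20']
trec_bad.collect()     # ['3,product,3,30']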

The other alternative for dealing with this problem would be to check whether csv.reader with skipinitialspace=True parses out the columns correctly; see the sketch after the reference below.

(Ref: Python parse CSV ignoring comma with double-quotes)
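That approach relies on Python's csv module, which honours double quotes around values. A small standalone sketch (the sample line is made up):

import csv
from io import StringIO

data = '1,2,"Quest Q64, 10 FT. x 10 FT. Takedown Canopy",desc,60,http://example.com/img.jpg'

# csv.reader keeps the quoted comma inside the name field instead of splitting on it
for row in csv.reader(StringIO(data), skipinitialspace=True):
    print(len(row), row)   # 6 columns, with the quoted name kept intact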