Read data example:
# Auto Loader stream: pick up existing files and backfill weekly, routing unparseable rows to badRecordsPath
df_read = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.includeExistingFiles", "true") \
    .option("cloudFiles.backfillInterval", "1 week") \
    .option("badRecordsPath", <bad file path>) \
    .schema(data_schema) \
    .load(<data source>)
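The read above assumes data_schema has already been defined before the readStream call. A minimal sketch of such a schema, using hypothetical column names that you would replace with the real CSV columns:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Hypothetical schema for the incoming CSV files; adjust names and types to match your data
data_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("event_time", TimestampType(), True),
    StructField("payload", StringType(), True),
])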
Write stream data:
# Trigger-once write to a partitioned Delta table, with checkpointing for exactly-once progress tracking
df_read.writeStream.format("delta") \
    .partitionBy(<col_name>) \
    .outputMode("append") \
    .option("checkpointLocation", <checkpoint path>) \
    .trigger(once=True) \
    .start(<destination path>)
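After the trigger-once run finishes, one way to sanity-check the output is a plain batch read of the destination Delta table (same <destination path> placeholder as above):

# Batch-read the Delta output to confirm rows were appended
df_check = spark.read.format("delta").load("<destination path>")
df_check.count()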
Monitor the process:
- Delta transaction log files in the destination path, under _delta_log (readable as JSON)
- bad records written to <bad file path>
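As a sketch of those two checks, assuming the placeholders above and the standard _delta_log layout of a Delta table, the commit files and the bad-records output can be read back directly:

# Inspect the Delta transaction log: each commit is a JSON file under _delta_log
log_df = spark.read.json("<destination path>/_delta_log/*.json")
log_df.select("commitInfo", "add").show(truncate=False)  # each row carries only one of these fields

# Inspect rows that failed to parse; badRecordsPath nests output in dated subdirectories
bad_df = spark.read.option("recursiveFileLookup", "true").json("<bad file path>")
bad_df.show(truncate=False)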