-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathGlueJob_Ecom-funnel-data-ETL.py
48 lines (28 loc) · 1.24 KB
/
GlueJob_Ecom-funnel-data-ETL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import functions as f
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
import sys

# AWS Glue ETL job: reads one CSV of e-commerce user-behaviour events from
# S3, parses the event timestamp, derives year/month partition columns, and
# appends the rows to a partitioned Parquet dataset on S3.
#
# Required job arguments (supplied by the Glue runtime / trigger):
#   TempDir   - Glue temporary directory (standard Glue argument)
#   JOB_NAME  - name of this Glue job
#   file_name - name of the CSV file to ingest under user_behaviour/
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME', 'file_name'])
file_name = args['file_name']

# Parquet output settings: snappy compression, legacy (Hive/Impala
# compatible) file format.
conf = SparkConf()
conf.set("spark.sql.parquet.compression.codec", "snappy")
conf.set("spark.sql.parquet.writeLegacyFormat", "true")

# NOTE(review): "bucket-name" looks like a placeholder — confirm the real
# bucket, or pass it as a job argument instead of hard-coding.
output_dir_path = "s3://bucket-name/parquet_output"

# BUG FIX: the original called SparkContext() without the conf, so the two
# Parquet settings above were silently ignored. Pass conf explicitly.
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

input_file_path = "s3://bucket-name/user_behaviour/" + file_name

# Read the CSV with a header row, inferred column types, and doubled-quote
# escaping for embedded quotes (quote and escape both set to '"').
df = spark.read.option("header", "true")\
    .option("inferSchema", "true")\
    .option("quote", "\"")\
    .option("escape", "\"").csv(input_file_path)

# Parse the raw string timestamp (format like "03/25/2021 14:05") into a
# real timestamp column, then derive the partition columns from it.
df = df.withColumn('event_timestamp', f.to_timestamp('event_timestamp', format='MM/dd/yyyy HH:mm'))
df = df.withColumn('year', f.year(f.col('event_timestamp')))\
       .withColumn('month', f.month(f.col('event_timestamp')))

# Append mode so repeated runs accumulate new files in each year/month
# partition instead of overwriting previous loads.
df.write.partitionBy(['year', 'month']).mode('append').format('parquet').save(output_dir_path)

# BUG FIX: commit the job so Glue records a successful run (and job
# bookmarks, if enabled, advance). The original never committed.
job.commit()