AWS Data Analytics: Building Serverless Data Lakes with Amazon Athena and AWS Glue

Modern organizations generate massive amounts of data from various sources including applications, IoT devices, web analytics, and business systems. Managing and extracting insights from this data requires robust, scalable, and cost-effective analytics solutions. AWS provides a comprehensive serverless data analytics stack centered around Amazon S3 as a data lake, AWS Glue for ETL processing, and Amazon Athena for interactive queries, enabling organizations to build sophisticated analytics platforms without managing infrastructure.

Understanding Serverless Data Analytics Architecture

The serverless data analytics pattern on AWS eliminates the need to provision and manage servers for data processing and analytics workloads. This architecture leverages Amazon S3 as the foundational storage layer, providing virtually unlimited scalability and durability for structured and unstructured data. AWS Glue serves as the serverless ETL service, automatically discovering, cataloging, and transforming data, while Amazon Athena enables interactive SQL queries directly against data stored in S3.

This architecture pattern excels in scenarios requiring flexible data processing, ad-hoc analytics, cost optimization, and rapid time-to-insight. The pay-per-use model ensures you only pay for the resources consumed during actual data processing and query execution, making it ideal for variable workloads and exploratory analytics.

Core Components and Data Flow

AWS Glue operates as a fully managed ETL service that automatically discovers data schemas, suggests transformations, and generates ETL code. The Glue Data Catalog serves as a central metadata repository, maintaining schema information and table definitions that can be accessed by multiple analytics services. Glue Crawlers automatically scan data sources to infer schemas and populate the Data Catalog.
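
As a concrete illustration of the catalog as a shared metadata layer, the short boto3 sketch below lists the tables the crawlers have registered along with their S3 locations. The database name is an assumption based on the naming convention used in the template later in this article; adjust it to your deployment.

import boto3

# Assumed database name following the template's '<company>_<environment>_analytics' pattern
DATABASE_NAME = "ecommerce_prod_analytics"

glue = boto3.client("glue")

def list_catalog_tables(database_name: str) -> None:
    """Print every table registered in the Glue Data Catalog for this database."""
    paginator = glue.get_paginator("get_tables")
    for page in paginator.paginate(DatabaseName=database_name):
        for table in page["TableList"]:
            location = table.get("StorageDescriptor", {}).get("Location", "n/a")
            print(f"{table['Name']}: {location}")

if __name__ == "__main__":
    list_catalog_tables(DATABASE_NAME)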

Amazon Athena provides serverless interactive query capabilities using standard SQL, enabling analysts and data scientists to query data without learning new tools or languages. Athena integrates seamlessly with the Glue Data Catalog, automatically understanding table structures and data locations. The service supports various data formats including Parquet, ORC, JSON, CSV, and Avro.

Amazon S3 forms the foundation of the data lake, organizing data using logical partitioning strategies that optimize query performance and cost. Proper partitioning enables Athena to scan only relevant data portions, significantly reducing query execution time and costs.
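
For example, if aggregated transactions are written to S3 partitioned by transaction_year and transaction_month, a query that filters on those columns scans only the matching prefixes. The boto3 sketch below submits such a query through an Athena workgroup; the workgroup, database, and sales_aggregated table names are assumptions that mirror the resources built later in this article.

import time
import boto3

athena = boto3.client("athena")

# Assumed names matching the CloudFormation template below; adjust as needed.
WORKGROUP = "ecommerce-analytics-workgroup"
DATABASE = "ecommerce_prod_analytics"

# Filtering on the partition columns lets Athena prune non-matching prefixes.
QUERY = """
SELECT category, SUM(revenue) AS revenue
FROM sales_aggregated
WHERE transaction_year = 2024 AND transaction_month = 6
GROUP BY category
ORDER BY revenue DESC
"""

def run_query(sql: str) -> str:
    """Submit a query and block until it reaches a terminal state."""
    # No per-query OutputLocation is needed: the workgroup enforces its own
    # result configuration.
    execution = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": DATABASE},
        WorkGroup=WORKGROUP,
    )
    query_id = execution["QueryExecutionId"]
    while True:
        state = athena.get_query_execution(QueryExecutionId=query_id)[
            "QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state
        time.sleep(2)

if __name__ == "__main__":
    print(run_query(QUERY))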

Comprehensive Implementation: E-commerce Analytics Platform

Let’s build an e-commerce analytics platform that processes customer behavior data, sales transactions, and product information to generate actionable, data-driven business insights.

Data Lake Infrastructure Setup

Here’s a CloudFormation template that provisions the serverless analytics infrastructure, including the data lake buckets, the Glue database and crawlers, the ETL jobs, an Athena workgroup, and the supporting IAM roles and Lambda functions:

AWSTemplateFormatVersion: '2010-09-09'
Description: 'Serverless Data Analytics Platform with Athena and Glue'

Parameters:
  CompanyName:
    Type: String
    Default: ecommerce
    Description: Company name for resource naming
  
  Environment:
    Type: String
    Default: prod
    AllowedValues: [dev, staging, prod]
    Description: Environment name

Resources:
  # S3 Buckets for Data Lake
  RawDataBucket:
    Type: AWS::S3::Bucket
    # The notification configuration is validated against the Lambda resource
    # policy, so the invoke permission must exist before the bucket is created.
    DependsOn: LambdaInvokePermission
    Properties:
      BucketName: !Sub '${CompanyName}-${Environment}-raw-data-${AWS::AccountId}'
      VersioningConfiguration:
        Status: Enabled
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      LifecycleConfiguration:
        Rules:
          - Id: TransitionToIA
            Status: Enabled
            Transitions:
              - StorageClass: STANDARD_IA
                TransitionInDays: 30
              - StorageClass: GLACIER
                TransitionInDays: 90
              - StorageClass: DEEP_ARCHIVE
                TransitionInDays: 365
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: s3:ObjectCreated:*
            Function: !GetAtt DataIngestionTrigger.Arn
            Filter:
              S3Key:
                Rules:
                  - Name: prefix
                    Value: raw/
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  ProcessedDataBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub '${CompanyName}-${Environment}-processed-data-${AWS::AccountId}'
      VersioningConfiguration:
        Status: Enabled
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  AthenaResultsBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub '${CompanyName}-${Environment}-athena-results-${AWS::AccountId}'
      LifecycleConfiguration:
        Rules:
          - Id: DeleteOldQueryResults
            Status: Enabled
            ExpirationInDays: 30
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  # AWS Glue Database
  GlueDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: !Sub '${CompanyName}_${Environment}_analytics'
        Description: 'Data catalog for e-commerce analytics platform'

  # Glue Service Role
  GlueServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: glue.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
      Policies:
        - PolicyName: S3Access
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                  - s3:DeleteObject
                  - s3:ListBucket
                Resource:
                  - !Sub '${RawDataBucket.Arn}/*'
                  - !Sub '${ProcessedDataBucket.Arn}/*'
                  - !GetAtt RawDataBucket.Arn
                  - !GetAtt ProcessedDataBucket.Arn
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'

  # Glue Crawlers
  CustomerDataCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Name: !Sub '${CompanyName}-customer-data-crawler'
      Role: !GetAtt GlueServiceRole.Arn
      DatabaseName: !Ref GlueDatabase
      Targets:
        S3Targets:
          - Path: !Sub 's3://${RawDataBucket}/raw/customers/'  # matches the raw/ prefix used for ingestion
      SchemaChangePolicy:
        UpdateBehavior: UPDATE_IN_DATABASE
        DeleteBehavior: LOG
      Configuration: |
        {
          "Version": 1.0,
          "CrawlerOutput": {
            "Partitions": {
              "AddOrUpdateBehavior": "InheritFromTable"
            }
          }
        }

  TransactionDataCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Name: !Sub '${CompanyName}-transaction-data-crawler'
      Role: !GetAtt GlueServiceRole.Arn
      DatabaseName: !Ref GlueDatabase
      Targets:
        S3Targets:
          - Path: !Sub 's3://${RawDataBucket}/raw/transactions/'
      SchemaChangePolicy:
        UpdateBehavior: UPDATE_IN_DATABASE
        DeleteBehavior: LOG

  ProductDataCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Name: !Sub '${CompanyName}-product-data-crawler'
      Role: !GetAtt GlueServiceRole.Arn
      DatabaseName: !Ref GlueDatabase
      Targets:
        S3Targets:
          - Path: !Sub 's3://${RawDataBucket}/raw/products/'
      SchemaChangePolicy:
        UpdateBehavior: UPDATE_IN_DATABASE
        DeleteBehavior: LOG

  # Glue ETL Jobs
  CustomerDataTransformJob:
    Type: AWS::Glue::Job
    Properties:
      Name: !Sub '${CompanyName}-customer-data-transform'
      Role: !GetAtt GlueServiceRole.Arn
      GlueVersion: '4.0'
      Command:
        Name: glueetl
        PythonVersion: '3'
        ScriptLocation: !Sub 's3://${ProcessedDataBucket}/glue-scripts/customer_transform.py'
      DefaultArguments:
        '--job-language': 'python'
        '--job-bookmark-option': 'job-bookmark-enable'
        '--TempDir': !Sub 's3://${ProcessedDataBucket}/temp/'
        '--enable-continuous-cloudwatch-log': 'true'
        '--enable-spark-ui': 'true'
        '--spark-event-logs-path': !Sub 's3://${ProcessedDataBucket}/spark-logs/'
        '--raw-bucket': !Ref RawDataBucket
        '--processed-bucket': !Ref ProcessedDataBucket
      MaxRetries: 2
      Timeout: 60
      NumberOfWorkers: 2
      WorkerType: G.1X

  SalesAggregationJob:
    Type: AWS::Glue::Job
    Properties:
      Name: !Sub '${CompanyName}-sales-aggregation'
      Role: !GetAtt GlueServiceRole.Arn
      GlueVersion: '4.0'
      Command:
        Name: glueetl
        PythonVersion: '3'
        ScriptLocation: !Sub 's3://${ProcessedDataBucket}/glue-scripts/sales_aggregation.py'
      DefaultArguments:
        '--job-language': 'python'
        '--job-bookmark-option': 'job-bookmark-enable'
        '--TempDir': !Sub 's3://${ProcessedDataBucket}/temp/'
        '--enable-continuous-cloudwatch-log': 'true'
        '--raw-bucket': !Ref RawDataBucket
        '--processed-bucket': !Ref ProcessedDataBucket
        '--database-name': !Ref GlueDatabase
      MaxRetries: 2
      Timeout: 120
      NumberOfWorkers: 5
      WorkerType: G.1X

  # Lambda Function for Data Ingestion Trigger
  DataIngestionTrigger:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub '${CompanyName}-data-ingestion-trigger'
      Runtime: python3.9
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Environment:
        Variables:
          GLUE_DATABASE: !Ref GlueDatabase
          # Crawler names are built with !Sub rather than !Ref to avoid a
          # circular dependency (the crawlers reference the raw bucket, which
          # references this function).
          CUSTOMER_CRAWLER: !Sub '${CompanyName}-customer-data-crawler'
          TRANSACTION_CRAWLER: !Sub '${CompanyName}-transaction-data-crawler'
          PRODUCT_CRAWLER: !Sub '${CompanyName}-product-data-crawler'
      Code:
        ZipFile: |
          import json
          import boto3
          import os
          import urllib.parse
          
          glue_client = boto3.client('glue')
          
          def lambda_handler(event, context):
              try:
                  for record in event['Records']:
                      bucket = record['s3']['bucket']['name']
                      key = urllib.parse.unquote_plus(record['s3']['object']['key'])
                      
                      print(f"Processing file: s3://{bucket}/{key}")
                      
                      # Determine which crawler to run based on file path
                      if key.startswith('raw/customers/'):
                          crawler_name = os.environ['CUSTOMER_CRAWLER']
                      elif key.startswith('raw/transactions/'):
                          crawler_name = os.environ['TRANSACTION_CRAWLER']
                      elif key.startswith('raw/products/'):
                          crawler_name = os.environ['PRODUCT_CRAWLER']
                      else:
                          print(f"No crawler configured for path: {key}")
                          continue
                      
                      # Start the appropriate crawler
                      try:
                          response = glue_client.start_crawler(Name=crawler_name)
                          print(f"Started crawler {crawler_name}: {response}")
                      except glue_client.exceptions.CrawlerRunningException:
                          print(f"Crawler {crawler_name} is already running")
                      except Exception as e:
                          print(f"Error starting crawler {crawler_name}: {str(e)}")
                  
                  return {
                      'statusCode': 200,
                      'body': json.dumps('Processing completed successfully')
                  }
                  
              except Exception as e:
                  print(f"Error processing event: {str(e)}")
                  return {
                      'statusCode': 500,
                      'body': json.dumps(f'Error: {str(e)}')
                  }

  # Lambda permission for S3 to invoke the function
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref DataIngestionTrigger
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      SourceAccount: !Ref AWS::AccountId
      # The ARN is built from the bucket-name pattern (not !GetAtt) so this
      # permission does not depend on the bucket resource itself.
      SourceArn: !Sub 'arn:aws:s3:::${CompanyName}-${Environment}-raw-data-${AWS::AccountId}'

  # Lambda Execution Role
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: GlueAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - glue:StartCrawler
                  - glue:GetCrawler
                  - glue:GetCrawlerMetrics
                Resource: '*'

  # Athena Workgroup
  AthenaWorkgroup:
    Type: AWS::Athena::WorkGroup
    Properties:
      Name: !Sub '${CompanyName}-analytics-workgroup'
      Description: 'Workgroup for e-commerce analytics queries'
      State: ENABLED
      WorkGroupConfiguration:
        EnforceWorkGroupConfiguration: true
        PublishCloudWatchMetrics: true
        ResultConfiguration:
          OutputLocation: !Sub 's3://${AthenaResultsBucket}/'
          EncryptionConfiguration:
            EncryptionOption: SSE_S3
        EngineVersion:
          SelectedEngineVersion: 'Athena engine version 3'

  # IAM Role for Athena
  AthenaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: athena.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: AthenaS3Access
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:ListBucket
                Resource:
                  - !Sub '${RawDataBucket.Arn}/*'
                  - !Sub '${ProcessedDataBucket.Arn}/*'
                  - !GetAtt RawDataBucket.Arn
                  - !GetAtt ProcessedDataBucket.Arn
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:DeleteObject
                Resource:
                  - !Sub '${AthenaResultsBucket.Arn}/*'
              - Effect: Allow
                Action:
                  - glue:GetDatabase
                  - glue:GetTable
                  - glue:GetTables
                  - glue:GetPartition
                  - glue:GetPartitions
                Resource: '*'

  # CloudWatch Log Groups
  GlueJobLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/glue/${CompanyName}-etl-jobs'
      RetentionInDays: 30

  LambdaLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${CompanyName}-data-ingestion-trigger'
      RetentionInDays: 14

  # Sample Data Generation Lambda (for testing)
  SampleDataGenerator:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub '${CompanyName}-sample-data-generator'
      Runtime: python3.9
      Handler: index.lambda_handler
      Role: !GetAtt SampleDataRole.Arn
      Timeout: 300
      Environment:
        Variables:
          RAW_BUCKET: !Ref RawDataBucket
      Code:
        ZipFile: |
          import json
          import boto3
          import csv
          import random
          import datetime
          from io import StringIO
          import os
          
          s3_client = boto3.client('s3')
          
          def lambda_handler(event, context):
              bucket = os.environ['RAW_BUCKET']
              
              # Generate sample customer data
              customer_data = generate_customer_data()
              upload_csv_to_s3(customer_data, bucket, 'raw/customers/customers.csv')
              
              # Generate sample transaction data
              transaction_data = generate_transaction_data()
              upload_csv_to_s3(transaction_data, bucket, 'raw/transactions/transactions.csv')
              
              # Generate sample product data
              product_data = generate_product_data()
              upload_csv_to_s3(product_data, bucket, 'raw/products/products.csv')
              
              return {
                  'statusCode': 200,
                  'body': json.dumps('Sample data generated successfully')
              }
          
          def generate_customer_data():
              customers = []
              for i in range(1000):
                  customers.append({
                      'customer_id': f'CUST_{i:05d}',
                      'first_name': random.choice(['John', 'Jane', 'Bob', 'Alice', 'Charlie', 'Diana']),
                      'last_name': random.choice(['Smith', 'Johnson', 'Brown', 'Davis', 'Wilson', 'Taylor']),
                      'email': f'customer{i}@example.com',
                      'age': random.randint(18, 80),
                      'city': random.choice(['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix']),
                      'state': random.choice(['NY', 'CA', 'IL', 'TX', 'AZ']),
                      'registration_date': (datetime.datetime.now() - datetime.timedelta(days=random.randint(1, 365))).strftime('%Y-%m-%d')
                  })
              return customers
          
          def generate_transaction_data():
              transactions = []
              for i in range(5000):
                  transactions.append({
                      'transaction_id': f'TXN_{i:06d}',
                      'customer_id': f'CUST_{random.randint(0, 999):05d}',
                      'product_id': f'PROD_{random.randint(0, 99):03d}',
                      'quantity': random.randint(1, 5),
                      'price': round(random.uniform(10.0, 500.0), 2),
                      'transaction_date': (datetime.datetime.now() - datetime.timedelta(days=random.randint(1, 90))).strftime('%Y-%m-%d'),
                      'payment_method': random.choice(['credit_card', 'debit_card', 'paypal', 'apple_pay'])
                  })
              return transactions
          
          def generate_product_data():
              products = []
              categories = ['Electronics', 'Clothing', 'Books', 'Home & Garden', 'Sports']
              for i in range(100):
                  products.append({
                      'product_id': f'PROD_{i:03d}',
                      'product_name': f'Product {i}',
                      'category': random.choice(categories),
                      'brand': random.choice(['BrandA', 'BrandB', 'BrandC', 'BrandD']),
                      'cost': round(random.uniform(5.0, 200.0), 2),
                      'retail_price': round(random.uniform(10.0, 500.0), 2),
                      'stock_quantity': random.randint(0, 1000)
                  })
              return products
          
          def upload_csv_to_s3(data, bucket, key):
              csv_buffer = StringIO()
              if data:
                  writer = csv.DictWriter(csv_buffer, fieldnames=data[0].keys())
                  writer.writeheader()
                  writer.writerows(data)
                  
                  s3_client.put_object(
                      Bucket=bucket,
                      Key=key,
                      Body=csv_buffer.getvalue(),
                      ContentType='text/csv'
                  )
                  print(f"Uploaded {len(data)} records to s3://{bucket}/{key}")

  SampleDataRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: S3WriteAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:PutObjectAcl
                Resource:
                  - !Sub '${RawDataBucket.Arn}/*'

Outputs:
  RawDataBucketName:
    Description: Name of the raw data S3 bucket
    Value: !Ref RawDataBucket
    Export:
      Name: !Sub '${CompanyName}-${Environment}-raw-bucket'
  
  ProcessedDataBucketName:
    Description: Name of the processed data S3 bucket
    Value: !Ref ProcessedDataBucket
    Export:
      Name: !Sub '${CompanyName}-${Environment}-processed-bucket'
  
  GlueDatabaseName:
    Description: Name of the Glue database
    Value: !Ref GlueDatabase
    Export:
      Name: !Sub '${CompanyName}-${Environment}-glue-database'
  
  AthenaWorkgroupName:
    Description: Name of the Athena workgroup
    Value: !Ref AthenaWorkgroup
    Export:
      Name: !Sub '${CompanyName}-${Environment}-athena-workgroup'

  SampleDataGeneratorArn:
    Description: ARN of the sample data generator function
    Value: !GetAtt SampleDataGenerator.Arn
    Export:
      Name: !Sub '${CompanyName}-${Environment}-sample-data-generator'
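
Once the stack is deployed, a quick way to exercise the pipeline end to end is to invoke the sample data generator: its uploads under the raw/ prefix fire the ingestion trigger, which in turn starts the matching crawlers. A minimal boto3 sketch, assuming the default CompanyName of ecommerce:

import json
import boto3

lambda_client = boto3.client("lambda")

# Function name follows the template's '${CompanyName}-sample-data-generator' pattern.
FUNCTION_NAME = "ecommerce-sample-data-generator"

# Synchronous invocation; the resulting S3 uploads under raw/ start the crawlers.
response = lambda_client.invoke(
    FunctionName=FUNCTION_NAME,
    InvocationType="RequestResponse",
    Payload=json.dumps({}),
)
print(response["StatusCode"], response["Payload"].read().decode())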

Advanced ETL Processing with AWS Glue

Here are the Glue ETL scripts for processing and transforming the e-commerce data:

Customer Data Transformation Script

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Initialize Glue context
# Job parameters are passed as '--raw-bucket' and '--processed-bucket';
# getResolvedOptions exposes them with underscores (args['raw_bucket']).
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'raw-bucket', 'processed-bucket'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read customer data from S3
raw_bucket = args['raw_bucket']
processed_bucket = args['processed_bucket']

# Create dynamic frame from S3
customer_dyf = glueContext.create_dynamic_frame.from_options(
    format_options={"withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [f"s3://{raw_bucket}/raw/customers/"],
        "recurse": True
    },
    transformation_ctx="customer_dyf"
)

# Convert to DataFrame for complex transformations
customer_df = customer_dyf.toDF()

# Data quality checks and transformations
customer_transformed = customer_df \
    .filter(F.col("customer_id").isNotNull()) \
    .filter(F.col("email").contains("@")) \
    .withColumn("full_name", F.concat_ws(" ", F.col("first_name"), F.col("last_name"))) \
    .withColumn("age_group", 
        F.when(F.col("age") < 25, "18-24")
         .when(F.col("age") < 35, "25-34")
         .when(F.col("age") < 45, "35-44")
         .when(F.col("age") < 55, "45-54")
         .when(F.col("age") < 65, "55-64")
         .otherwise("65+")) \
    .withColumn("registration_year", F.year(F.col("registration_date"))) \
    .withColumn("email_domain", F.split(F.col("email"), "@").getItem(1))

# Add data quality metrics
total_records = customer_df.count()
valid_records = customer_transformed.count()
print(f"Customer data quality: {valid_records}/{total_records} records passed validation")

# Convert back to DynamicFrame
customer_transformed_dyf = DynamicFrame.fromDF(customer_transformed, glueContext, "customer_transformed")

# Write to S3 in Parquet format with partitioning
glueContext.write_dynamic_frame.from_options(
    frame=customer_transformed_dyf,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": f"s3://{processed_bucket}/processed/customers/",
        "partitionKeys": ["state", "age_group"]
    },
    format_options={"compression": "snappy"},
    transformation_ctx="customer_write"
)

job.commit()
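
This script is expected at the ScriptLocation configured in the template (the glue-scripts/ prefix of the processed-data bucket). Once uploaded, the job can be started and monitored with a few lines of boto3; the job name below assumes the default CompanyName of ecommerce:

import time
import boto3

glue = boto3.client("glue")

# Job name follows the template's '${CompanyName}-customer-data-transform' pattern.
JOB_NAME = "ecommerce-customer-data-transform"

run = glue.start_job_run(JobName=JOB_NAME)
run_id = run["JobRunId"]

# Poll until the run reaches a terminal state.
while True:
    status = glue.get_job_run(JobName=JOB_NAME, RunId=run_id)["JobRun"]["JobRunState"]
    print(f"{JOB_NAME}: {status}")
    if status in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"):
        break
    time.sleep(30)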

Sales Aggregation Script

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta

# Initialize
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'raw-bucket', 'processed-bucket', 'database-name'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read data from Glue Data Catalog
database_name = args['database_name']

# Read transactions
transactions_dyf = glueContext.create_dynamic_frame.from_catalog(
    database=database_name,
    table_name="transactions",
    transformation_ctx="transactions_dyf"
)

# Read products
products_dyf = glueContext.create_dynamic_frame.from_catalog(
    database=database_name,
    table_name="products",
    transformation_ctx="products_dyf"
)

# Read customers
customers_dyf = glueContext.create_dynamic_frame.from_catalog(
    database=database_name,
    table_name="customers",
    transformation_ctx="customers_dyf"
)

# Convert to DataFrames
transactions_df = transactions_dyf.toDF()
products_df = products_dyf.toDF()
customers_df = customers_dyf.toDF()

# Data transformations and enrichment
# Calculate total amount for each transaction
transactions_enriched = transactions_df \
    .withColumn("total_amount", F.col("quantity") * F.col("price")) \
    .withColumn("transaction_date_parsed", F.to_date(F.col("transaction_date"))) \
    .withColumn("transaction_year", F.year(F.col("transaction_date_parsed"))) \
    .withColumn("transaction_month", F.month(F.col("transaction_date_parsed"))) \
    .withColumn("transaction_

Regards,
Osama
