July 28, 2023     5 min read

Vacuuming Amazon Athena Iceberg with AWS Step Functions

In this post, we will look at how to resolve ICEBERG_VACUUM_MORE_RUNS_NEEDED errors by using AWS Step Functions to retry vacuuming Amazon Athena Iceberg tables until they are clean.

The Problem

Right now, running the VACUUM command on a heavily updated Amazon Athena Iceberg table will result in the following error:

ICEBERG_VACUUM_MORE_RUNS_NEEDED: Removed 1000 files in this round of vacuum, but there are more files remaining. Please run another VACUUM command to process the remaining files

The rationale for this behaviour is still unclear and, if this unanswered repost.aws question is anything to go by, so is whether it will be fixed in the future.

The Solution

A solution I have come across in the wild is to simply run the VACUUM command again until it succeeds. It is a bit hacky, but it works. I first saw this approach in Matano, and my team has implemented a similar solution in our own codebase.
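To make the "run it again until it succeeds" idea concrete, here is a minimal Python sketch of that loop. It is not the approach used in the rest of this post (that is the Step Functions template), and the function name `vacuum_until_clean`, the `max_rounds` cap and the `poll_seconds` delay are assumptions made for illustration. The `athena` argument is expected to behave like `boto3.client("athena")`; only `start_query_execution` and `get_query_execution` are used.

```python
import time

# Prefix of the error Athena reports when one VACUUM run was not enough.
MORE_RUNS_NEEDED = "ICEBERG_VACUUM_MORE_RUNS_NEEDED"


def vacuum_until_clean(athena, database, table, workgroup, output_location,
                       max_rounds=50, poll_seconds=5.0):
    """Run VACUUM repeatedly until Athena stops asking for more runs.

    Returns the number of VACUUM queries that were issued.
    max_rounds and poll_seconds are illustrative defaults, not AWS guidance.
    """
    for round_number in range(1, max_rounds + 1):
        started = athena.start_query_execution(
            QueryString=f"VACUUM {database}.{table}",
            WorkGroup=workgroup,
            ResultConfiguration={"OutputLocation": output_location},
        )
        query_id = started["QueryExecutionId"]

        # Poll until the query reaches a terminal state.
        while True:
            response = athena.get_query_execution(QueryExecutionId=query_id)
            status = response["QueryExecution"]["Status"]
            if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
                break
            time.sleep(poll_seconds)

        if status["State"] == "SUCCEEDED":
            return round_number

        reason = status.get("StateChangeReason", "")
        if not reason.startswith(MORE_RUNS_NEEDED):
            # Any other failure is a real error, so stop retrying.
            raise RuntimeError(f"VACUUM failed: {reason or status['State']}")
        # Otherwise a batch of files was removed and another round is needed.

    raise RuntimeError(f"Table still not clean after {max_rounds} VACUUM runs")
```

With boto3 installed and credentials configured, this could be called as `vacuum_until_clean(boto3.client("athena"), "default", "my_iceberg_table", "primary", "s3://my-bucket/")`. The cap on rounds is a design choice of the sketch so that a table that never becomes clean does not loop forever.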

Vacuuming Iceberg with AWS Step Functions

The following is a CloudFormation template that creates a Step Functions state machine that reruns the VACUUM command until it succeeds. An EventBridge rule starts the state machine on a schedule, every hour by default (see the VacuumFrequency parameter).

AWSTemplateFormatVersion: '2010-09-09'
Description: A CloudFormation template to create IAM role, policy, Step Functions, and CloudWatch event rule and target for table ingestion maintenance vacuum

Parameters:
  WorkgroupName:
    Type: String
    Default: primary
    Description: The name of the Athena Workgroup

  DatabaseName:
    Type: String
    Default: default
    Description: The name of the database
  TableName:
    Type: String
    Description: The name of the Iceberg table

  BucketName:
    Type: String
    Description: The name of the S3 bucket used for your Iceberg table
  BucketPrefix:
    Type: String
    Default: ""
    Description: The prefix for the S3 bucket used for your Iceberg table
  AthenaOutputBucketName:
    Type: String
    Description: The name of the S3 bucket used for Athena query outputs
  AthenaOutputBucketPrefix:
    Type: String
    Default: ""
    Description: The prefix for the S3 bucket used for Athena query outputs

  VacuumFrequency:
    Type: String
    Default: rate(1 hour)
    Description: The frequency for the Iceberg vacuum

Conditions:
  IsBucketPrefixSpecified: !Not [!Equals [!Ref BucketPrefix, ""]]
  IsAthenaOutputBucketPrefixSpecified: !Not [!Equals [!Ref AthenaOutputBucketPrefix, ""]]

Resources:
  StepFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: 'states.amazonaws.com'
            Action: 'sts:AssumeRole'
            Condition:
              ArnLike:
                aws:SourceArn: !Sub 'arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:*'
              StringEquals:
                aws:SourceAccount: !Ref AWS::AccountId
          - Effect: Allow
            Principal:
              Service: 'events.amazonaws.com'
            Action: 'sts:AssumeRole'
            Condition:
              ArnLike:
                aws:SourceArn: !Sub 'arn:aws:events:${AWS::Region}:${AWS::AccountId}:rule/MaintenanceVacuumEventRule'

  StepFunctionRolePolicy:
    Type: AWS::IAM::Policy
    Properties:
      Roles: [!Ref StepFunctionRole]
      PolicyName: StepFunctionRolePolicy
      PolicyDocument: 
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - xray:PutTraceSegments
              - xray:PutTelemetryRecords
              - xray:GetSamplingRules
              - xray:GetSamplingTargets
            Resource: '*'
          - Effect: Allow
            Action: states:StartExecution
            Resource: 
              - !Ref MaintenanceVacuumStateMachine
          - Effect: Allow
            Action:
              - athena:startQueryExecution
              - athena:stopQueryExecution
              - athena:getQueryExecution
              - athena:getDataCatalog
            Resource: 
              - !Sub 'arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${WorkgroupName}'
              - !Sub 'arn:aws:athena:${AWS::Region}:${AWS::AccountId}:datacatalog/*'
          - !If
            - IsBucketPrefixSpecified
            - Effect: Allow
              Action:
                - s3:GetObject
                - s3:PutObject
                - s3:DeleteObject
                - s3:ListBucket
              Resource:
                - !Sub 'arn:aws:s3:::${BucketName}'
                - !Sub 'arn:aws:s3:::${BucketName}/${BucketPrefix}/*'
            - Effect: Allow
              Action:
                - s3:GetObject
                - s3:PutObject
                - s3:DeleteObject
                - s3:ListBucket
              Resource:
                - !Sub 'arn:aws:s3:::${BucketName}'
                - !Sub 'arn:aws:s3:::${BucketName}/*'
          - !If
            - IsAthenaOutputBucketPrefixSpecified
            - Effect: Allow
              Action:
                - s3:GetBucketLocation
                - s3:GetObject
                - s3:ListBucket
                - s3:ListBucketMultipartUploads
                - s3:ListMultipartUploadParts
                - s3:AbortMultipartUpload
                - s3:CreateBucket
                - s3:PutObject
              Resource:
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}'
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}/${AthenaOutputBucketPrefix}/*'
            - Effect: Allow
              Action:
                - s3:GetBucketLocation
                - s3:GetObject
                - s3:ListBucket
                - s3:ListBucketMultipartUploads
                - s3:ListMultipartUploadParts
                - s3:AbortMultipartUpload
                - s3:CreateBucket
                - s3:PutObject
              Resource:
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}'
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}/*'
          - Effect: Allow
            Action:
              - glue:GetDatabase*
              - glue:UpdateTable
              - glue:GetTable
              - glue:GetTables
              - glue:BatchDeleteTable
              - glue:GetPartition*
              - glue:BatchGetPartition
            Resource:
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog'
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DatabaseName}'
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DatabaseName}/${TableName}'

  MaintenanceVacuumStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      RoleArn: !GetAtt StepFunctionRole.Arn
      TracingConfiguration:
        Enabled: true
      DefinitionString: !Sub |
        {
          "StartAt": "StartState",
          "States": {
            "StartState": {
              "Type": "Task",
              "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
              "Parameters": {
                "QueryString": "VACUUM ${DatabaseName}.${TableName}",
                "WorkGroup": "${WorkgroupName}",
                "ResultConfiguration": {
                  "OutputLocation": "s3://${AthenaOutputBucketName}/${AthenaOutputBucketPrefix}"
                }
              },
              "Catch": [
                {
                  "ErrorEquals": ["States.TaskFailed"],
                  "Next": "Parse Error JSON"
                }
              ],
              "End": true
            },
            "Parse Error JSON": {
              "Type": "Pass",
              "Parameters": {
                "Cause.$": "States.StringToJson($.Cause)"
              },
              "Next": "Check If Error Needs More Vacuums"
            },
            "Check If Error Needs More Vacuums": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.Cause.QueryExecution.Status.StateChangeReason",
                  "StringMatches": "ICEBERG_VACUUM_MORE_RUNS_NEEDED*",
                  "Next": "StartState"
                }
              ],
              "Default": "Fail"
            },
            "Fail": {
              "Type": "Fail",
              "Cause": "Query failed."
            }
          }
        }

  MaintenanceVacuumEventRule:
    Type: AWS::Events::Rule
    Properties:
      Name: MaintenanceVacuumEventRule
      ScheduleExpression: !Ref VacuumFrequency
      State: 'ENABLED'
      Targets:
        - Arn: !Ref MaintenanceVacuumStateMachine
          RoleArn: !GetAtt StepFunctionRole.Arn
          Id: maintenance-vacuum
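The error-handling states in the template above ("Parse Error JSON" and "Check If Error Needs More Vacuums") can be hard to follow in JSON form, so here is a small Python sketch of the same decision. The function name `needs_another_vacuum` is hypothetical. Returning False when the cause is not valid JSON or a field is missing is a simplification chosen for this sketch, not a description of how Step Functions treats those cases.

```python
import json

# The Choice state matches "ICEBERG_VACUUM_MORE_RUNS_NEEDED*"; with a single
# trailing wildcard that is equivalent to a prefix check.
MORE_RUNS_PREFIX = "ICEBERG_VACUUM_MORE_RUNS_NEEDED"


def needs_another_vacuum(error_output):
    """Decide whether a caught task failure means 'run VACUUM again'.

    error_output mirrors what the Catch block passes on: a dict with
    "Error" and "Cause", where "Cause" is a JSON document stored as a string.
    """
    try:
        # Same idea as States.StringToJson($.Cause) in the Pass state.
        cause = json.loads(error_output["Cause"])
    except (KeyError, TypeError, json.JSONDecodeError):
        return False  # sketch-only fallback for causes that are not JSON

    if not isinstance(cause, dict):
        return False

    # Same path as $.Cause.QueryExecution.Status.StateChangeReason.
    status = cause.get("QueryExecution", {}).get("Status", {})
    reason = status.get("StateChangeReason", "")
    return isinstance(reason, str) and reason.startswith(MORE_RUNS_PREFIX)
```

When this returns True the state machine loops back to StartState; otherwise it falls through to the Fail state.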

To deploy this template, you can use the AWS CLI:

Note: The command below leaves parameters such as DatabaseName, WorkgroupName and the bucket prefixes at their default values. You can override them by adding them to the --parameter-overrides list.

aws cloudformation deploy \
  --stack-name maintenance-vacuum \
  --template-file template.yaml \
  --capabilities CAPABILITY_NAMED_IAM \
  --parameter-overrides \
      TableName=my_iceberg_table \
      BucketName=my-bucket \
      AthenaOutputBucketName=my-bucket

Summary

In this post, I showed you how to use Amazon Athena to run maintenance vacuum commands on your Iceberg tables. I also showed you how to automate this process using AWS Step Functions and Amazon EventBridge.

Hopefully AWS will offer a more managed solution for Iceberg tables in the future. Until then, this solution should help you keep your Iceberg tables clean and optimized.

DevOpStar by Nathan Glover | 2024