Google Cloud Transfer Service Operators


GcpTransferServiceJobCreateOperator

Creates a transfer job.

The function accepts dates in two formats:

  • consistent with Google API

    { "year": 2019, "month": 2, "day": 11 }
    
  • as a datetime object

The function accepts time in two formats:

  • consistent with Google API

    { "hours": 12, "minutes": 30, "seconds": 0 }
    
  • as a time object
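
For example, a transfer job schedule can use either style. Below is a minimal sketch (not taken from the example DAG) showing the Google API dict form next to the equivalent Python objects:

from datetime import date, time

# Google API format: plain dicts
schedule_as_dicts = {
    'scheduleStartDate': {'year': 2019, 'month': 2, 'day': 11},
    'startTimeOfDay': {'hours': 12, 'minutes': 30, 'seconds': 0},
}

# Equivalent Python objects; the operator accepts these directly
schedule_as_objects = {
    'scheduleStartDate': date(2019, 2, 11),
    'startTimeOfDay': time(12, 30, 0),
}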

If you want to create a transfer job that copies data from AWS S3, you must have a connection configured. Information about configuring the AWS connection is available at Amazon Web Services Connection. The AWS connection to use can be selected with the aws_conn_id parameter.
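
For instance, a non-default AWS connection can be passed explicitly. A sketch, assuming a connection named 'aws_data_lake' has been created in Airflow:

create_transfer_job_from_aws = GcpTransferServiceJobCreateOperator(
    task_id="create_transfer_job_from_aws",
    body=aws_to_gcs_transfer_body,
    aws_conn_id="aws_data_lake",  # omit to use the default 'aws_default' connection
)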

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

gcs_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_SECOND_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}  # type: Dict[str, Any]
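
The uppercase keys (DESCRIPTION, STATUS, PROJECT_ID, and so on) are string constants rather than bare names. In the example DAG they are imported from the transfer hook module; a sketch of the imports the snippets in this document rely on (the exact import list is assumed, not quoted from the file):

from datetime import datetime, timedelta
from typing import Any, Dict

from airflow.contrib.hooks.gcp_transfer_hook import (
    ALREADY_EXISTING_IN_SINK, AWS_S3_DATA_SOURCE, BUCKET_NAME, DESCRIPTION,
    GCS_DATA_SINK, GCS_DATA_SOURCE, PROJECT_ID, SCHEDULE, SCHEDULE_END_DATE,
    SCHEDULE_START_DATE, START_TIME_OF_DAY, STATUS, TRANSFER_OPTIONS,
    TRANSFER_SPEC, GcpTransferJobsStatus,
)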

airflow/contrib/example_dags/example_gcp_transfer.py

aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_SOURCE_AWS_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

airflow/contrib/example_dags/example_gcp_transfer.py

create_transfer_job_from_aws = GcpTransferServiceJobCreateOperator(
    task_id="create_transfer_job_from_aws", body=aws_to_gcs_transfer_body
)

Templating

template_fields = ('body', 'gcp_conn_id', 'aws_conn_id')
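
Since body is a templated field, Jinja expressions embedded in it are rendered at run time. A sketch (the task_id and description value are illustrative):

create_daily_job = GcpTransferServiceJobCreateOperator(
    task_id="create_daily_job",
    body={
        DESCRIPTION: "transfer-{{ ds }}",  # rendered with the execution date
        STATUS: GcpTransferJobsStatus.ENABLED,
        PROJECT_ID: GCP_PROJECT_ID,
        TRANSFER_SPEC: {
            GCS_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
            GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_SECOND_TARGET_BUCKET},
        },
    },
)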

More information

See Google Cloud Transfer Service - Method: transferJobs.create.

GcpTransferServiceJobDeleteOperator

Deletes a transfer job.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

delete_transfer_from_aws_job = GcpTransferServiceJobDeleteOperator(
    task_id="delete_transfer_from_aws_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
)
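
The xcom_pull in job_name only resolves if this task runs downstream of the task that created the job. A minimal sketch of the wiring, using the sensor defined in the GCPTransferServiceWaitForJobStatusSensor section below:

create_transfer_job_from_aws >> wait_for_operation_to_end >> delete_transfer_from_aws_job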

Templating

template_fields = ('job_name', 'project_id', 'gcp_conn_id', 'api_version')

More information

See Google Cloud Transfer Service - REST Resource: transferJobs - Status.

GcpTransferServiceJobUpdateOperator

Updates a transfer job.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {DESCRIPTION: "{}_updated".format(GCP_DESCRIPTION)},
    TRANSFER_JOB_FIELD_MASK: "description",
}

airflow/contrib/example_dags/example_gcp_transfer.py

update_job = GcpTransferServiceJobUpdateOperator(
    task_id="update_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    body=update_body,
)

Templating

template_fields = ('job_name', 'body', 'gcp_conn_id', 'aws_conn_id')

More information

See Google Cloud Transfer Service - Method: transferJobs.patch.

GcpTransferServiceOperationCancelOperator

Cancels a transfer operation.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

cancel_operation = GcpTransferServiceOperationCancelOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}",
)

Templating

template_fields = ('operation_name', 'gcp_conn_id', 'api_version')

More information

See Google Cloud Transfer Service - Method: transferOperations.cancel.

GcpTransferServiceOperationGetOperator

Gets a transfer operation. The result is returned to XCom.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

get_operation = GcpTransferServiceOperationGetOperator(
    task_id="get_operation", operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)

Templating

template_fields = ('operation_name', 'gcp_conn_id')

More information

See Google Cloud Transfer Service - Method: transferOperations.get.

GcpTransferServiceOperationsListOperator

Lists transfer operations. The result is returned to XCom.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

list_operations = GcpTransferServiceOperationsListOperator(
    task_id="list_operations",
    filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}"],
    },
)

Templating

template_fields = ('filter', 'gcp_conn_id')

More information

See Google Cloud Transfer Service - Method: transferOperations.list.

GcpTransferServiceOperationPauseOperator

Pauses a transfer operation.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

pause_operation = GcpTransferServiceOperationPauseOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)

Templating

template_fields = ('operation_name', 'gcp_conn_id', 'api_version')

More information

See Google Cloud Transfer Service - Method: transferOperations.pause.

GcpTransferServiceOperationResumeOperator

Resumes a transfer operation.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

resume_operation = GcpTransferServiceOperationResumeOperator(
    task_id="resume_operation", operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)

Templating

template_fields = ('operation_name', 'gcp_conn_id', 'api_version')

More information

See Google Cloud Transfer Service - Method: transferOperations.resume.

GCPTransferServiceWaitForJobStatusSensor

Waits for at least one operation belonging to the job to have the expected status.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

wait_for_operation_to_end = GCPTransferServiceWaitForJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
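
On success the sensor also pushes the matching operations to XCom under the key 'sensed_operations', which is how the cancel and pause examples above obtain an operation name. A hypothetical downstream task (the task_id is illustrative):

get_sensed_operation = GcpTransferServiceOperationGetOperator(
    task_id="get_sensed_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_end', "
    "key='sensed_operations')[0]['name']}}",
)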

Templating

template_fields = ('job_name',)