Creates a transfer job.
The operator accepts dates in two formats:
- consistent with the Google API: { "year": 2019, "month": 2, "day": 11 }
- as a Python datetime object

The operator accepts time in two formats:
- consistent with the Google API: { "hours": 12, "minutes": 30, "seconds": 0 }
- as a Python time object (both forms are sketched below)
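For illustration, a minimal sketch of the two equivalent ways of writing the schedule fields. The camelCase keys are the Google API field names; the variable names and dates are illustrative and are not taken from the example DAG.

from datetime import date, time

# Both schedules are equivalent; date/time objects are normalized into the
# dict form expected by the Google API before the request is sent.
schedule_as_dicts = {
    'scheduleStartDate': {'year': 2019, 'month': 2, 'day': 11},
    'scheduleEndDate': {'year': 2019, 'month': 3, 'day': 11},
    'startTimeOfDay': {'hours': 12, 'minutes': 30, 'seconds': 0},
}

schedule_as_objects = {
    'scheduleStartDate': date(2019, 2, 11),
    'scheduleEndDate': date(2019, 3, 11),
    'startTimeOfDay': time(12, 30, 0),
}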
If you want to create a transfer job that copies data from AWS S3, you must have a connection configured. Information about configuring the AWS connection is available at Amazon Web Services Connection. The connection to use for AWS can be selected with the aws_conn_id parameter.
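As a sketch only, this is how the parameter would be passed. "aws_default" is Airflow's default AWS connection id (replace it with the id of the connection you configured), and aws_to_gcs_transfer_body refers to the request body built further down this page; this snippet is not a line from the example DAG.

# Sketch: selecting a specific AWS connection for the transfer job.
create_transfer_job_from_aws = GcpTransferServiceJobCreateOperator(
    task_id="create_transfer_job_from_aws",
    body=aws_to_gcs_transfer_body,
    aws_conn_id="aws_default",
)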
Some arguments in the example DAG are taken from the OS environment variables:
airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))
GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)
airflow/contrib/example_dags/example_gcp_transfer.py

gcs_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_SECOND_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}  # type: Dict[str, Any]
airflow/contrib/example_dags/example_gcp_transfer.py

aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_SOURCE_AWS_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
airflow/contrib/example_dags/example_gcp_transfer.py

create_transfer_job_from_aws = GcpTransferServiceJobCreateOperator(
    task_id="create_transfer_job_from_aws", body=aws_to_gcs_transfer_body
)
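The task above only creates the AWS-to-GCS job. The gcs_to_gcs_transfer_body defined earlier would need its own create task; a minimal sketch (the task id here is assumed, not quoted verbatim from the example file):

# Sketch: creating the GCS-to-GCS transfer job from the body defined above.
create_transfer_job_from_gcp = GcpTransferServiceJobCreateOperator(
    task_id="create_transfer_job_from_gcp", body=gcs_to_gcs_transfer_body
)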
template_fields = ('body', 'gcp_conn_id', 'aws_conn_id')
See Google Cloud Transfer Service - Method: transferJobs.create.
Deletes a transfer job.
Some arguments in the example DAG are taken from the OS environment variables:
airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))
GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)
airflow/contrib/example_dags/example_gcp_transfer.py

delete_transfer_from_aws_job = GcpTransferServiceJobDeleteOperator(
    task_id="delete_transfer_from_aws_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
)
template_fields = ('job_name', 'project_id', 'gcp_conn_id', 'api_version')
See Google Cloud Transfer Service - REST Resource: transferJobs - Status
Updates a transfer job.
Some arguments in the example DAG are taken from the OS environment variables:
airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))
GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)
airflow/contrib/example_dags/example_gcp_transfer.py

update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {DESCRIPTION: "{}_updated".format(GCP_DESCRIPTION)},
    TRANSFER_JOB_FIELD_MASK: "description",
}
airflow/contrib/example_dags/example_gcp_transfer.py

update_job = GcpTransferServiceJobUpdateOperator(
    task_id="update_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    body=update_body,
)
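Only the fields named in TRANSFER_JOB_FIELD_MASK are modified; everything else on the job is left untouched. As an illustration, a hypothetical body (not part of the example DAG) that disables the job by patching only its status, assuming GcpTransferJobsStatus.DISABLED mirrors the API's DISABLED status:

# Hypothetical update body: only "status" is named in the field mask,
# so the description and schedule are not changed.
disable_job_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {STATUS: GcpTransferJobsStatus.DISABLED},
    TRANSFER_JOB_FIELD_MASK: "status",
}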
template_fields = ('job_name', 'body', 'gcp_conn_id', 'aws_conn_id')
See Google Cloud Transfer Service - Method: transferJobs.patch
Cancels a transfer operation.
Some arguments in the example DAG are taken from the OS environment variables:
airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))
GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)
airflow/contrib/example_dags/example_gcp_transfer.py

cancel_operation = GcpTransferServiceOperationCancelOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}",
)
template_fields = ('operation_name', 'gcp_conn_id', 'api_version')
See Google Cloud Transfer Service - Method: transferOperations.cancel
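The operation_name in the cancel example is pulled from a sensor task. A minimal sketch of such a task, assuming the job-status sensor pushes the matched operations to XCom under the key 'sensed_operations'; the task id and expected status below are illustrative, not quoted from the example file:

# Sketch: a sensor whose sensed operations the cancel task pulls from XCom.
wait_for_second_operation_to_start = GCPTransferServiceWaitForJobStatusSensor(
    task_id="wait_for_second_operation_to_start",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.IN_PROGRESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)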
Gets a transfer operation. The result is returned to XCOM.
Some arguments in the example DAG are taken from the OS environment variables:
airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))
GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)
airflow/contrib/example_dags/example_gcp_transfer.py

get_operation = GcpTransferServiceOperationGetOperator(
    task_id="get_operation",
    operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}",
)
template_fields = ('operation_name', 'gcp_conn_id')
See Google Cloud Transfer Service - Method: transferOperations.get
Lists transfer operations. The result is returned to XCOM.
Some arguments in the example DAG are taken from the OS environment variables:
airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))
GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)
airflow/contrib/example_dags/example_gcp_transfer.py

list_operations = GcpTransferServiceOperationsListOperator(
    task_id="list_operations",
    filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}"],
    },
)
template_fields = ('filter', 'gcp_conn_id')
See Google Cloud Transfer Service - Method: transferOperations.list
Pauses a transfer operation.
Some arguments in the example DAG are taken from the OS environment variables:
airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))
GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)
airflow/contrib/example_dags/example_gcp_transfer.py

pause_operation = GcpTransferServiceOperationPauseOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)
template_fields = ('operation_name', 'gcp_conn_id', 'api_version')
See Google Cloud Transfer Service - Method: transferOperations.pause
Resumes a transfer operation.
Some arguments in the example DAG are taken from the OS environment variables:
airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))
GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)
airflow/contrib/example_dags/example_gcp_transfer.py

resume_operation = GcpTransferServiceOperationResumeOperator(
    task_id="resume_operation",
    operation_name="{{task_instance.xcom_pull('get_operation')['name']}}",
)
template_fields = ('operation_name', 'gcp_conn_id', 'api_version')
See Google Cloud Transfer Service - Method: transferOperations.resume
Waits for at least one operation belonging to the job to have the expected status.
Some arguments in the example DAG are taken from the OS environment variables:
airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))
GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)
airflow/contrib/example_dags/example_gcp_transfer.py

wait_for_operation_to_end = GCPTransferServiceWaitForJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
template_fields = ('job_name',)
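How the tasks shown on this page are wired together is not reproduced above. A minimal, illustrative ordering (not copied from the example file) using the task names already defined here:

# Illustrative dependency chain: create the job, wait for its operation
# to finish, then delete the job.
create_transfer_job_from_aws >> wait_for_operation_to_end >> delete_transfer_from_aws_job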