All examples below rely on the following variables, which can be passed as environment variables.
airflow/contrib/example_dags/example_gcp_bigtable_operators.py

```python
from os import getenv

GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id')
CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name')
CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2')
CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}')
CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id')
CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b')
CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3')
CBT_CLUSTER_NODES_UPDATED = getenv('CBT_CLUSTER_NODES_UPDATED', '5')
CBT_CLUSTER_STORAGE_TYPE = getenv('CBT_CLUSTER_STORAGE_TYPE', '2')
CBT_TABLE_ID = getenv('CBT_TABLE_ID', 'some-table-id')
CBT_POKE_INTERVAL = getenv('CBT_POKE_INTERVAL', '60')
```
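Because environment variables always arrive as strings, the numeric and JSON-valued settings above are converted with `int()` and `json.loads()` before being passed to the operators. A minimal stdlib-only sketch of that conversion:

```python
import json
from os import getenv

# Environment variables are strings; convert the numeric and
# JSON-valued settings before handing them to the operators.
CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2')
CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}')
CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3')

instance_type = int(CBT_INSTANCE_TYPE)             # enum value for the instance type
instance_labels = json.loads(CBT_INSTANCE_LABELS)  # dict of instance labels
cluster_nodes = int(CBT_CLUSTER_NODES)             # number of cluster nodes
```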
Use the BigtableInstanceCreateOperator to create a Google Cloud Bigtable instance.
If the Cloud Bigtable instance with the given ID exists, the operator does not compare its configuration and immediately succeeds. No changes are made to the existing instance.
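That existing-instance behavior can be sketched as a check-then-create pattern; the helper names below are hypothetical and stand in for the operator's internal calls:

```python
def ensure_instance(get_instance, create_instance, instance_id):
    # Sketch of BigtableInstanceCreateOperator's behavior when the instance
    # already exists: succeed immediately, without comparing configuration.
    existing = get_instance(instance_id)
    if existing is not None:
        return existing  # no changes are made to the existing instance
    return create_instance(instance_id)
```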
You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the GCP connection used. Both variants are shown:
airflow/contrib/example_dags/example_gcp_bigtable_operators.py

```python
create_instance_task = BigtableInstanceCreateOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    main_cluster_id=CBT_CLUSTER_ID,
    main_cluster_zone=CBT_CLUSTER_ZONE,
    instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
    instance_type=int(CBT_INSTANCE_TYPE),
    instance_labels=json.loads(CBT_INSTANCE_LABELS),
    cluster_nodes=int(CBT_CLUSTER_NODES),
    cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
    task_id='create_instance_task',
)
create_instance_task2 = BigtableInstanceCreateOperator(
    instance_id=CBT_INSTANCE_ID,
    main_cluster_id=CBT_CLUSTER_ID,
    main_cluster_zone=CBT_CLUSTER_ZONE,
    instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
    instance_type=int(CBT_INSTANCE_TYPE),
    instance_labels=json.loads(CBT_INSTANCE_LABELS),
    cluster_nodes=int(CBT_CLUSTER_NODES),
    cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
    task_id='create_instance_task2',
)
create_instance_task >> create_instance_task2
```
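The two variants differ only in whether project_id is passed explicitly. A sketch of the assumed fallback logic, with hypothetical parameter names:

```python
def resolve_project_id(explicit_project_id, connection_project_id):
    # Sketch of the assumed fallback: an explicitly passed project_id wins;
    # otherwise the project id stored on the GCP connection is used.
    if explicit_project_id is not None:
        return explicit_project_id
    return connection_project_id
```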
Use the BigtableInstanceDeleteOperator to delete a Google Cloud Bigtable instance.
You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the GCP connection used. Both variants are shown:
airflow/contrib/example_dags/example_gcp_bigtable_operators.py

```python
delete_instance_task = BigtableInstanceDeleteOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    task_id='delete_instance_task',
)
delete_instance_task2 = BigtableInstanceDeleteOperator(
    instance_id=CBT_INSTANCE_ID,
    task_id='delete_instance_task2',
)
```
Use the BigtableClusterUpdateOperator to modify the number of nodes in a Cloud Bigtable cluster.
You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the GCP connection used. Both variants are shown:
airflow/contrib/example_dags/example_gcp_bigtable_operators.py

```python
cluster_update_task = BigtableClusterUpdateOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    cluster_id=CBT_CLUSTER_ID,
    nodes=int(CBT_CLUSTER_NODES_UPDATED),
    task_id='update_cluster_task',
)
cluster_update_task2 = BigtableClusterUpdateOperator(
    instance_id=CBT_INSTANCE_ID,
    cluster_id=CBT_CLUSTER_ID,
    nodes=int(CBT_CLUSTER_NODES_UPDATED),
    task_id='update_cluster_task2',
)
cluster_update_task >> cluster_update_task2
```
Use the BigtableTableCreateOperator to create a table in a Cloud Bigtable instance.
If a table with the given ID exists in the Cloud Bigtable instance, the operator compares its Column Families. If the Column Families are identical, the operator succeeds. Otherwise, the operator fails with an appropriate error message.
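That comparison can be sketched as follows; the helper name and return values are hypothetical and only illustrate the three possible outcomes:

```python
def table_create_outcome(existing_families, requested_families):
    """Sketch of BigtableTableCreateOperator's behavior when the table exists.

    existing_families is None when the table does not exist yet.
    """
    if existing_families is None:
        return 'created'
    if set(existing_families) == set(requested_families):
        return 'succeeded'  # identical Column Families: nothing to do
    raise ValueError('Table already exists with different Column Families')
```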
You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the GCP connection used. Both variants are shown:
airflow/contrib/example_dags/example_gcp_bigtable_operators.py

```python
create_table_task = BigtableTableCreateOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    task_id='create_table',
)
create_table_task2 = BigtableTableCreateOperator(
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    task_id='create_table_task2',
)
create_table_task >> create_table_task2
```
When creating a table, you can specify the optional initial_split_keys and column_families parameters. Please refer to the Python Client for Google Cloud Bigtable documentation for Table and for Column Families.
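To illustrate the expected shapes of those two parameters, the sketch below uses a stand-in class for the client library's garbage-collection rule objects (in real code you would pass e.g. MaxVersionsGCRule instances from google.cloud.bigtable.column_family):

```python
# Stand-in for a GC rule from google.cloud.bigtable.column_family,
# used here only so the sketch stays self-contained.
class MaxVersionsGCRule:
    def __init__(self, max_num_versions):
        self.max_num_versions = max_num_versions

# initial_split_keys pre-splits the new table at the given row keys.
initial_split_keys = [b'row-0500', b'row-1000']

# column_families maps a column family id to its garbage-collection rule.
column_families = {'cf1': MaxVersionsGCRule(2)}
```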
Use the BigtableTableDeleteOperator to delete a table in Google Cloud Bigtable.
You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the GCP connection used. Both variants are shown:
airflow/contrib/example_dags/example_gcp_bigtable_operators.py

```python
delete_table_task = BigtableTableDeleteOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    task_id='delete_table_task',
)
delete_table_task2 = BigtableTableDeleteOperator(
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    task_id='delete_table_task2',
)
```
Use the BigtableTableWaitForReplicationSensor to wait for the table to replicate fully.

The same arguments apply to this sensor as to the BigtableTableCreateOperator.

You can create the sensor with or without a project id. If the project id is missing, it will be retrieved from the GCP connection used. Both variants are shown:

Note: If the table or the Cloud Bigtable instance does not exist, this sensor waits until the timeout is reached and does not raise any exception.
airflow/contrib/example_dags/example_gcp_bigtable_operators.py

```python
wait_for_table_replication_task = BigtableTableWaitForReplicationSensor(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    poke_interval=int(CBT_POKE_INTERVAL),
    timeout=180,
    task_id='wait_for_table_replication_task',
)
wait_for_table_replication_task2 = BigtableTableWaitForReplicationSensor(
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    poke_interval=int(CBT_POKE_INTERVAL),
    timeout=180,
    task_id='wait_for_table_replication_task2',
)
```
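The no-exception behavior described in the note can be sketched as a poke function: a missing table is treated the same as "not yet replicated", so the sensor simply keeps waiting until its timeout. The parameter names are hypothetical:

```python
def poke(table_exists, replication_complete):
    # Sketch of the sensor's assumed poke logic: a missing table yields
    # "not done yet" rather than an error, so the sensor keeps waiting
    # until its timeout instead of raising.
    if not table_exists:
        return False
    return replication_complete
```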