Deletes a database from the specified Cloud Spanner instance. If the database does not exist, no action is taken, and the operator succeeds.
For parameter definition, take a look at CloudSpannerInstanceDatabaseDeleteOperator.
Some arguments in the example DAG are taken from environment variables.
airflow/contrib/example_dags/example_gcp_spanner.py

```python
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
GCP_SPANNER_DATABASE_ID = os.environ.get('GCP_SPANNER_DATABASE_ID', 'testdatabase')
GCP_SPANNER_CONFIG_NAME = os.environ.get('GCP_SPANNER_CONFIG_NAME', 'projects/example-project/instanceConfigs/eur3')
GCP_SPANNER_NODE_COUNT = os.environ.get('GCP_SPANNER_NODE_COUNT', '1')
GCP_SPANNER_DISPLAY_NAME = os.environ.get('GCP_SPANNER_DISPLAY_NAME', 'Test Instance')
# OPERATION_ID should be unique per operation
OPERATION_ID = 'unique_operation_id'
```
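The settings above all follow the same pattern: `os.environ.get` returns the environment value when the variable is set and falls back to the supplied default otherwise. A minimal sketch (the `spanner_setting` helper is hypothetical, introduced only to illustrate the pattern):

```python
import os

# Hypothetical helper mirroring the pattern used in the example DAG:
# the environment wins when the variable is set, the default otherwise.
def spanner_setting(name, default):
    return os.environ.get(name, default)

# With the variable unset, the default is used.
os.environ.pop('GCP_SPANNER_INSTANCE_ID', None)
assert spanner_setting('GCP_SPANNER_INSTANCE_ID', 'testinstance') == 'testinstance'

# With the variable set, the environment value takes precedence.
os.environ['GCP_SPANNER_INSTANCE_ID'] = 'prodinstance'
assert spanner_setting('GCP_SPANNER_INSTANCE_ID', 'testinstance') == 'prodinstance'
```

This is why the example DAG runs unchanged both locally (defaults) and in an environment where the variables are exported.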
You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the GCP connection used. Both variants are shown:

airflow/contrib/example_dags/example_gcp_spanner.py

```python
spanner_database_delete_task = CloudSpannerInstanceDatabaseDeleteOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    task_id='spanner_database_delete_task'
)
spanner_database_delete_task2 = CloudSpannerInstanceDatabaseDeleteOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    task_id='spanner_database_delete_task2'
)
```
```python
template_fields = ('project_id', 'instance_id', 'gcp_conn_id')
```
See the Google Cloud Spanner API documentation for dropping a database.
Creates a new Cloud Spanner database in the specified instance, or, if a database with that name already exists, assumes success without applying any changes to the database configuration. Only the existence of a database with the given name is checked; its structure is not verified.
For parameter definition, take a look at CloudSpannerInstanceDatabaseDeployOperator.
You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the GCP connection used. Both variants are shown:

airflow/contrib/example_dags/example_gcp_spanner.py

```python
spanner_database_deploy_task = CloudSpannerInstanceDatabaseDeployOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    ddl_statements=[
        "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_deploy_task'
)
spanner_database_deploy_task2 = CloudSpannerInstanceDatabaseDeployOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    ddl_statements=[
        "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_deploy_task2'
)
```
```python
template_fields = ('project_id', 'instance_id', 'database_id', 'ddl_statements', 'gcp_conn_id')
template_ext = ('.sql',)
```
See the Google Cloud Spanner API documentation for creating a new database.
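The create-if-absent behaviour described above can be sketched without Airflow or Spanner; the `databases` dict and `deploy_database` function below are hypothetical stand-ins for the instance's database list and the operator's execute logic:

```python
# Minimal sketch of the deploy operator's idempotency: creation is skipped
# when a database with the same name already exists, and in that case the
# DDL is neither applied nor verified against the existing schema.
databases = {}  # hypothetical stand-in for the instance's databases

def deploy_database(database_id, ddl_statements):
    """Create the database if absent; succeed without changes otherwise."""
    if database_id in databases:
        return False  # already exists: no changes applied, still a success
    databases[database_id] = list(ddl_statements)
    return True  # created

assert deploy_database('testdatabase', ["CREATE TABLE t (id INT64) PRIMARY KEY (id)"]) is True
# Re-running with different DDL still succeeds but applies nothing:
assert deploy_database('testdatabase', ["CREATE TABLE other (id INT64) PRIMARY KEY (id)"]) is False
assert databases['testdatabase'] == ["CREATE TABLE t (id INT64) PRIMARY KEY (id)"]
```

This is why a second run of the example DAG succeeds even though the tables' DDL is only applied on the first run.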
Runs a DDL query in a Cloud Spanner database and allows you to modify the structure of an existing database.
You can optionally specify an operation_id parameter, which simplifies determining whether the statements were executed in case the update_database call is replayed (idempotency check). The operation_id should be unique within the database and must be a valid identifier: [a-z][a-z0-9_]*. More information can be found in the documentation of the updateDdl API.
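The identifier constraint quoted above can be checked up front with a simple regular expression; the `is_valid_operation_id` helper is hypothetical, shown only to make the rule concrete:

```python
import re

# operation_id must start with a lowercase letter and contain only
# lowercase letters, digits, and underscores: [a-z][a-z0-9_]*
OPERATION_ID_RE = re.compile(r'^[a-z][a-z0-9_]*$')

def is_valid_operation_id(operation_id):
    return bool(OPERATION_ID_RE.match(operation_id))

assert is_valid_operation_id('unique_operation_id')
assert not is_valid_operation_id('1starts_with_digit')
assert not is_valid_operation_id('Uppercase_not_allowed')
```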
For parameter definition, take a look at CloudSpannerInstanceDatabaseUpdateOperator.
You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the GCP connection used. Both variants are shown:

airflow/contrib/example_dags/example_gcp_spanner.py

```python
spanner_database_update_task = CloudSpannerInstanceDatabaseUpdateOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    ddl_statements=[
        "CREATE TABLE my_table3 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_update_task'
)
```
airflow/contrib/example_dags/example_gcp_spanner.py

```python
spanner_database_update_idempotent1_task = CloudSpannerInstanceDatabaseUpdateOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    operation_id=OPERATION_ID,
    ddl_statements=[
        "CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_update_idempotent1_task'
)
spanner_database_update_idempotent2_task = CloudSpannerInstanceDatabaseUpdateOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    operation_id=OPERATION_ID,
    ddl_statements=[
        "CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_update_idempotent2_task'
)
```
```python
template_fields = ('project_id', 'instance_id', 'database_id', 'ddl_statements', 'gcp_conn_id')
template_ext = ('.sql',)
```
See the Google Cloud Spanner API documentation for the database updateDdl call.
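The idempotency mechanism behind the two tasks sharing OPERATION_ID can be sketched as follows; the `executed_operations` set is a hypothetical stand-in for Spanner's server-side bookkeeping of operation IDs:

```python
# Sketch of how operation_id makes an update_database call safe to replay:
# the DDL statements run only the first time a given operation_id is seen.
executed_operations = set()  # hypothetical stand-in for server-side state
applied_ddl = []

def update_database(ddl_statements, operation_id):
    if operation_id in executed_operations:
        return False  # replay detected: skip execution, still succeed
    executed_operations.add(operation_id)
    applied_ddl.extend(ddl_statements)
    return True

ddl = ["CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)"]
assert update_database(ddl, 'unique_operation_id') is True
assert update_database(ddl, 'unique_operation_id') is False  # replayed call is a no-op
assert len(applied_ddl) == 1  # the table was created exactly once
```

This is why the second idempotent task in the example DAG succeeds even though its CREATE TABLE statement would otherwise fail on a table that already exists.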
Executes an arbitrary DML query (INSERT, UPDATE, DELETE).
For parameter definition, take a look at CloudSpannerInstanceDatabaseQueryOperator.
You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the GCP connection used. Both variants are shown:

airflow/contrib/example_dags/example_gcp_spanner.py

```python
spanner_instance_query_task = CloudSpannerInstanceDatabaseQueryOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    query=["DELETE FROM my_table2 WHERE true"],
    task_id='spanner_instance_query_task'
)
spanner_instance_query_task2 = CloudSpannerInstanceDatabaseQueryOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    query=["DELETE FROM my_table2 WHERE true"],
    task_id='spanner_instance_query_task2'
)
```
```python
template_fields = ('project_id', 'instance_id', 'database_id', 'query', 'gcp_conn_id')
template_ext = ('.sql',)
```
See the Google Cloud Spanner API documentation for more information about DML syntax.
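Spanner DML follows familiar SQL semantics. As a rough illustration of what the `DELETE FROM my_table2 WHERE true` statement in the example does, here is the same statement run against sqlite3 (not Spanner, used here only because it is in the standard library):

```python
import sqlite3

# sqlite3 stands in for Spanner purely to illustrate the DML semantics.
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE my_table2 (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO my_table2 VALUES (?, ?)", [(1, 'a'), (2, 'b')])

# "WHERE true" matches every row, so the statement empties the table.
deleted = conn.execute("DELETE FROM my_table2 WHERE true").rowcount
assert deleted == 2
assert conn.execute("SELECT COUNT(*) FROM my_table2").fetchone()[0] == 0
```

Note that Spanner requires every DELETE to have a WHERE clause, which is why the example spells out `WHERE true` rather than omitting it.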
Deletes a Cloud Spanner instance. If the instance does not exist, no action is taken, and the operator succeeds.
For parameter definition, take a look at CloudSpannerInstanceDeleteOperator.
You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the GCP connection used. Both variants are shown:

airflow/contrib/example_dags/example_gcp_spanner.py

```python
spanner_instance_delete_task = CloudSpannerInstanceDeleteOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    task_id='spanner_instance_delete_task'
)
spanner_instance_delete_task2 = CloudSpannerInstanceDeleteOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    task_id='spanner_instance_delete_task2'
)
```
```python
template_fields = ('project_id', 'instance_id', 'gcp_conn_id')
```
See the Google Cloud Spanner API documentation for deleting an instance.