Deletes a database from the specified Cloud Spanner instance. If the database does not exist, no action is taken, and the operator succeeds.
For parameter definition, take a look at CloudSpannerInstanceDatabaseDeleteOperator.
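The "no action is taken, and the operator succeeds" behaviour described above can be illustrated with a minimal sketch. It uses a hypothetical in-memory `FakeInstance` class and a hypothetical `delete_database_if_exists` helper; neither is part of Airflow or the Spanner client, and the real operator may be implemented differently.

```python
class FakeInstance:
    """Hypothetical in-memory stand-in for a Spanner instance (not a real client)."""

    def __init__(self, databases):
        self.databases = set(databases)


def delete_database_if_exists(instance, database_id):
    """Drop the database if it is present.

    Returns True if a database was deleted and False if there was nothing
    to delete. A missing database is treated as success, not as an error.
    """
    if database_id not in instance.databases:
        return False
    instance.databases.remove(database_id)
    return True


instance = FakeInstance({"testdatabase"})
first_run = delete_database_if_exists(instance, "testdatabase")
second_run = delete_database_if_exists(instance, "testdatabase")
```

Running the helper twice does not raise: the second call simply finds nothing to delete, which is why a retried task can still succeed.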
Some arguments in the example DAG are taken from environment variables.
airflow/contrib/example_dags/example_gcp_spanner.py
import os

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
GCP_SPANNER_DATABASE_ID = os.environ.get('GCP_SPANNER_DATABASE_ID', 'testdatabase')
GCP_SPANNER_CONFIG_NAME = os.environ.get('GCP_SPANNER_CONFIG_NAME',
                                         'projects/example-project/instanceConfigs/eur3')
GCP_SPANNER_NODE_COUNT = os.environ.get('GCP_SPANNER_NODE_COUNT', '1')
GCP_SPANNER_DISPLAY_NAME = os.environ.get('GCP_SPANNER_DISPLAY_NAME', 'Test Instance')
# OPERATION_ID should be unique per operation
OPERATION_ID = 'unique_operation_id'
You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the GCP connection used. Both variants are shown:
airflow/contrib/example_dags/example_gcp_spanner.py
spanner_database_delete_task = CloudSpannerInstanceDatabaseDeleteOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    task_id='spanner_database_delete_task'
)
spanner_database_delete_task2 = CloudSpannerInstanceDatabaseDeleteOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    task_id='spanner_database_delete_task2'
)
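The project ID fallback used when `project_id` is omitted can be pictured roughly as follows. This is only a sketch: `resolve_project_id` and its `connection_default` argument are hypothetical names, and raising an error when neither value is available is an assumption rather than documented Airflow behaviour.

```python
def resolve_project_id(project_id, connection_default):
    """Return the explicit project ID, or the connection's default if omitted.

    Both arguments are plain strings (or None). Raising ValueError when
    neither is set is an assumption made for this sketch.
    """
    if project_id:
        return project_id
    if connection_default:
        return connection_default
    raise ValueError("No project ID was passed and none is set on the connection")


# Explicit value wins over the connection default.
explicit = resolve_project_id("example-project", "connection-project")
# With no explicit value, the connection default is used.
fallback = resolve_project_id(None, "connection-project")
```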
template_fields = ('project_id', 'instance_id', 'gcp_conn_id')
See the Google Cloud Spanner API documentation on dropping a database.
Creates a new Cloud Spanner database in the specified instance. If the desired database already exists, the operator assumes success and applies no changes to the database configuration. The structure of the database is not verified: it is enough that a database with the same name exists.
For parameter definition, take a look at CloudSpannerInstanceDatabaseDeployOperator.
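The "create only if missing" behaviour described above can be sketched with plain Python. The `deploy_database` helper and the dictionary standing in for an instance are hypothetical and exist only for illustration; the sketch mirrors the description that an existing database is left untouched and its structure is not compared against the new DDL statements.

```python
def deploy_database(existing, database_id, ddl_statements):
    """Create the database only if no database with that name exists.

    `existing` maps database IDs to the DDL they were created with.
    Returns True if a database was created, False if one already existed
    (in which case the new DDL statements are ignored, not applied).
    """
    if database_id in existing:
        return False
    existing[database_id] = list(ddl_statements)
    return True


original_ddl = ["CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)"]
databases = {"testdatabase": list(original_ddl)}

# The name already exists, so nothing changes even though the DDL differs.
created_again = deploy_database(
    databases, "testdatabase",
    ["CREATE TABLE other (id INT64) PRIMARY KEY (id)"],
)
# A new name leads to an actual creation.
created_new = deploy_database(databases, "newdatabase", original_ddl)
```

Because only the name is checked, schema changes to an existing database belong in the update operator rather than in a second deploy.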
You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the GCP connection used. Both variants are shown:
airflow/contrib/example_dags/example_gcp_spanner.py
spanner_database_deploy_task = CloudSpannerInstanceDatabaseDeployOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    ddl_statements=[
        "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_deploy_task'
)
spanner_database_deploy_task2 = CloudSpannerInstanceDatabaseDeployOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    ddl_statements=[
        "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_deploy_task2'
)
template_fields = ('project_id', 'instance_id', 'database_id', 'ddl_statements',
                   'gcp_conn_id')
template_ext = ('.sql', )
See the Google Cloud Spanner API documentation on creating a new database.
Runs a DDL query in a Cloud Spanner database and allows you to modify the structure of an existing database.
You can optionally specify an operation_id parameter, which makes it easier to determine whether the statements were already executed if the update_database call is replayed (idempotency check). The operation_id should be unique within the database and must be a valid identifier matching [a-z][a-z0-9_]*. More information can be found in the documentation of the updateDdl API.
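As a quick illustration of the identifier rule, the pattern `[a-z][a-z0-9_]*` can be checked locally before a DAG is deployed. The helper name `is_valid_operation_id` is hypothetical and not part of Airflow; the sketch checks only the format, not uniqueness within the database.

```python
import re

# Pattern quoted in the text above: a lowercase letter followed by
# lowercase letters, digits, or underscores.
OPERATION_ID_PATTERN = re.compile(r"[a-z][a-z0-9_]*")


def is_valid_operation_id(operation_id):
    """Return True if the whole string matches the identifier pattern."""
    return OPERATION_ID_PATTERN.fullmatch(operation_id) is not None


valid = is_valid_operation_id("unique_operation_id")
starts_with_digit = is_valid_operation_id("1st_operation")
has_uppercase = is_valid_operation_id("Unique_Operation")
has_hyphen = is_valid_operation_id("unique-operation")
```

`fullmatch` is used rather than `match` so that trailing invalid characters are rejected as well.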
For parameter definition, take a look at CloudSpannerInstanceDatabaseUpdateOperator.
You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the GCP connection used. Both variants are shown:
airflow/contrib/example_dags/example_gcp_spanner.py
spanner_database_update_task = CloudSpannerInstanceDatabaseUpdateOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    ddl_statements=[
        "CREATE TABLE my_table3 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_update_task'
)
airflow/contrib/example_dags/example_gcp_spanner.py
spanner_database_update_idempotent1_task = CloudSpannerInstanceDatabaseUpdateOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    operation_id=OPERATION_ID,
    ddl_statements=[
        "CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_update_idempotent1_task'
)
spanner_database_update_idempotent2_task = CloudSpannerInstanceDatabaseUpdateOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    operation_id=OPERATION_ID,
    ddl_statements=[
        "CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_update_idempotent2_task'
)
template_fields = ('project_id', 'instance_id', 'database_id', 'ddl_statements',
                   'gcp_conn_id')
template_ext = ('.sql', )
See the Google Cloud Spanner API documentation for database update_ddl.
Executes an arbitrary DML query (INSERT, UPDATE, DELETE).
For parameter definition, take a look at CloudSpannerInstanceDatabaseQueryOperator.
You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the GCP connection used. Both variants are shown:
airflow/contrib/example_dags/example_gcp_spanner.py
spanner_instance_query_task = CloudSpannerInstanceDatabaseQueryOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    query=["DELETE FROM my_table2 WHERE true"],
    task_id='spanner_instance_query_task'
)
spanner_instance_query_task2 = CloudSpannerInstanceDatabaseQueryOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    query=["DELETE FROM my_table2 WHERE true"],
    task_id='spanner_instance_query_task2'
)
template_fields = ('project_id', 'instance_id', 'database_id', 'query', 'gcp_conn_id')
template_ext = ('.sql',)
See the Google Cloud Spanner API documentation for more information about DML syntax.
Deletes a Cloud Spanner instance. If the instance does not exist, no action is taken, and the operator succeeds.
For parameter definition, take a look at CloudSpannerInstanceDeleteOperator.
You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the GCP connection used. Both variants are shown:
airflow/contrib/example_dags/example_gcp_spanner.py
spanner_instance_delete_task = CloudSpannerInstanceDeleteOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    task_id='spanner_instance_delete_task'
)
spanner_instance_delete_task2 = CloudSpannerInstanceDeleteOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    task_id='spanner_instance_delete_task2'
)
template_fields = ('project_id', 'instance_id', 'gcp_conn_id')
See the Google Cloud Spanner API documentation on deleting an instance.