From a52b5b03278e3eb8e696098e3b6292a10d76af5b Mon Sep 17 00:00:00 2001 From: Benjamin Webb Date: Mon, 11 May 2026 17:24:10 -0400 Subject: [PATCH 1/2] Use declarative SQLalchemy Table Use declarative SQLAlchemy table and create if not present in the schema --- .github/workflows/main.yml | 1 - pygeoapi/process/manager/postgresql.py | 121 ++++++++++++------ ...postgres_manager_full_structure.backup.sql | 69 ---------- 3 files changed, 85 insertions(+), 106 deletions(-) delete mode 100644 tests/data/postgres_manager_full_structure.backup.sql diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6a9e4c1c8..c6bfeb83b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -139,7 +139,6 @@ jobs: gunzip < tests/data/hotosm_bdi_waterways.sql.gz | psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_data.sql psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_types_data.sql - psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/postgres_manager_full_structure.backup.sql mysql -h 127.0.0.1 -P 3306 -u root -p'mysql' test_geo_app < tests/data/mysql_data.sql docker ps - name: run API tests ⚙️ diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index 05dc408ee..8de7218a8 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -47,19 +47,29 @@ from pathlib import Path from typing import Any, Tuple -from sqlalchemy import insert, update, delete -from sqlalchemy.orm import Session +from sqlalchemy import ( + Column, + DateTime, + delete, + insert, + Integer, + LargeBinary, + String, + Table, + text, + update +) +from sqlalchemy.engine import Engine +from sqlalchemy.orm import declarative_base, Session +from pygeoapi.formats import F_JSON, F_JSONLD, FORMAT_TYPES from pygeoapi.process.base import ( JobNotFoundError, JobResultNotFoundError, ProcessorGenericError ) -from pygeoapi.formats import FORMAT_TYPES, F_JSON, F_JSONLD from pygeoapi.process.manager.base import BaseManager -from pygeoapi.provider.sql import ( - get_engine, get_table_model, store_db_parameters -) +from pygeoapi.provider.sql import get_engine, store_db_parameters from pygeoapi.util import JobStatus @@ -70,6 +80,7 @@ class PostgreSQLManager(BaseManager): """PostgreSQL Manager""" default_port = 5432 + _store_db_parameters = store_db_parameters def __init__(self, manager_def: dict): """ @@ -87,7 +98,7 @@ def __init__(self, manager_def: dict): self.connection = manager_def['connection'] options = manager_def.get('options', {}) - store_db_parameters(self, manager_def['connection'], options) + self._store_db_parameters(manager_def['connection'], options) self._engine = get_engine( 'postgresql+psycopg2', self.db_host, @@ -98,22 +109,23 @@ def __init__(self, manager_def: dict): self.db_conn, **self.db_options ) + self.table_output = self.output_dir is None + self.table_model = get_table_model( + self.db_search_path, self._engine, self.table_output + ) + self.c = self.table_model.c try: LOGGER.debug('Getting table model') - self.table_model = get_table_model( - 'jobs', - self.id_field, - self.db_search_path, - self._engine - ) + except Exception as err: msg = 'Table model fetch failed' LOGGER.error(f'{msg}: {err}') raise ProcessorGenericError(msg) - def get_jobs(self, status: JobStatus = None, limit=None, offset=None - ) -> dict: + def get_jobs( + self, status: JobStatus = None, limit=None, offset=None + ) -> dict: """ Get jobs @@ -129,15 +141,12 @@ def get_jobs(self, status: JobStatus = None, limit=None, offset=None LOGGER.debug('Querying for jobs') with Session(self._engine) as session: results = session.query(self.table_model) + if status is not None: - column = getattr(self.table_model, 'status') - results = results.filter(column == status.value) + results = results.filter(self.c.status == status.value) - jobs = [r.__dict__ for r in results.all()] - return { - 'jobs': jobs, - 'numberMatched': len(jobs) - } + jobs = [r._asdict() for r in results.all()] + return {'jobs': jobs, 'numberMatched': len(jobs)} def add_job(self, job_metadata: dict) -> str: """ @@ -151,8 +160,9 @@ def add_job(self, job_metadata: dict) -> str: LOGGER.debug('Adding job') with Session(self._engine) as session: try: - session.execute(insert(self.table_model) - .values(**job_metadata)) + session.execute( + insert(self.table_model).values(**job_metadata) + ) session.commit() except Exception as err: session.rollback() @@ -177,10 +187,9 @@ def update_job(self, job_id: str, update_dict: dict) -> bool: LOGGER.debug('Updating job') with Session(self._engine) as session: try: - column = getattr(self.table_model, self.id_field) stmt = ( update(self.table_model) - .where(column == job_id) + .where(self.c.identifier == job_id) .values(**update_dict) ) result = session.execute(stmt) @@ -207,14 +216,14 @@ def get_job(self, job_id: str) -> dict: LOGGER.debug('Querying for job') with Session(self._engine) as session: - results = session.query(self.table_model) - column = getattr(self.table_model, self.id_field) - results = session.query(self.table_model).filter(column == job_id) + results = session.query(self.table_model).filter( + self.c.identifier == job_id + ) first = results.first() if first is not None: - return first.__dict__ + return first._asdict() else: raise JobNotFoundError() @@ -238,10 +247,8 @@ def delete_job(self, job_id: str) -> bool: LOGGER.debug('Deleting job') with Session(self._engine) as session: try: - column = getattr(self.table_model, self.id_field) - stmt = ( - delete(self.table_model) - .where(column == job_id) + stmt = delete(self.table_model).where( + self.c.identifier == job_id ) result = session.execute(stmt) session.commit() @@ -288,8 +295,11 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]: else: try: location = Path(location) - if mimetype in (None, FORMAT_TYPES[F_JSON], - FORMAT_TYPES[F_JSONLD]): + if mimetype in ( + None, + FORMAT_TYPES[F_JSON], + FORMAT_TYPES[F_JSONLD] + ): with location.open('r', encoding='utf-8') as fh: result = json.load(fh) else: @@ -302,3 +312,42 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]: def __repr__(self): return f' {self.name}' + + +def get_table_model( + db_search_path: tuple[str], engine: Engine, table_output: bool +) -> Any: + """Define SQLAlchemy job model""" + + Base = declarative_base() + schema = db_search_path[0] + + Jobs = Table( + 'jobs', + Base.metadata, + Column('identifier', String, primary_key=True, nullable=False), + Column( + 'type', + String, + nullable=False, + server_default=text("'process'::character varying") + ), + Column('process_id', String, nullable=False), + Column('created', DateTime), + Column('started', DateTime), + Column('finished', DateTime), + Column('updated', DateTime), + Column('status', String, nullable=False), + Column('location', String), + Column('mimetype', String), + Column('message', String), + Column('progress', Integer, nullable=False), + schema=schema + ) + + if table_output: + Jobs.append_column(Column('output', LargeBinary)) + + Base.metadata.create_all(engine, tables=[Jobs], checkfirst=True) + + return Jobs diff --git a/tests/data/postgres_manager_full_structure.backup.sql b/tests/data/postgres_manager_full_structure.backup.sql deleted file mode 100644 index 220737c63..000000000 --- a/tests/data/postgres_manager_full_structure.backup.sql +++ /dev/null @@ -1,69 +0,0 @@ --- --- PostgreSQL database dump --- - --- Dumped from database version 14.12 (Ubuntu 14.12-1.pgdg20.04+1) --- Dumped by pg_dump version 16.3 (Ubuntu 16.3-1.pgdg20.04+1) - -SET statement_timeout = 0; -SET lock_timeout = 0; -SET idle_in_transaction_session_timeout = 0; -SET client_encoding = 'UTF8'; -SET standard_conforming_strings = on; -SELECT pg_catalog.set_config('search_path', '', false); -SET check_function_bodies = false; -SET xmloption = content; -SET client_min_messages = warning; -SET row_security = off; - --- --- Name: public; Type: SCHEMA; Schema: -; Owner: postgres --- - -ALTER SCHEMA public OWNER TO postgres; - -SET default_tablespace = ''; - -SET default_table_access_method = heap; - --- --- Name: jobs; Type: TABLE; Schema: public; Owner: postgres --- - -CREATE TABLE public.jobs ( - type character varying DEFAULT 'process'::character varying NOT NULL, - identifier character varying NOT NULL, - process_id character varying NOT NULL, - created timestamp without time zone, - started timestamp without time zone, - finished timestamp without time zone, - updated timestamp without time zone, - status character varying NOT NULL, - location character varying, - mimetype character varying, - message character varying, - progress integer NOT NULL -); - - -ALTER TABLE public.jobs OWNER TO postgres; - --- --- Name: jobs jobs_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres --- - -ALTER TABLE ONLY public.jobs - ADD CONSTRAINT jobs_pkey PRIMARY KEY (identifier); - - --- --- Name: SCHEMA public; Type: ACL; Schema: -; Owner: postgres --- - -REVOKE USAGE ON SCHEMA public FROM PUBLIC; -GRANT ALL ON SCHEMA public TO PUBLIC; - - --- --- PostgreSQL database dump complete --- From ebd557b8e48cff1be75d7d5b2555089a27adf9c8 Mon Sep 17 00:00:00 2001 From: Benjamin Webb Date: Mon, 11 May 2026 20:12:42 -0400 Subject: [PATCH 2/2] Cache get_table_model --- pygeoapi/process/manager/postgresql.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index 8de7218a8..7a2adc559 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -29,19 +29,7 @@ # # ================================================================= -# Requires postgresql database structure. -# Create the database: -# e.g. -# CREATE DATABASE test -# WITH TEMPLATE = template0 -# ENCODING = 'UTF8' -# LOCALE = 'en_US.UTF-8'; -# ALTER DATABASE test OWNER TO postgres; -# -# Import dump: -# psql -U postgres -h 127.0.0.1 -p 5432 test < -# tests/data/postgres_manager_full_structure.backup.sql - +import functools import json import logging from pathlib import Path @@ -314,6 +302,7 @@ def __repr__(self): return f' {self.name}' +@functools.cache def get_table_model( db_search_path: tuple[str], engine: Engine, table_output: bool ) -> Any: