diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6a9e4c1c8..c6bfeb83b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -139,7 +139,6 @@ jobs: gunzip < tests/data/hotosm_bdi_waterways.sql.gz | psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_data.sql psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_types_data.sql - psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/postgres_manager_full_structure.backup.sql mysql -h 127.0.0.1 -P 3306 -u root -p'mysql' test_geo_app < tests/data/mysql_data.sql docker ps - name: run API tests ⚙️ diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index 05dc408ee..7a2adc559 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -29,37 +29,35 @@ # # ================================================================= -# Requires postgresql database structure. -# Create the database: -# e.g. -# CREATE DATABASE test -# WITH TEMPLATE = template0 -# ENCODING = 'UTF8' -# LOCALE = 'en_US.UTF-8'; -# ALTER DATABASE test OWNER TO postgres; -# -# Import dump: -# psql -U postgres -h 127.0.0.1 -p 5432 test < -# tests/data/postgres_manager_full_structure.backup.sql - +import functools import json import logging from pathlib import Path from typing import Any, Tuple -from sqlalchemy import insert, update, delete -from sqlalchemy.orm import Session +from sqlalchemy import ( + Column, + DateTime, + delete, + insert, + Integer, + LargeBinary, + String, + Table, + text, + update +) +from sqlalchemy.engine import Engine +from sqlalchemy.orm import declarative_base, Session +from pygeoapi.formats import F_JSON, F_JSONLD, FORMAT_TYPES from pygeoapi.process.base import ( JobNotFoundError, JobResultNotFoundError, ProcessorGenericError ) -from pygeoapi.formats import FORMAT_TYPES, F_JSON, F_JSONLD from pygeoapi.process.manager.base import BaseManager -from pygeoapi.provider.sql import ( - get_engine, get_table_model, store_db_parameters -) +from pygeoapi.provider.sql import get_engine, store_db_parameters from pygeoapi.util import JobStatus @@ -70,6 +68,7 @@ class PostgreSQLManager(BaseManager): """PostgreSQL Manager""" default_port = 5432 + _store_db_parameters = store_db_parameters def __init__(self, manager_def: dict): """ @@ -87,7 +86,7 @@ def __init__(self, manager_def: dict): self.connection = manager_def['connection'] options = manager_def.get('options', {}) - store_db_parameters(self, manager_def['connection'], options) + self._store_db_parameters(manager_def['connection'], options) self._engine = get_engine( 'postgresql+psycopg2', self.db_host, @@ -98,22 +97,23 @@ def __init__(self, manager_def: dict): self.db_conn, **self.db_options ) + self.table_output = self.output_dir is None + self.table_model = get_table_model( + self.db_search_path, self._engine, self.table_output + ) + self.c = self.table_model.c try: LOGGER.debug('Getting table model') - self.table_model = get_table_model( - 'jobs', - self.id_field, - self.db_search_path, - self._engine - ) + except Exception as err: msg = 'Table model fetch failed' LOGGER.error(f'{msg}: {err}') raise ProcessorGenericError(msg) - def get_jobs(self, status: JobStatus = None, limit=None, offset=None - ) -> dict: + def get_jobs( + self, status: JobStatus = None, limit=None, offset=None + ) -> dict: """ Get jobs @@ -129,15 +129,12 @@ def get_jobs(self, status: JobStatus = None, limit=None, offset=None LOGGER.debug('Querying for jobs') with Session(self._engine) as session: results = session.query(self.table_model) + if status is not None: - column = getattr(self.table_model, 'status') - results = results.filter(column == status.value) + results = results.filter(self.c.status == status.value) - jobs = [r.__dict__ for r in results.all()] - return { - 'jobs': jobs, - 'numberMatched': len(jobs) - } + jobs = [r._asdict() for r in results.all()] + return {'jobs': jobs, 'numberMatched': len(jobs)} def add_job(self, job_metadata: dict) -> str: """ @@ -151,8 +148,9 @@ def add_job(self, job_metadata: dict) -> str: LOGGER.debug('Adding job') with Session(self._engine) as session: try: - session.execute(insert(self.table_model) - .values(**job_metadata)) + session.execute( + insert(self.table_model).values(**job_metadata) + ) session.commit() except Exception as err: session.rollback() @@ -177,10 +175,9 @@ def update_job(self, job_id: str, update_dict: dict) -> bool: LOGGER.debug('Updating job') with Session(self._engine) as session: try: - column = getattr(self.table_model, self.id_field) stmt = ( update(self.table_model) - .where(column == job_id) + .where(self.c.identifier == job_id) .values(**update_dict) ) result = session.execute(stmt) @@ -207,14 +204,14 @@ def get_job(self, job_id: str) -> dict: LOGGER.debug('Querying for job') with Session(self._engine) as session: - results = session.query(self.table_model) - column = getattr(self.table_model, self.id_field) - results = session.query(self.table_model).filter(column == job_id) + results = session.query(self.table_model).filter( + self.c.identifier == job_id + ) first = results.first() if first is not None: - return first.__dict__ + return first._asdict() else: raise JobNotFoundError() @@ -238,10 +235,8 @@ def delete_job(self, job_id: str) -> bool: LOGGER.debug('Deleting job') with Session(self._engine) as session: try: - column = getattr(self.table_model, self.id_field) - stmt = ( - delete(self.table_model) - .where(column == job_id) + stmt = delete(self.table_model).where( + self.c.identifier == job_id ) result = session.execute(stmt) session.commit() @@ -288,8 +283,11 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]: else: try: location = Path(location) - if mimetype in (None, FORMAT_TYPES[F_JSON], - FORMAT_TYPES[F_JSONLD]): + if mimetype in ( + None, + FORMAT_TYPES[F_JSON], + FORMAT_TYPES[F_JSONLD] + ): with location.open('r', encoding='utf-8') as fh: result = json.load(fh) else: @@ -302,3 +300,43 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]: def __repr__(self): return f' {self.name}' + + +@functools.cache +def get_table_model( + db_search_path: tuple[str], engine: Engine, table_output: bool +) -> Any: + """Define SQLAlchemy job model""" + + Base = declarative_base() + schema = db_search_path[0] + + Jobs = Table( + 'jobs', + Base.metadata, + Column('identifier', String, primary_key=True, nullable=False), + Column( + 'type', + String, + nullable=False, + server_default=text("'process'::character varying") + ), + Column('process_id', String, nullable=False), + Column('created', DateTime), + Column('started', DateTime), + Column('finished', DateTime), + Column('updated', DateTime), + Column('status', String, nullable=False), + Column('location', String), + Column('mimetype', String), + Column('message', String), + Column('progress', Integer, nullable=False), + schema=schema + ) + + if table_output: + Jobs.append_column(Column('output', LargeBinary)) + + Base.metadata.create_all(engine, tables=[Jobs], checkfirst=True) + + return Jobs diff --git a/tests/data/postgres_manager_full_structure.backup.sql b/tests/data/postgres_manager_full_structure.backup.sql deleted file mode 100644 index 220737c63..000000000 --- a/tests/data/postgres_manager_full_structure.backup.sql +++ /dev/null @@ -1,69 +0,0 @@ --- --- PostgreSQL database dump --- - --- Dumped from database version 14.12 (Ubuntu 14.12-1.pgdg20.04+1) --- Dumped by pg_dump version 16.3 (Ubuntu 16.3-1.pgdg20.04+1) - -SET statement_timeout = 0; -SET lock_timeout = 0; -SET idle_in_transaction_session_timeout = 0; -SET client_encoding = 'UTF8'; -SET standard_conforming_strings = on; -SELECT pg_catalog.set_config('search_path', '', false); -SET check_function_bodies = false; -SET xmloption = content; -SET client_min_messages = warning; -SET row_security = off; - --- --- Name: public; Type: SCHEMA; Schema: -; Owner: postgres --- - -ALTER SCHEMA public OWNER TO postgres; - -SET default_tablespace = ''; - -SET default_table_access_method = heap; - --- --- Name: jobs; Type: TABLE; Schema: public; Owner: postgres --- - -CREATE TABLE public.jobs ( - type character varying DEFAULT 'process'::character varying NOT NULL, - identifier character varying NOT NULL, - process_id character varying NOT NULL, - created timestamp without time zone, - started timestamp without time zone, - finished timestamp without time zone, - updated timestamp without time zone, - status character varying NOT NULL, - location character varying, - mimetype character varying, - message character varying, - progress integer NOT NULL -); - - -ALTER TABLE public.jobs OWNER TO postgres; - --- --- Name: jobs jobs_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres --- - -ALTER TABLE ONLY public.jobs - ADD CONSTRAINT jobs_pkey PRIMARY KEY (identifier); - - --- --- Name: SCHEMA public; Type: ACL; Schema: -; Owner: postgres --- - -REVOKE USAGE ON SCHEMA public FROM PUBLIC; -GRANT ALL ON SCHEMA public TO PUBLIC; - - --- --- PostgreSQL database dump complete ---