Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ jobs:
gunzip < tests/data/hotosm_bdi_waterways.sql.gz | psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test
psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_data.sql
psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_types_data.sql
psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/postgres_manager_full_structure.backup.sql
mysql -h 127.0.0.1 -P 3306 -u root -p'mysql' test_geo_app < tests/data/mysql_data.sql
docker ps
- name: run API tests ⚙️
Expand Down
136 changes: 87 additions & 49 deletions pygeoapi/process/manager/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,35 @@
#
# =================================================================

# Requires postgresql database structure.
# Create the database:
# e.g.
# CREATE DATABASE test
# WITH TEMPLATE = template0
# ENCODING = 'UTF8'
# LOCALE = 'en_US.UTF-8';
# ALTER DATABASE test OWNER TO postgres;
#
# Import dump:
# psql -U postgres -h 127.0.0.1 -p 5432 test <
# tests/data/postgres_manager_full_structure.backup.sql

import functools
import json
import logging
from pathlib import Path
from typing import Any, Tuple

from sqlalchemy import insert, update, delete
from sqlalchemy.orm import Session
from sqlalchemy import (
Column,
DateTime,
delete,
insert,
Integer,
LargeBinary,
String,
Table,
text,
update
)
from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, Session

from pygeoapi.formats import F_JSON, F_JSONLD, FORMAT_TYPES
from pygeoapi.process.base import (
JobNotFoundError,
JobResultNotFoundError,
ProcessorGenericError
)
from pygeoapi.formats import FORMAT_TYPES, F_JSON, F_JSONLD
from pygeoapi.process.manager.base import BaseManager
from pygeoapi.provider.sql import (
get_engine, get_table_model, store_db_parameters
)
from pygeoapi.provider.sql import get_engine, store_db_parameters
from pygeoapi.util import JobStatus


Expand All @@ -70,6 +68,7 @@ class PostgreSQLManager(BaseManager):
"""PostgreSQL Manager"""

default_port = 5432
_store_db_parameters = store_db_parameters

def __init__(self, manager_def: dict):
"""
Expand All @@ -87,7 +86,7 @@ def __init__(self, manager_def: dict):
self.connection = manager_def['connection']

options = manager_def.get('options', {})
store_db_parameters(self, manager_def['connection'], options)
self._store_db_parameters(manager_def['connection'], options)
self._engine = get_engine(
'postgresql+psycopg2',
self.db_host,
Expand All @@ -98,22 +97,23 @@ def __init__(self, manager_def: dict):
self.db_conn,
**self.db_options
)
self.table_output = self.output_dir is None

self.table_model = get_table_model(
self.db_search_path, self._engine, self.table_output
)
self.c = self.table_model.c
try:
LOGGER.debug('Getting table model')
self.table_model = get_table_model(
'jobs',
self.id_field,
self.db_search_path,
self._engine
)

except Exception as err:
msg = 'Table model fetch failed'
LOGGER.error(f'{msg}: {err}')
raise ProcessorGenericError(msg)

def get_jobs(self, status: JobStatus = None, limit=None, offset=None
) -> dict:
def get_jobs(
self, status: JobStatus = None, limit=None, offset=None
) -> dict:
"""
Get jobs

Expand All @@ -129,15 +129,12 @@ def get_jobs(self, status: JobStatus = None, limit=None, offset=None
LOGGER.debug('Querying for jobs')
with Session(self._engine) as session:
results = session.query(self.table_model)

if status is not None:
column = getattr(self.table_model, 'status')
results = results.filter(column == status.value)
results = results.filter(self.c.status == status.value)

jobs = [r.__dict__ for r in results.all()]
return {
'jobs': jobs,
'numberMatched': len(jobs)
}
jobs = [r._asdict() for r in results.all()]
return {'jobs': jobs, 'numberMatched': len(jobs)}

def add_job(self, job_metadata: dict) -> str:
"""
Expand All @@ -151,8 +148,9 @@ def add_job(self, job_metadata: dict) -> str:
LOGGER.debug('Adding job')
with Session(self._engine) as session:
try:
session.execute(insert(self.table_model)
.values(**job_metadata))
session.execute(
insert(self.table_model).values(**job_metadata)
)
session.commit()
except Exception as err:
session.rollback()
Expand All @@ -177,10 +175,9 @@ def update_job(self, job_id: str, update_dict: dict) -> bool:
LOGGER.debug('Updating job')
with Session(self._engine) as session:
try:
column = getattr(self.table_model, self.id_field)
stmt = (
update(self.table_model)
.where(column == job_id)
.where(self.c.identifier == job_id)
.values(**update_dict)
)
result = session.execute(stmt)
Expand All @@ -207,14 +204,14 @@ def get_job(self, job_id: str) -> dict:

LOGGER.debug('Querying for job')
with Session(self._engine) as session:
results = session.query(self.table_model)
column = getattr(self.table_model, self.id_field)
results = session.query(self.table_model).filter(column == job_id)
results = session.query(self.table_model).filter(
self.c.identifier == job_id
)

first = results.first()

if first is not None:
return first.__dict__
return first._asdict()
else:
raise JobNotFoundError()

Expand All @@ -238,10 +235,8 @@ def delete_job(self, job_id: str) -> bool:
LOGGER.debug('Deleting job')
with Session(self._engine) as session:
try:
column = getattr(self.table_model, self.id_field)
stmt = (
delete(self.table_model)
.where(column == job_id)
stmt = delete(self.table_model).where(
self.c.identifier == job_id
)
result = session.execute(stmt)
session.commit()
Expand Down Expand Up @@ -288,8 +283,11 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]:
else:
try:
location = Path(location)
if mimetype in (None, FORMAT_TYPES[F_JSON],
FORMAT_TYPES[F_JSONLD]):
if mimetype in (
None,
FORMAT_TYPES[F_JSON],
FORMAT_TYPES[F_JSONLD]
):
with location.open('r', encoding='utf-8') as fh:
result = json.load(fh)
else:
Expand All @@ -302,3 +300,43 @@ def get_job_result(self, job_id: str) -> Tuple[str, Any]:

def __repr__(self):
    """Return a short human-readable identification of this manager."""
    return f'<PostgreSQLManager> {self.name}'


@functools.cache
def get_table_model(
    db_search_path: tuple[str, ...], engine: Engine, table_output: bool
) -> Any:
    """
    Define (and create, if absent) the SQLAlchemy ``jobs`` table model.

    Cached with ``functools.cache`` so repeated calls with identical
    arguments reuse the already-built ``Table`` and do not re-issue DDL.
    All arguments must therefore be hashable (hence the tuple search path).

    :param db_search_path: schema search path; only the FIRST entry is
                           used as the table's schema
    :param engine: SQLAlchemy ``Engine`` used to create the table
    :param table_output: if True, an ``output`` LargeBinary column is
                         added so job results can be stored in-table
                         (callers set this when no output directory is
                         configured — TODO confirm against manager init)

    :returns: SQLAlchemy ``Table`` object describing the jobs table
    """

    # declarative_base() is used only as a convenient MetaData container
    Base = declarative_base()
    schema = db_search_path[0]

    Jobs = Table(
        'jobs',
        Base.metadata,
        Column('identifier', String, primary_key=True, nullable=False),
        Column(
            'type',
            String,
            nullable=False,
            # default applied by the database server, not by SQLAlchemy
            server_default=text("'process'::character varying")
        ),
        Column('process_id', String, nullable=False),
        Column('created', DateTime),
        Column('started', DateTime),
        Column('finished', DateTime),
        Column('updated', DateTime),
        Column('status', String, nullable=False),
        Column('location', String),
        Column('mimetype', String),
        Column('message', String),
        Column('progress', Integer, nullable=False),
        schema=schema
    )

    # Optional column for storing job output directly in the database
    if table_output:
        Jobs.append_column(Column('output', LargeBinary))

    # checkfirst=True makes this idempotent: DDL runs only if the table
    # does not already exist
    Base.metadata.create_all(engine, tables=[Jobs], checkfirst=True)

    return Jobs
69 changes: 0 additions & 69 deletions tests/data/postgres_manager_full_structure.backup.sql

This file was deleted.

Loading