from functools import wraps
from inspect import signature
import geopandas as gpd
from geopandas.io.sql import _get_srid_from_crs
from shapely import wkb
import pandas as pd
from geoalchemy2 import Geometry
from sqlalchemy import create_engine
from sqlalchemy.types import JSON
import trackintel as ti
from trackintel.io.util import _index_warning_default_none
from trackintel.model.util import doc, _shared_docs
def _handle_con_string(func):
"""Decorator function to create a `Connection` out of a connection string."""
@wraps(func) # copy all metadata
def wrapper(*args, **kwargs):
# bind to name for easy access of both kwargs and args
bound_values = signature(func).bind(*args, **kwargs)
con = bound_values.arguments["con"]
# only do something if connection string
if not isinstance(con, str):
return func(*args, **kwargs)
engine = create_engine(con)
con = engine.connect()
# overwrite con argument with open connection
bound_values.arguments["con"] = con
args = bound_values.args
kwargs = bound_values.kwargs
try:
result = func(*args, **kwargs)
finally:
con.close()
return result
return wrapper
[docs]
@_index_warning_default_none
@_handle_con_string
def read_positionfixes_postgis(
    sql,
    con,
    geom_col="geom",
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    read_gpd_kws=None,
):
    """Reads positionfixes from a PostGIS database.

    Parameters
    ----------
    sql : str
        SQL query e.g. "SELECT * FROM positionfixes"
    con : sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        active connection to PostGIS database.
    geom_col : str, default 'geom'
        The geometry column of the table.
    crs : optional
        Coordinate reference system to use for the returned GeoDataFrame
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index(MultiIndex)
    coerce_float : boolean, default True
        Attempt to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point, useful for SQL result sets
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}`` where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`. Especially
          useful with databases without native Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        List of parameters to pass to execute method.
    chunksize : int, default None
        If specified, return an iterator where chunksize is the number
        of rows to include in each chunk.
    read_gpd_kws : dict, default None
        Further keyword arguments as available in trackintels trackintel.io.read_positionfixes_gpd().
        Especially useful to rename column names from the SQL table to trackintel conform column names.
        See second example how to use it in code.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the positionfixes.

    Examples
    --------
    >>> pfs = ti.io.read_positionfixes_postgis("SELECT * FROM positionfixes", con, geom_col="geom")
    >>> pfs = ti.io.read_positionfixes_postgis("SELECT * FROM positionfixes", con, geom_col="geom",
    ...                                        index_col="id",
    ...                                        read_gpd_kws={"user_id": "USER", "tracked_at": "time"})
    """
    pfs = gpd.GeoDataFrame.from_postgis(
        sql,
        con,
        geom_col=geom_col,
        crs=crs,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        params=params,
        chunksize=chunksize,
    )
    # read_gpd_kws may rename columns to trackintel-conform names
    return ti.io.read_positionfixes_gpd(pfs, **(read_gpd_kws or {}))
[docs]
@doc(
    _shared_docs["write_postgis"],
    first_arg="\npositionfixes : Positionfixes\n The positionfixes to store to the database.\n",
    long="positionfixes",
    short="pfs",
)
@_handle_con_string
def write_positionfixes_postgis(
    positionfixes, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    # Use explicit keyword arguments, consistent with the other write_*_postgis
    # functions and robust against signature changes in geopandas.
    gpd.GeoDataFrame.to_postgis(
        positionfixes,
        name,
        con,
        schema=schema,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        chunksize=chunksize,
        dtype=dtype,
    )
[docs]
@_index_warning_default_none
@_handle_con_string
def read_triplegs_postgis(
    sql,
    con,
    geom_col="geom",
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    read_gpd_kws=None,
):
    """Read triplegs from a PostGIS database into a GeoDataFrame.

    Parameters
    ----------
    sql : str
        SQL query, e.g. "SELECT * FROM triplegs".
    con : sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        Active connection to the PostGIS database.
    geom_col : str, default 'geom'
        Name of the geometry column of the table.
    crs : optional
        Coordinate reference system to use for the returned GeoDataFrame.
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index (MultiIndex).
    coerce_float : boolean, default True
        Attempt to convert non-string, non-numeric values (like decimal.Decimal)
        to floating point; useful for SQL result sets.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` with a strftime-compatible
          format for string times, or one of (D, s, ns, ms, us) for integer
          timestamps.
        - Dict of ``{column_name: arg dict}`` with keyword arguments for
          :func:`pandas.to_datetime`; useful for databases without native
          Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        Parameters to pass to the execute method.
    chunksize : int, default None
        If specified, return an iterator yielding chunks with this number of rows.
    read_gpd_kws : dict, default None
        Further keyword arguments forwarded to trackintel.io.read_triplegs_gpd().
        Especially useful to rename SQL column names to trackintel-conform
        column names. See second example.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the triplegs.

    Examples
    --------
    >>> tpls = ti.io.read_triplegs_postgis("SELECT * FROM triplegs", con, geom_col="geom")
    >>> tpls = ti.io.read_triplegs_postgis("SELECT * FROM triplegs", con, geom_col="geom", index_col="id",
    ...                                    read_gpd_kws={"user_id": "USER"})
    """
    # collect the pass-through arguments for geopandas in one place
    query_kws = {
        "geom_col": geom_col,
        "crs": crs,
        "index_col": index_col,
        "coerce_float": coerce_float,
        "parse_dates": parse_dates,
        "params": params,
        "chunksize": chunksize,
    }
    gdf = gpd.GeoDataFrame.from_postgis(sql, con, **query_kws)
    extra_kws = {} if read_gpd_kws is None else read_gpd_kws
    return ti.io.read_triplegs_gpd(gdf, **extra_kws)
[docs]
@doc(
    _shared_docs["write_postgis"],
    first_arg="\ntriplegs : Triplegs\n The triplegs to store to the database.\n",
    long="triplegs",
    short="tpls",
)
@_handle_con_string
def write_triplegs_postgis(
    triplegs, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    # Docstring is supplied by the @doc template; simply delegate to geopandas.
    to_postgis_kws = {
        "schema": schema,
        "if_exists": if_exists,
        "index": index,
        "index_label": index_label,
        "chunksize": chunksize,
        "dtype": dtype,
    }
    gpd.GeoDataFrame.to_postgis(triplegs, name, con, **to_postgis_kws)
[docs]
@_index_warning_default_none
@_handle_con_string
def read_staypoints_postgis(
    sql,
    con,
    geom_col="geom",
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    read_gpd_kws=None,
):
    """Read staypoints from a PostGIS database into a GeoDataFrame.

    Parameters
    ----------
    sql : str
        SQL query, e.g. "SELECT * FROM staypoints".
    con : sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        Active connection to the PostGIS database.
    geom_col : str, default 'geom'
        Name of the geometry column of the table.
    crs : optional
        Coordinate reference system to use for the returned GeoDataFrame.
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index (MultiIndex).
    coerce_float : boolean, default True
        Attempt to convert non-string, non-numeric values (like decimal.Decimal)
        to floating point; useful for SQL result sets.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` with a strftime-compatible
          format for string times, or one of (D, s, ns, ms, us) for integer
          timestamps.
        - Dict of ``{column_name: arg dict}`` with keyword arguments for
          :func:`pandas.to_datetime`; useful for databases without native
          Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        Parameters to pass to the execute method.
    chunksize : int, default None
        If specified, return an iterator yielding chunks with this number of rows.
    read_gpd_kws : dict, default None
        Further keyword arguments forwarded to trackintel.io.read_staypoints_gpd().
        Especially useful to rename SQL column names to trackintel-conform
        column names. See second example.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the staypoints.

    Examples
    --------
    >>> sp = ti.io.read_staypoints_postgis("SELECT * FROM staypoints", con, geom_col="geom")
    >>> sp = ti.io.read_staypoints_postgis("SELECT * FROM staypoints", con, geom_col="geom", index_col="id",
    ...                                    read_gpd_kws={"user_id": "USER"})
    """
    # collect the pass-through arguments for geopandas in one place
    query_kws = {
        "geom_col": geom_col,
        "crs": crs,
        "index_col": index_col,
        "coerce_float": coerce_float,
        "parse_dates": parse_dates,
        "params": params,
        "chunksize": chunksize,
    }
    gdf = gpd.GeoDataFrame.from_postgis(sql, con, **query_kws)
    extra_kws = {} if read_gpd_kws is None else read_gpd_kws
    return ti.io.read_staypoints_gpd(gdf, **extra_kws)
[docs]
@doc(
    _shared_docs["write_postgis"],
    first_arg="\nstaypoints : Staypoints\n The staypoints to store to the database.\n",
    long="staypoints",
    short="sp",
)
@_handle_con_string
def write_staypoints_postgis(
    staypoints, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    # Docstring is supplied by the @doc template; simply delegate to geopandas.
    to_postgis_kws = {
        "schema": schema,
        "if_exists": if_exists,
        "index": index,
        "index_label": index_label,
        "chunksize": chunksize,
        "dtype": dtype,
    }
    gpd.GeoDataFrame.to_postgis(staypoints, name, con, **to_postgis_kws)
[docs]
@_index_warning_default_none
@_handle_con_string
def read_locations_postgis(
    sql,
    con,
    center="center",
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    extent=None,
    read_gpd_kws=None,
):
    """Reads locations from a PostGIS database.

    Parameters
    ----------
    sql : str
        SQL query e.g. "SELECT * FROM locations"
    con : sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        active connection to PostGIS database.
    center : str, default 'center'
        The geometry column of the table. For the center of the location.
    crs : optional
        Coordinate reference system to use for the returned GeoDataFrame
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index(MultiIndex)
    coerce_float : boolean, default True
        Attempt to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point, useful for SQL result sets
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`. Especially
          useful with databases without native Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        List of parameters to pass to execute method.
    chunksize : int, default None
        If specified, return an iterator where chunksize is the number
        of rows to include in each chunk.
    extent : string, default None
        If specified read the extent column as geometry column.
    read_gpd_kws : dict, default None
        Further keyword arguments as available in trackintels trackintel.io.read_locations_gpd().
        Especially useful to rename column names from the SQL table to trackintel conform column names.
        See second example how to use it in code.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the locations.

    Examples
    --------
    >>> locs = ti.io.read_locations_postgis("SELECT * FROM locations", con, center="center")
    >>> locs = ti.io.read_locations_postgis("SELECT * FROM locations", con, center="geom", index_col="id",
    ...                                     extent="extent", read_gpd_kws={"user_id": "USER"})
    """
    locs = gpd.GeoDataFrame.from_postgis(
        sql,
        con,
        geom_col=center,
        crs=crs,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        params=params,
        chunksize=chunksize,
    )
    if extent is not None:
        # from_postgis only parses the `center` geometry; decode the extent
        # column (WKB) into shapely geometries manually.
        locs[extent] = gpd.GeoSeries.from_wkb(locs[extent])
    return ti.io.read_locations_gpd(locs, center=center, **(read_gpd_kws or {}))
[docs]
@doc(
    _shared_docs["write_postgis"],
    first_arg="\nlocations : Locations\n The locations to store to the database.\n",
    long="locations",
    short="locs",
)
@_handle_con_string
def write_locations_postgis(
    locations, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    # Assumes that "extent" is not a registered geometry column but "center" is.
    # May build additional check for that.
    if "extent" in locations.columns:
        # geopandas.to_postgis can only handle one geometry column -> do it manually
        srid = _get_srid_from_crs(locations)
        extent_schema = Geometry("POLYGON", srid)
        # build a new dict so the caller's dtype argument is never mutated
        dtype = {**(dtype or {}), "extent": extent_schema}
        locations = locations.copy()
        # serialize extent geometries to hex-encoded EWKB so they survive to_postgis
        locations["extent"] = locations["extent"].apply(lambda x: wkb.dumps(x, srid=srid, hex=True))
    gpd.GeoDataFrame.to_postgis(
        locations,
        name,
        con,
        schema=schema,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        chunksize=chunksize,
        dtype=dtype,
    )
[docs]
@_index_warning_default_none
@_handle_con_string
def read_trips_postgis(
    sql,
    con,
    geom_col=None,
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    read_gpd_kws=None,
):
    """Read trips from a PostGIS database.

    If ``geom_col`` is None the table is read as a plain DataFrame, otherwise
    the geometry column (start and end point of the trip) is parsed and a
    GeoDataFrame is returned.

    Parameters
    ----------
    sql : str
        SQL query, e.g. "SELECT * FROM trips".
    con : sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        Active connection to the PostGIS database.
    geom_col : str, optional
        The geometry column of the table (if exists). Start and endpoint of the trip.
    crs : optional
        Coordinate reference system if the table has geometry.
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index (MultiIndex).
    coerce_float : boolean, default True
        Attempt to convert non-string, non-numeric values (like decimal.Decimal)
        to floating point; useful for SQL result sets.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` with a strftime-compatible
          format for string times, or one of (D, s, ns, ms, us) for integer
          timestamps.
        - Dict of ``{column_name: arg dict}`` with keyword arguments for
          :func:`pandas.to_datetime`; useful for databases without native
          Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        Parameters to pass to the execute method.
    chunksize : int, default None
        If specified, return an iterator yielding chunks with this number of rows.
    read_gpd_kws : dict, default None
        Further keyword arguments forwarded to trackintel.io.read_trips_gpd().
        Especially useful to rename SQL column names to trackintel-conform
        column names. See second example.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the trips.

    Examples
    --------
    >>> trips = ti.io.read_trips_postgis("SELECT * FROM trips", con)
    >>> trips = ti.io.read_trips_postgis("SELECT * FROM trips", con, geom_col="geom", index_col="id",
    ...                                  read_gpd_kws={"user_id": "USER", "origin_staypoint_id": "ORIGIN",
    ...                                                "destination_staypoint_id": "DEST"})
    """
    if geom_col is None:
        # no geometry -> plain pandas read is sufficient
        trips = pd.read_sql(
            sql,
            con,
            index_col=index_col,
            coerce_float=coerce_float,
            params=params,
            parse_dates=parse_dates,
            chunksize=chunksize,
        )
    else:
        trips = gpd.GeoDataFrame.from_postgis(
            sql,
            con,
            geom_col=geom_col,
            crs=crs,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            params=params,
            chunksize=chunksize,
        )
    extra_kws = {} if read_gpd_kws is None else read_gpd_kws
    return ti.io.read_trips_gpd(trips, **extra_kws)
[docs]
@doc(
    _shared_docs["write_postgis"],
    first_arg="\ntrips : Trips\n The trips to store to the database.\n",
    long="trips",
    short="trips",
)
@_handle_con_string
def write_trips_postgis(
    trips, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    # Docstring is supplied by the @doc template. Trips may or may not carry
    # a geometry column, so dispatch on the frame type.
    shared_kws = {
        "schema": schema,
        "if_exists": if_exists,
        "index": index,
        "index_label": index_label,
        "chunksize": chunksize,
        "dtype": dtype,
    }
    if isinstance(trips, gpd.GeoDataFrame):
        gpd.GeoDataFrame.to_postgis(trips, name, con, **shared_kws)
    else:  # plain DataFrame without geometry
        trips.to_sql(name, con, **shared_kws)
[docs]
@_index_warning_default_none
@_handle_con_string
def read_tours_postgis(
    sql,
    con,
    geom_col=None,
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    read_gpd_kws=None,
):
    """Read tours from a PostGIS database.

    If ``geom_col`` is None the table is read as a plain DataFrame, otherwise
    the geometry column is parsed and a GeoDataFrame is returned.

    Parameters
    ----------
    sql : str
        SQL query, e.g. "SELECT * FROM tours".
    con : sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        Active connection to the PostGIS database.
    geom_col : str, optional
        The geometry column of the table (if exists).
    crs : optional
        Coordinate reference system if the table has geometry.
    index_col : string or list of strings, optional
        Column(s) to set as index (MultiIndex).
    coerce_float : boolean, default True
        Attempt to convert non-string, non-numeric values (like decimal.Decimal)
        to floating point; useful for SQL result sets.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` with a strftime-compatible
          format for string times, or one of (D, s, ns, ms, us) for integer
          timestamps.
        - Dict of ``{column_name: arg dict}`` with keyword arguments for
          :func:`pandas.to_datetime`; useful for databases without native
          Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        Parameters to pass to the execute method.
    chunksize : int, default None
        If specified, return an iterator yielding chunks with this number of rows.
    read_gpd_kws : dict, default None
        Further keyword arguments forwarded to trackintel.io.read_tours_gpd().
        Especially useful to rename SQL column names to trackintel-conform
        column names. See second example.

    Returns
    -------
    Tours

    Examples
    --------
    >>> tours = ti.io.read_tours_postgis("SELECT * FROM tours", con)
    >>> tours = ti.io.read_tours_postgis("SELECT * FROM tours", con, index_col="id",
    ...                                  read_gpd_kws={"user_id": "USER"})
    """
    if geom_col is None:
        # no geometry -> plain pandas read is sufficient
        tours = pd.read_sql(
            sql,
            con,
            index_col=index_col,
            coerce_float=coerce_float,
            params=params,
            parse_dates=parse_dates,
            chunksize=chunksize,
        )
    else:
        tours = gpd.GeoDataFrame.from_postgis(
            sql,
            con,
            geom_col=geom_col,
            crs=crs,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            params=params,
            chunksize=chunksize,
        )
    extra_kws = {} if read_gpd_kws is None else read_gpd_kws
    return ti.io.read_tours_gpd(tours, **extra_kws)
[docs]
@doc(
    _shared_docs["write_postgis"],
    first_arg="\ntours : Tours\n The tours to store to the database.\n",
    long="tours",
    short="tours",
)
@_handle_con_string
def write_tours_postgis(
    tours, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    if "trips" in tours.columns:
        # the "trips" column holds list-like data -> store it as JSON.
        # Copy the dict so the caller's dtype argument is never mutated.
        dtype = dict(dtype or {})
        dtype.setdefault("trips", JSON)
    tours.to_sql(
        name,
        con,
        schema=schema,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        chunksize=chunksize,
        dtype=dtype,
    )