Source code for trackintel.preprocessing.staypoints

import numpy as np
import geopandas as gpd
import pandas as pd
from sklearn.cluster import DBSCAN
import warnings

from trackintel import Staypoints, Locations
from trackintel.geogr import meters_to_decimal_degrees, check_gdf_planar
from trackintel.preprocessing.util import applyParallel, angle_centroid_multipoints


[docs] def generate_locations( staypoints, method="dbscan", epsilon=100, num_samples=1, distance_metric="haversine", agg_level="user", activities_only=False, print_progress=False, n_jobs=1, ): """ Generate locations from the staypoints. Parameters ---------- staypoints : Staypoints method : {'dbscan'} Method to create locations. - `dbscan` : Uses the DBSCAN algorithm to cluster staypoints. epsilon : float, default 100 The epsilon for the 'dbscan' method. if 'distance_metric' is 'haversine' or 'euclidean', the unit is in meters. num_samples : int, default 1 The minimal number of samples in a cluster. distance_metric: {'haversine', 'euclidean'} The distance metric used by the applied method. Any mentioned below are possible: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise_distances.html agg_level: {'user','dataset'} The level of aggregation when generating locations: - `user`: locations are generated independently per-user. - `dataset`: shared locations are generated for all users. activities_only: bool, default False (requires "activity" column) Flag to set if locations should be generated only from staypoints on which the value for "activity" is True. Useful if activites represent more significant places. print_progress : bool, default False If print_progress is True, the progress bar is displayed n_jobs: int, default 1 The maximum number of concurrently running jobs. If -1 all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging. See https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation for a detailed description Returns ------- sp: Staypoints The original staypoints with a new column ``[`location_id`]``. locs: Locations The generated locations, with geometry columns ``[`center` (default geometry), `extent`]``. Depending on the contained staypoints, `center` is their centroid, and `extent` is their convex hull with a buffer distance of `epsilon`. Examples -------- >>> sp.generate_locations(method='dbscan', epsilon=100, num_samples=1) """ Staypoints.validate(staypoints) if agg_level not in ["user", "dataset"]: raise ValueError(f"agg_level '{agg_level}' is unknown. Supported values are ['user', 'dataset'].") if method not in ["dbscan"]: raise ValueError(f"method '{method}' is unknown. Supported value is ['dbscan'].") # initialize the return GeoDataFrames sp = staypoints.copy() non_activities = None if activities_only: if "activity" not in sp.columns: raise KeyError('staypoints must contain column "activity" if "activities_only" flag is set.') non_activities = sp[~sp["activity"]] sp = sp[sp["activity"]] sp = sp.sort_values(["user_id", "started_at"]) geo_col = sp.geometry.name if method == "dbscan": eps = epsilon / 6371000 if distance_metric == "haversine" else epsilon # scikit haversine_distance wants radian. (We assume that this is good enough) # https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.haversine_distances.html db = DBSCAN(eps=eps, min_samples=num_samples, algorithm="ball_tree", metric=distance_metric) if agg_level == "user": sp = applyParallel( sp.groupby("user_id", as_index=False), _gen_locs_dbscan, n_jobs=n_jobs, print_progress=print_progress, distance_metric=distance_metric, db=db, ) # keeping track of noise labels sp_non_noise_labels = sp[sp["location_id"] != -1] sp_noise_labels = sp[sp["location_id"] == -1] # sort so that the last location id of a user = max(location id) sp_non_noise_labels = sp_non_noise_labels.sort_values(["user_id", "location_id"]) # identify start positions of new user_ids start_of_user_id = sp_non_noise_labels["user_id"] != sp_non_noise_labels["user_id"].shift(1) # calculate the offset (= last location id of the previous user) # multiplication is to mask all positions where no new user starts and addition is to have a +1 when a # new user starts loc_id_offset = sp_non_noise_labels["location_id"].shift(1) * start_of_user_id + start_of_user_id # fill first nan with 0 and create the cumulative sum loc_id_offset = loc_id_offset.fillna(0).cumsum() sp_non_noise_labels["location_id"] = sp_non_noise_labels["location_id"] + loc_id_offset sp = gpd.GeoDataFrame(pd.concat([sp_non_noise_labels, sp_noise_labels]), geometry=geo_col) sp.sort_values(["user_id", "started_at"], inplace=True) else: _gen_locs_dbscan(sp, db=db, distance_metric=distance_metric) ### create locations as grouped staypoints temp_sp = sp[["user_id", "location_id", sp.geometry.name]] if agg_level == "user": # directly dissolve by 'user_id' and 'location_id' locs = temp_sp.dissolve(by=["user_id", "location_id"], as_index=False) else: ## generate user-location pairs with same geometries across users # get user-location pairs locs = temp_sp[["user_id", "location_id"]].drop_duplicates(ignore_index=True) # get location geometries geom_gdf = temp_sp.dissolve(by=["location_id"], as_index=False).drop(columns={"user_id"}) # merge pairs with location geometries locs = geom_gdf.merge(locs, on="location_id", how="right") # filter staypoints not belonging to locations locs = locs.loc[locs["location_id"] != -1] if check_gdf_planar(locs): locs["center"] = locs.geometry.centroid else: # error of wrapping e.g. mean([-180, +180]) -> own function needed locs["center"] = angle_centroid_multipoints(locs.geometry) # extent is the convex hull of the geometry locs["extent"] = locs.geometry.convex_hull # We create a buffer of distance epsilon around the convex_hull to denote location extent # Perform meter to decimal conversion if the distance metric is haversine if distance_metric == "haversine": locs["extent"] = locs.apply( lambda p: p["extent"].buffer(meters_to_decimal_degrees(epsilon, p["center"].y)), axis=1, result_type="reduce", ) else: locs["extent"] = locs["extent"].buffer(epsilon) locs = locs.set_geometry("center", crs=sp.crs) locs = locs[["user_id", "location_id", "center", "extent"]] # index management locs.rename(columns={"location_id": "id"}, inplace=True) locs.set_index("id", inplace=True) # staypoints not linked to a location receive np.nan in 'location_id' sp.loc[sp["location_id"] == -1, "location_id"] = np.nan # merge non_activities back if "activities_only" flag is set sp = pd.concat([sp, non_activities]) ## dtype consistency # locs id (generated by this function) should be int64 locs.index = locs.index.astype("int64") # location_id of staypoints can only be in Int64 (missing values) sp["location_id"] = sp["location_id"].astype("Int64") # user_id of locs should be the same as sp locs["user_id"] = locs["user_id"].astype(sp["user_id"].dtype) if len(locs) == 0: warnings.warn("No locations can be generated, returning empty locs.") return sp, locs # keep class of staypoints sp = Staypoints(sp) if isinstance(staypoints, Staypoints) else sp return sp, Locations(locs)
def _gen_locs_dbscan(sp, distance_metric, db): """Small helper function that takes staypoints and apply them to DBSCAN. Parameters ---------- sp : Staypoints distance_metric : str db : sklearn.cluster.DBSCAN Returns ------- sp : Staypoints Staypoints with new column "location_id" """ p = np.array([sp.geometry.x, sp.geometry.y]).transpose() if distance_metric == "haversine": p = np.deg2rad(p) # haversine distance metric assumes input is in rad labels = db.fit_predict(p) sp["location_id"] = labels return sp
[docs] def merge_staypoints(staypoints, triplegs, max_time_gap="10min", agg={}): """ Aggregate staypoints horizontally via time threshold. Parameters ---------- staypoints : Staypoints The staypoints must contain a column `location_id` (see `generate_locations` function) triplegs: Triplegs max_time_gap : str or pd.Timedelta, default "10min" Maximum duration between staypoints to still be merged. If str must be parsable by pd.to_timedelta. agg: dict, optional Dictionary to aggregate the rows after merging staypoints. This dictionary is used as input to the pandas aggregate function: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.agg.html If empty, only the required columns of staypoints (which are ['user_id', 'started_at', 'finished_at']) are aggregated and returned. In order to return for example also the geometry column of the merged staypoints, set 'agg={"geom":"first"}' to return the first geometry of the merged staypoints, or 'agg={"geom":"last"}' to use the last one. Returns ------- sp: Staypoints The new staypoints with the default columns and columns in `agg`, where staypoints at same location and close in time are aggregated. Notes ----- - Due to the modification of the staypoint index, the relation between the staypoints and the corresponding positionfixes **is broken** after execution of this function! In explanation, the staypoint_id column of pfs does not necessarily correspond to an id in the new sp table that is returned from this function. The same holds for trips (if generated yet) where the staypoints contained in a trip might be merged in this function. - If there is a tripleg between two staypoints, the staypoints are not merged. If you for some reason want to merge such staypoints, simply pass an empty DataFrame for the tpls argument. Examples -------- >>> # direct function call >>> ti.preprocessing.merge_staypoints(staypoints=sp, triplegs=tpls) >>> # or using the trackintel datamodel >>> sp.merge_staypoints(triplegs, max_time_gap="1h", agg={"geom":"first"}) """ if isinstance(max_time_gap, str): max_time_gap = pd.to_timedelta(max_time_gap) # otherwise check if it's a Timedelta already, and raise error if not elif not isinstance(max_time_gap, pd.Timedelta): raise TypeError("Parameter max_time_gap must be either of type String or pd.Timedelta!") assert "location_id" in staypoints.columns, "Staypoints must contain column location_id" sp_merge = staypoints.copy() index_name = staypoints.index.name # concat sp and tpls to get information whether there is a tripleg between to staypoints tpls_merge = triplegs.copy() tpls_merge["type"] = "tripleg" sp_merge["type"] = "staypoint" # convert datatypes in order to preserve the datatypes (especially ints) despite of NaNs during concat sp_merge = sp_merge.convert_dtypes() # a joined dataframe sp_tpls is constructed to add the columns 'type' and 'next_type' to the 'sp_merge' table # concat and sort by time sp_tpls = pd.concat([sp_merge, tpls_merge]).sort_values(by=["user_id", "started_at"]) # TODO: we want to make tpls as argumtent optional and adapt this logic here. See issue #463. sp_tpls.index.rename(index_name, inplace=True) # get information whether the there is a tripleg after a staypoint sp_tpls["next_type"] = sp_tpls["type"].shift(-1) # get only staypoints, but with next type information sp_merge = sp_tpls[sp_tpls["type"] == "staypoint"] # reset index and make temporary index sp_merge = sp_merge.reset_index() # copy index to use it in the end (id is modified) sp_merge["index_temp"] = sp_merge[index_name] # roll by 1 to get next row info sp_merge[["next_user_id", "next_started_at", "next_location_id"]] = sp_merge[ ["user_id", "started_at", "location_id"] ].shift(-1) # Conditions to keep on merging cond = pd.Series(data=False, index=sp_merge.index) cond_old = pd.Series(data=True, index=sp_merge.index) cond_diff = cond != cond_old while np.sum(cond_diff) >= 1: # .values is important otherwise the "=" would imply a join via the new index sp_merge["next_id"] = sp_merge["index_temp"].shift(-1).values # identify rows to merge cond0 = sp_merge["next_user_id"] == sp_merge["user_id"] cond1 = sp_merge["next_started_at"] - sp_merge["finished_at"] <= max_time_gap # time constraint cond2 = sp_merge["location_id"] == sp_merge["next_location_id"] cond3 = sp_merge["index_temp"] != sp_merge["next_id"] # already merged cond4 = sp_merge["next_type"] != "tripleg" # no tripleg inbetween two staypoints cond = cond0 & cond1 & cond2 & cond3 & cond4 # assign index to next row sp_merge.loc[cond, "index_temp"] = sp_merge.loc[cond, "next_id"] # check whether anything was changed cond_diff = cond != cond_old cond_old = cond.copy() # Staypoint-required columnsare aggregated in the following manner: agg_dict = { index_name: "first", "user_id": "first", "started_at": "first", "finished_at": "last", "location_id": "first", } # User-defined further aggregation agg_dict.update(agg) # aggregate values sp = sp_merge.groupby(by="index_temp").agg(agg_dict).sort_values(by=["user_id", "started_at"]) # clean sp = sp.set_index(index_name) return sp