Source code for trackintel.preprocessing.positionfixes

import datetime
import warnings

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.geometry import LineString

from trackintel import Positionfixes, Staypoints, Triplegs
from trackintel.geogr import check_gdf_planar, point_haversine_dist
from trackintel.preprocessing.util import _explode_agg, angle_centroid_multipoints, applyParallel


[docs] def generate_staypoints( positionfixes, method="sliding", distance_metric="haversine", dist_threshold=100, time_threshold=5.0, gap_threshold=15.0, include_last=False, print_progress=False, exclude_duplicate_pfs=True, n_jobs=1, ): """ Generate staypoints from positionfixes. Parameters ---------- positionfixes : Positionfixes method : {'sliding'} Method to create staypoints. 'sliding' applies a sliding window over the data. distance_metric : {'haversine'} The distance metric used by the applied method. dist_threshold : float, default 100 The distance threshold for the 'sliding' method, i.e., how far someone has to travel to generate a new staypoint. Units depend on the dist_func parameter. If 'distance_metric' is 'haversine' the unit is in meters time_threshold : float, default 5.0 (minutes) The time threshold for the 'sliding' method in minutes. gap_threshold : float, default 15.0 (minutes) The time threshold of determine whether a gap exists between consecutive pfs. Consecutive pfs with temporal gaps larger than 'gap_threshold' will be excluded from staypoints generation. Only valid in 'sliding' method. include_last: boolean, default False The algorithm in Li et al. (2008) only detects staypoint if the user steps out of that staypoint. This will omit the last staypoint (if any). Set 'include_last' to True to include this last staypoint. print_progress: boolean, default False Show per-user progress if set to True. exclude_duplicate_pfs: boolean, default True Filters duplicate positionfixes before generating staypoints. Duplicates can lead to problems in later processing steps (e.g., when generating triplegs). It is not recommended to set this to False. n_jobs: int, default 1 The maximum number of concurrently running jobs. If -1 all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging. See https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation for a detailed description Returns ------- pfs: Positionfixes The original positionfixes with a new column ``[`staypoint_id`]``. sp: Staypoints The generated staypoints. Notes ----- The 'sliding' method is adapted from Li et al. (2008). In the original algorithm, the 'finished_at' time for the current staypoint lasts until the 'tracked_at' time of the first positionfix outside this staypoint. Users are assumed to be stationary during this missing period and potential tracking gaps may be included in staypoints. To avoid including too large missing signal gaps, set 'gap_threshold' to a small value, e.g., 15 min. Examples -------- >>> pfs.generate_staypoints('sliding', dist_threshold=100) References ---------- Zheng, Y. (2015). Trajectory data mining: an overview. ACM Transactions on Intelligent Systems and Technology (TIST), 6(3), 29. Li, Q., Zheng, Y., Xie, X., Chen, Y., Liu, W., & Ma, W. Y. (2008, November). Mining user similarity based on location history. In Proceedings of the 16th ACM SIGSPATIAL international conference on Advances in geographic information systems (p. 34). ACM. """ Positionfixes.validate(positionfixes) # copy the original pfs for adding 'staypoint_id' column pfs = positionfixes.copy() if exclude_duplicate_pfs: len_org = pfs.shape[0] pfs = pfs.drop_duplicates() nb_dropped = len_org - pfs.shape[0] if nb_dropped > 0: warn_str = ( f"{nb_dropped} duplicates were dropped from your positionfixes. Dropping duplicates is" + " recommended but can be prevented using the 'exclude_duplicate_pfs' flag." ) warnings.warn(warn_str) # if the positionfixes already have a column "staypoint_id", we drop it if "staypoint_id" in pfs: pfs.drop(columns="staypoint_id", inplace=True) elevation_flag = "elevation" in pfs.columns # if there is elevation data geo_col = pfs.geometry.name if elevation_flag: sp_column = ["user_id", "started_at", "finished_at", "elevation", geo_col] else: sp_column = ["user_id", "started_at", "finished_at", geo_col] # TODO: tests using a different distance function, e.g., L2 distance if method == "sliding": # Algorithm from Li et al. (2008). For details, please refer to the paper. sp = applyParallel( pfs.groupby("user_id", as_index=False), _generate_staypoints_sliding_user, n_jobs=n_jobs, print_progress=print_progress, geo_col=geo_col, elevation_flag=elevation_flag, dist_threshold=dist_threshold, time_threshold=time_threshold, gap_threshold=gap_threshold, distance_metric=distance_metric, include_last=include_last, ).reset_index(drop=True) # index management sp["staypoint_id"] = sp.index sp.index.name = "id" if "pfs_id" not in sp.columns: sp["pfs_id"] = None pfs = _explode_agg("pfs_id", "staypoint_id", pfs, sp) sp = gpd.GeoDataFrame(sp, columns=sp_column, geometry=geo_col, crs=pfs.crs) ## dtype consistency # sp id (generated by this function) should be int64 sp.index = sp.index.astype("int64") # ret_pfs['staypoint_id'] should be Int64 (missing values) pfs["staypoint_id"] = pfs["staypoint_id"].astype("Int64") # user_id of sp should be the same as ret_pfs sp["user_id"] = sp["user_id"].astype(pfs["user_id"].dtype) if len(sp) == 0: warnings.warn("No staypoints can be generated, returning empty sp.") return pfs, sp return pfs, Staypoints(sp)
[docs] def generate_triplegs( positionfixes, staypoints=None, method="between_staypoints", gap_threshold=15, ): """ Generate triplegs from positionfixes. Parameters ---------- positionfixes : Positionfixes If 'staypoint_id' column is not found, 'staypoints' needs to be provided. staypoints : Staypoints, optional The staypoints (corresponding to the positionfixes). If this is not passed, the positionfixes need 'staypoint_id' associated with them. method: {'between_staypoints', 'overlap_staypoints'} Method to create triplegs. 'between_staypoints' method defines a tripleg as all positionfixes between two staypoints (no overlap). 'overlap_staypoints' method defines a tripleg as all positionfixes between two staypoints and includes the coordinates of the staypoints. The latter method require positionfixes to have the 'staypoint_id' column and passing staypoints as an input. gap_threshold: float, default 15 (minutes) Maximum allowed temporal gap size in minutes. If tracking data is missing for more than `gap_threshold` minutes, a new tripleg will be generated. Returns ------- pfs: Positionfixes The original positionfixes with a new column ``[`tripleg_id`]``. tpls: Triplegs The generated triplegs. Notes ----- The methods require either a column 'staypoint_id' on the positionfixes or passing some staypoints that correspond to the positionfixes! This means you usually should call ``generate_staypoints()`` first. Following the assumptions in the function generate_staypoints(), to ensure consistency, the time extend and geometry for triplegs is defined as follows: - 'between_staypoints': The generated tripleg will not have overlapping pf with the existing sps, thus triplegs' 'geometry' does not have common pf as sps. 'started_at' is the timestamp of the first pf after a sp, and 'finished_at' is the time of the last pf before a sp. This means a temporal gap will occur between the first pf of sp and the last pf of tripleg: pfs_stp_first['tracked_at'] - pfs_tpl_last['tracked_at'] != 0. No temporal gap will occur between sp ends and tripleg starts (as per sp time definition). - 'overlap_staypoints': The generated tripleg will have common start and end point geometries with the existing sps. 'started_at' is the timestamp of the first pf after a sp (same as 'between_staypoints', to be consistent with generate_staypoints()), and 'finished_at' is the time of the first pf of a following sp. Temporal gaps will not occur between sps and triplegs. Examples -------- >>> pfs.generate_triplegs('between_staypoints', gap_threshold=15) """ Positionfixes.validate(positionfixes) # copy the original pfs for adding 'tripleg_id' column pfs = positionfixes.copy() # if the positionfixes already have a column "tripleg_id", we drop it if "tripleg_id" in pfs: pfs.drop(columns="tripleg_id", inplace=True) # we need to ensure pfs is properly ordered pfs.sort_values(by=["user_id", "tracked_at"], inplace=True) # get case: # Case 1: True, pfs have a column 'staypoint_id' # Case 2: False, pfs do not have a column 'staypoint_id' but staypoint are provided staypoints_exist = "staypoint_id" in pfs.columns if staypoints is not None: Staypoints.validate(staypoints) if (staypoints is None) and (not staypoints_exist): raise TypeError("staypoints input must be provided for pfs without staypoint_id column.") if method == "overlap_staypoints": if staypoints is None: raise TypeError("staypoints input must be provided for overlap_staypoints method.") if not staypoints_exist: raise TypeError("positionfixes must contain a staypoint_id column for overlap_staypoints method.") if method not in ["between_staypoints", "overlap_staypoints"]: raise ValueError( f"Method unknown. We only support 'between_staypoints' and 'overlap_staypoints'. You passed {method}" ) # Preprocessing for case 2: # - step 1: Assign staypoint ids to positionfixes by matching timestamps (per user) # - step 2: Find first positionfix after a staypoint # (relevant if the pfs of sp are not provided, and we can only infer the pfs after sp through time) if not staypoints_exist: warnings.warn( "Providing positionfixes without a 'staypoint_id' column will no longer be supported in future releases, consider call generate_staypoints() first.", DeprecationWarning, ) # initialize the index list of pfs where a tpl will begin insert_index_ls = [] pfs["staypoint_id"] = pd.NA for user_id_this in pfs["user_id"].unique(): sp_user = staypoints[staypoints["user_id"] == user_id_this] pfs_user = pfs[pfs["user_id"] == user_id_this] # step 1 # All positionfixes with timestamp between staypoints are assigned the value 0 # Intersect all positionfixes of a user with all staypoints of the same user intervals = pd.IntervalIndex.from_arrays(sp_user["started_at"], sp_user["finished_at"], closed="left") is_in_interval = pfs_user["tracked_at"].apply(lambda x: intervals.contains(x).any()).astype("bool") pfs.loc[is_in_interval[is_in_interval].index, "staypoint_id"] = 0 # step 2 # Identify first positionfix after a staypoint # find index of closest positionfix with equal or greater timestamp. tracked_at_sorted = pfs_user["tracked_at"].sort_values() insert_position_user = tracked_at_sorted.searchsorted(sp_user["finished_at"]) insert_index_user = tracked_at_sorted.iloc[insert_position_user].index # store the insert insert_position_user in an array insert_index_ls.extend(list(insert_index_user)) # cond_staypoints_case2 = pd.Series(False, index=pfs.index) cond_staypoints_case2.loc[insert_index_ls] = True # initialize tripleg_id with pd.NA and fill all pfs that belong to staypoints with -1 # pd.NA will be replaced later with tripleg ids pfs["tripleg_id"] = pd.NA pfs.loc[~pd.isna(pfs["staypoint_id"]), "tripleg_id"] = -1 # get all conditions that trigger a new tripleg. # condition 1: a positionfix belongs to a new tripleg if the user changes. For this we need to sort pfs. # The first positionfix of the new user is the start of a new tripleg (if it is no staypoint) cond_new_user = (pfs["user_id"] != pfs["user_id"].shift(1)) & pd.isna(pfs["staypoint_id"]) # condition 2: Temporal gaps # if there is a gap that is longer than gap_threshold minutes, we start a new tripleg cond_temporal_gap = pfs["tracked_at"] - pfs["tracked_at"].shift(1) > datetime.timedelta(minutes=gap_threshold) # condition 3: staypoint # By our definition the pf after a stp is the first pf of a tpl. # this works only for numeric staypoint ids, TODO: can we change? _stp_id = (pfs["staypoint_id"] + 1).fillna(0) cond_stp = (_stp_id - _stp_id.shift(1)) != 0 # special check for case 2: pfs that belong to stp might not present in the data. # We need to select these pfs using time. if not staypoints_exist: cond_stp = cond_stp | cond_staypoints_case2 # combine conditions cond_all = cond_new_user | cond_temporal_gap | cond_stp # make sure not to create triplegs within staypoints: cond_all = cond_all & pd.isna(pfs["staypoint_id"]) # get the start position of tpls tpls_starts = np.where(cond_all)[0] tpls_diff = np.diff(tpls_starts) # get the start position of staypoint # pd.NA causes error in boolean comparision, replace to -1 sp_id = pfs["staypoint_id"].copy().fillna(-1) unique_sp, sp_starts = np.unique(sp_id, return_index=True) # get the index of where the tpls_starts belong in sp_starts sp_starts = sp_starts[unique_sp != -1] tpls_place_in_sp = np.searchsorted(sp_starts, tpls_starts) # get the length between each stp and tpl try: # pfs ends with stp sp_tpls_diff = sp_starts[tpls_place_in_sp] - tpls_starts # tpls_lengths is the minimum of tpls_diff and sp_tpls_diff # sp_tpls_diff one larger than tpls_diff tpls_lengths = np.minimum(tpls_diff, sp_tpls_diff[:-1]) # the last tpl has length (last stp begin - last tpl begin) tpls_lengths = np.append(tpls_lengths, sp_tpls_diff[-1]) except IndexError: # pfs ends with tpl # ignore the tpls after the last sp sp_tpls_diff ignore_index = tpls_place_in_sp == len(sp_starts) sp_tpls_diff = sp_starts[tpls_place_in_sp[~ignore_index]] - tpls_starts[~ignore_index] # tpls_lengths is the minimum of tpls_diff and sp_tpls_diff tpls_lengths = np.minimum(tpls_diff[: len(sp_tpls_diff)], sp_tpls_diff) tpls_lengths = np.append(tpls_lengths, tpls_diff[len(sp_tpls_diff) :]) # add the length of the last tpl tpls_lengths = np.append(tpls_lengths, len(pfs) - tpls_starts[-1]) # a valid linestring needs 2 points cond_to_remove = np.take(tpls_starts, np.where(tpls_lengths < 2)[0]) cond_all.iloc[cond_to_remove] = False # Note: cond_to_remove is the array index of pfs.index and not pfs.index itself pfs.loc[pfs.index[cond_to_remove], "tripleg_id"] = -1 # assign an incrementing id to all positionfixes that start a tripleg # create triplegs pfs.loc[cond_all, "tripleg_id"] = np.arange(cond_all.sum()) # fill the pd.NAs with the previously observed tripleg_id # pfs not belonging to tripleg are also propagated (with -1) pfs["tripleg_id"] = pfs["tripleg_id"].ffill() # assign back pd.NA to -1 pfs.loc[pfs["tripleg_id"] == -1, "tripleg_id"] = pd.NA # connect staypoints with triplegs if method == "between_staypoints": tpls = pfs.groupby("tripleg_id").agg( user_id=("user_id", "first"), started_at=("tracked_at", "min"), finished_at=("tracked_at", "max"), geom=(pfs.geometry.name, lambda x: LineString(x.tolist())), ) tpls = tpls.set_geometry("geom") tpls.crs = pfs.crs elif method == "overlap_staypoints": tpls, pfs = _generate_triplegs_overlap_staypoints(cond_temporal_gap, pfs, staypoints) # assert validity of triplegs tpls, pfs = _drop_invalid_triplegs(tpls, pfs) if not staypoints_exist: pfs.drop(columns="staypoint_id", inplace=True) # dtype consistency pfs["tripleg_id"] = pfs["tripleg_id"].astype("Int64") tpls.index = tpls.index.astype("int64") tpls.index.name = "id" # user_id of tpls should be the same as pfs tpls["user_id"] = tpls["user_id"].astype(pfs["user_id"].dtype) if len(tpls) == 0: warnings.warn("No triplegs can be generated, returning empty tpls.") return pfs, tpls return pfs, Triplegs(tpls)
def _generate_triplegs_overlap_staypoints(cond_temporal_gap, pfs, staypoints): """Connect staypoints with overlapping triplegs Parameters ---------- cond_temporal_gap : A boolean mask indicating gaps in the pfs data frame. pfs : Positionfixes staypoints : Staypoints Returns ------- tpls: Triplegs tpls with geometries. pfs: Positionfixes original pfs with overlaping tripleg_id with staypoint_id. Notes ----- In case of a staypoint with only one positionfix, the previous tripleg will have a spatial overlap with the staypoint, while the following tripleg will not overlap with the staypoint. """ # keep initial ids from the between staypoints method between_tpls_ids = pfs["tripleg_id"].copy() # conditions to identify overlap positions in the positionfixes: # not a new user & not a tripleg & not a gap at staypoint start & not a gap at staypoint end cond_not_tpl = ~pd.isna(pfs["staypoint_id"]) cond_overlap = ~(pfs["user_id"] != pfs["user_id"].shift(1)) & cond_not_tpl # temporal overlap: overlap tripleg end with start of next staypoint cond_overlap_start = cond_overlap & ~cond_temporal_gap & pd.isna(pfs["tripleg_id"]) pfs.loc[cond_overlap_start, "tripleg_id"] = between_tpls_ids.shift(1)[cond_overlap_start] # time: tpl's end pfs overlaps with sp, but tpl's start time is set as the time of the first pf after sp (see doctrting of generate_triplegs()) tpls = pfs.groupby("tripleg_id").agg( user_id=("user_id", "first"), started_at=("tracked_at", "min"), finished_at=("tracked_at", "max") ) # spatial overlap: overlap tripleg with the location of previous and next staypoint # geometry: tpl's share common start and end pfs with sp cond_overlap_end = cond_overlap & ~cond_temporal_gap.shift(-1).fillna(False) & pd.isna(pfs["tripleg_id"]) pfs.loc[cond_overlap_end, "tripleg_id"] = between_tpls_ids.shift(-1)[cond_overlap_end] cond_empty = pd.isna(pfs["tripleg_id"]) pfs.loc[cond_empty, "tripleg_id"] = between_tpls_ids[cond_empty] # replace geometry of staypoint positionfixes with staypoint geometry pfs_copy = pfs[["tripleg_id", "staypoint_id", pfs.geometry.name]].copy() pfs_copy.loc[cond_not_tpl, pfs_copy.geometry.name] = staypoints.loc[ pfs_copy.loc[cond_not_tpl, "staypoint_id"] ].geometry.values # create and set tripleg geometries tpls["geom"] = pfs_copy.groupby("tripleg_id")[pfs_copy.geometry.name].apply(lambda x: LineString(x)) tpls = tpls.set_geometry("geom") tpls.crs = pfs.crs return tpls, pfs def _generate_staypoints_sliding_user( df, geo_col, elevation_flag, dist_threshold, time_threshold, gap_threshold, distance_metric, include_last=False, ): """User level staypoint generation using sliding method, see generate_staypoints() function for parameter meaning.""" if distance_metric == "haversine": dist_func = point_haversine_dist else: raise ValueError("distance_metric unknown. We only support ['haversine']. " f"You passed {distance_metric}") df = df.sort_index(kind="stable").sort_values(by=["tracked_at"], kind="stable") # transform times to pandas Timedelta to simplify comparisons gap_threshold = pd.Timedelta(gap_threshold, unit="minutes") time_threshold = pd.Timedelta(time_threshold, unit="minutes") # to numpy as access time of numpy array is faster than pandas Series gap_times = ((df.tracked_at - df.tracked_at.shift(1)) > gap_threshold).to_numpy() # put x and y into numpy arrays to speed up the access in the for loop (shapely is slow) x = df[geo_col].x.to_numpy() y = df[geo_col].y.to_numpy() ret_sp = [] curr = start = 0 for curr in range(1, len(df)): # the gap of two consecutive positionfixes should not be too long if gap_times[curr]: start = curr continue delta_dist = dist_func(x[start], y[start], x[curr], y[curr], float_flag=True) if delta_dist >= dist_threshold: # we want the staypoint to have long enough duration if (df["tracked_at"].iloc[curr] - df["tracked_at"].iloc[start]) >= time_threshold: ret_sp.append(__create_new_staypoints(start, curr, df, elevation_flag, geo_col)) # distance large enough but time is too short -> not a staypoint # also initializer when new sp is added start = curr if include_last: # aggregate remaining positionfixes # additional control: we aggregate only if duration longer than time_threshold if (df["tracked_at"].iloc[curr] - df["tracked_at"].iloc[start]) >= time_threshold: new_sp = __create_new_staypoints(start, curr, df, elevation_flag, geo_col, last_flag=True) ret_sp.append(new_sp) ret_sp = pd.DataFrame(ret_sp) ret_sp["user_id"] = df["user_id"].unique()[0] return ret_sp def __create_new_staypoints(start, end, pfs, elevation_flag, geo_col, last_flag=False): """Create a staypoint with relevant information from start to end pfs.""" new_sp = {} # Here we consider pfs[end] time for stp 'finished_at', but only include # pfs[end - 1] for stp geometry and pfs linkage. new_sp["started_at"] = pfs["tracked_at"].iloc[start] new_sp["finished_at"] = pfs["tracked_at"].iloc[end] # if end is the last pfs, we want to include the info from it as well if last_flag: end = len(pfs) points = pfs[geo_col].iloc[start:end].unary_union if check_gdf_planar(pfs): new_sp[geo_col] = points.centroid else: new_sp[geo_col] = angle_centroid_multipoints(points)[0] if elevation_flag: new_sp["elevation"] = pfs["elevation"].iloc[start:end].median() new_sp["pfs_id"] = pfs.index[start:end].to_list() return new_sp def _drop_invalid_triplegs(tpls, pfs): """Remove triplegs with invalid geometries. Also remove the corresponding invalid tripleg ids from positionfixes. Parameters ---------- tpls : Triplegs pfs : Positionfixes Returns ------- tpls: Triplegs original tpls with invalid geometries removed. pfs: Positionfixes original pfs with invalid tripleg id set to pd.NA. Notes ----- Valid is defined using shapely (https://shapely.readthedocs.io/en/stable/manual.html#object.is_valid) via the geopandas accessor. """ invalid_tpls = tpls[~tpls.geometry.is_valid] if invalid_tpls.shape[0] > 0: # identify invalid tripleg ids invalid_tpls_ids = invalid_tpls.index.to_list() # reset tpls id in pfs invalid_pfs_ixs = pfs[pfs.tripleg_id.isin(invalid_tpls_ids)].index pfs.loc[invalid_pfs_ixs, "tripleg_id"] = pd.NA warn_string = ( f"The positionfixes with ids {invalid_pfs_ixs.values} lead to invalid tripleg geometries. The " f"resulting triplegs were omitted and the tripleg id of the positionfixes was set to nan" ) warnings.warn(warn_string) # return valid triplegs tpls = tpls[tpls.geometry.is_valid] return tpls, pfs