Source code for trackintel.io.dataset_reader

# -*- coding: utf-8 -*-

import glob
import os
from zipfile import ZipFile

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.geometry import LineString
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm

from trackintel.preprocessing.util import calc_temp_overlap
from trackintel import Positionfixes, Staypoints, Triplegs
from trackintel.io import read_positionfixes_gpd

FEET2METER = 0.3048
CRS_WGS84 = 4326
CRS_CH1903 = 21781
MZMV_encoding = "latin1"


def read_geolife(geolife_path, print_progress=False):
    """
    Read raw geolife data and return trackintel positionfixes.

    This function parses all geolife data available in the directory ``geolife_path``.

    Parameters
    ----------
    geolife_path: str
        Path to the directory with the geolife data.

    print_progress: Bool, default False
        Show per-user progress if set to True.

    Returns
    -------
    gdf: Positionfixes
        Contains all loaded geolife positionfixes.

    labels: dict
        Dictionary with the available mode labels.
        Keys are user ids of users that have a "labels.txt" in their folder.

    Notes
    -----
    The geopandas dataframe has the following columns and datatypes:
    'elevation': float64 (in meters); 'tracked_at': datetime64[ns]; 'user_id': int64;
    'geom': shapely geometry; 'accuracy': None.

    For some users, travel mode labels are provided as .txt file. These labels are read and returned as label
    dictionary. The label dictionary contains the user ids as keys and DataFrames with the available labels as
    values. Labels can be added to each user at the tripleg level, see
    :func:`trackintel.io.geolife_add_modes_to_triplegs` for more details.

    The folder structure within the geolife directory needs to be identical with the folder structure
    available from the official download. This means that the top level folder (provided with 'geolife_path')
    contains the folders for the different users:

    | geolife_path
    | ├── 000
    | │   ├── Trajectory
    | │   │   ├── 20081023025304.plt
    | │   │   ├── 20081024020959.plt
    | │   │   ├── 20081026134407.plt
    | │   │   └── ...
    | ├── 001
    | │   ├── Trajectory
    | │   │   └── ...
    | │   ...
    | ├── 010
    | │   ├── labels.txt
    | │   ├── Trajectory
    | │   │   └── ...
    | └── ...

    This is the geolife dataset as it can be downloaded from:
    https://www.microsoft.com/en-us/research/publication/geolife-gps-trajectory-dataset-user-guide/

    References
    ----------
    [1] Yu Zheng, Lizhu Zhang, Xing Xie, Wei-Ying Ma. Mining interesting locations and travel sequences from
    GPS trajectories. In Proceedings of International conference on World Wide Web (WWW 2009),
    Madrid, Spain. ACM Press: 791-800.

    [2] Yu Zheng, Quannan Li, Yukun Chen, Xing Xie, Wei-Ying Ma. Understanding Mobility Based on GPS Data.
    In Proceedings of ACM conference on Ubiquitous Computing (UbiComp 2008), Seoul, Korea. ACM Press: 312-321.

    [3] Yu Zheng, Xing Xie, Wei-Ying Ma. GeoLife: A Collaborative Social Networking Service among User, location
    and trajectory. Invited paper, in IEEE Data Engineering Bulletin. 33, 2, 2010, pp. 32-40.

    Example
    -------
    >>> from trackintel.io import read_geolife
    >>> pfs, mode_labels = read_geolife(os.path.join('downloads', 'Geolife Trajectories 1.3'))
    """
    # u are strings in the format "052", "002".
    uids = [u for u in os.listdir(geolife_path) if os.path.isdir(os.path.join(geolife_path, u))]

    if len(uids) == 0:
        raise FileNotFoundError(f"No user folders found at path {geolife_path}")

    for user_id in uids:
        try:
            int(user_id)
        except ValueError as err:
            errmsg = (
                f"Invalid user_id '{user_id}' found in geolife path '{os.path.join(geolife_path, user_id)}'. "
                "The geolife path can only contain folders named with integers that represent the user id."
            )
            raise ValueError(errmsg) from err

    labels = _get_labels(geolife_path, uids)
    # get the dfs in form of a generator and concatenate them
    gdf = pd.concat(_get_df(geolife_path, uids, print_progress), axis=0, ignore_index=True)
    gdf = Positionfixes(gdf, geometry="geom", crs=CRS_WGS84)
    gdf["accuracy"] = np.nan
    gdf.index.name = "id"
    return gdf, labels

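# Illustrative sketch (not part of the module): the returned ``labels`` dict maps integer user ids to DataFrames
# with "started_at", "finished_at" and "mode" columns, so the labels of a single user can be inspected directly.
# User id 10 is an assumption; any user folder containing a "labels.txt" (e.g. "010" above) gets an entry.
#
# >>> pfs, mode_labels = read_geolife(os.path.join("downloads", "Geolife Trajectories 1.3"))
# >>> mode_labels[10][["started_at", "finished_at", "mode"]].head()
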
def _get_labels(geolife_path, uids):
    """Generate dictionary with the available mode labels.

    Parameters
    ----------
    geolife_path : str
        Path to the directory with the geolife data.
    uids : iterable
        User folders in the geolife data directory.

    Returns
    -------
    dict
        dict containing the mode labels with the uids as keys.

    Notes
    -----
    No further checks are done on user ids, they must be convertible to ints.
    """
    labels_rename = {"Start Time": "started_at", "End Time": "finished_at", "Transportation Mode": "mode"}
    label_dict = {}  # output dict for the labels

    # get paths to all "labels.txt" files.
    possible_label_paths = ((os.path.join(geolife_path, user_id, "labels.txt"), user_id) for user_id in uids)
    label_paths = ((path, user_id) for path, user_id in possible_label_paths if os.path.exists(path))

    # insert all labels into the output dict
    for path, user_id in label_paths:
        labels = pd.read_csv(path, delimiter="\t")
        labels.rename(columns=labels_rename, inplace=True)
        labels["started_at"] = pd.to_datetime(labels["started_at"], format="%Y/%m/%d %H:%M:%S", utc=True)
        labels["finished_at"] = pd.to_datetime(labels["finished_at"], format="%Y/%m/%d %H:%M:%S", utc=True)
        label_dict[int(user_id)] = labels
    return label_dict


def _get_df(geolife_path, uids, print_progress):
    """Create a generator that yields single trajectory dataframes.

    Parameters
    ----------
    geolife_path : str
        Path to the directory with the geolife data.
    uids : iterable
        User folders in the geolife data directory.
    print_progress : bool
        Show per-user progress if set to True.

    Yields
    ------
    pd.DataFrame
        A single DataFrame from a single trajectory file.

    Notes
    -----
    No further checks are done on user ids, they must be convertible to ints.
    """
    disable = not print_progress
    names = ["latitude", "longitude", "zeros", "elevation", "date days", "date", "time"]
    usecols = ["latitude", "longitude", "elevation", "date", "time"]

    for user_id in tqdm(uids, disable=disable):
        pattern = os.path.join(geolife_path, user_id, "Trajectory", "*.plt")
        for traj_file in glob.glob(pattern):
            data = pd.read_csv(traj_file, skiprows=6, header=None, names=names, usecols=usecols)
            data["tracked_at"] = pd.to_datetime(data["date"] + " " + data["time"], format="%Y-%m-%d %H:%M:%S", utc=True)
            data["geom"] = gpd.points_from_xy(data["longitude"], data["latitude"])
            data["user_id"] = int(user_id)
            data["elevation"] = data["elevation"] * FEET2METER
            data.drop(columns=["date", "time", "longitude", "latitude"], inplace=True)
            yield data

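# Sketch of the label parsing above. The tab-delimited layout and the column names are those expected by
# _get_labels; the sample row itself is illustrative, not taken from the dataset:
#
#   Start Time           End Time             Transportation Mode
#   2008/04/02 11:24:21  2008/04/02 11:50:45  bus
#
# _get_labels renames these columns to "started_at", "finished_at" and "mode", parses both timestamps as UTC
# datetimes, and stores the DataFrame under the integer user id in the returned dict.
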
def geolife_add_modes_to_triplegs(
    triplegs, labels, ratio_threshold=0.5, max_triplegs=20, max_duration_tripleg=7 * 24 * 60 * 60
):
    """
    Add available mode labels to geolife data.

    The Geolife dataset provides a set of tripleg labels that are defined by a duration but are not matched to
    the Geolife tracking data. This function matches the labels to triplegs based on their temporal overlap.

    Parameters
    ----------
    triplegs : Triplegs
        Geolife triplegs.

    labels : dictionary
        Geolife labels as provided by the trackintel `read_geolife` function.

    ratio_threshold : float, default 0.5
        How much a label needs to overlap a tripleg to assign the label to this tripleg.

    max_triplegs : int, default 20
        Number of neighbors that are considered in the search for matching triplegs.

    max_duration_tripleg : float, default 7 * 24 * 60 * 60 (seconds)
        Used for a primary filter. All triplegs that are further away in time than 'max_duration_tripleg' from
        a label won't be considered for matching.

    Returns
    -------
    tpls : Triplegs
        triplegs with mode labels.

    Notes
    -----
    In the case that several labels overlap with the same tripleg, the label with the highest overlap (relative
    to the tripleg) is chosen.

    Example
    -------
    >>> from trackintel.io import read_geolife, geolife_add_modes_to_triplegs
    >>> pfs, mode_labels = read_geolife(os.path.join('downloads', 'Geolife Trajectories 1.3'))
    >>> pfs, sp = pfs.generate_staypoints()
    >>> pfs, tpls = pfs.generate_triplegs(sp)
    >>> tpls = geolife_add_modes_to_triplegs(tpls, mode_labels)
    """
    tpls = triplegs.copy()
    # temp time fields for nn query
    tpls["started_at_s"] = (tpls["started_at"] - pd.Timestamp("1970-01-01", tz="utc")) // pd.Timedelta("1s")
    tpls["finished_at_s"] = (tpls["finished_at"] - pd.Timestamp("1970-01-01", tz="utc")) // pd.Timedelta("1s")

    # tpls_id_mode_list is used to collect tripleg-mode matches. It will be filled with dictionaries with the
    # following keys: ['id', 'label_id', 'mode']
    tpls_id_mode_list = list()

    for user_this in labels.keys():
        tpls_this = tpls[tpls["user_id"] == user_this]
        labels_this = labels[user_this]
        labels_this["started_at_s"] = (
            labels_this["started_at"] - pd.Timestamp("1970-01-01", tz="utc")
        ) // pd.Timedelta("1s")
        labels_this["finished_at_s"] = (
            labels_this["finished_at"] - pd.Timestamp("1970-01-01", tz="utc")
        ) // pd.Timedelta("1s")

        # fit search tree on timestamps
        if tpls_this.shape[0] < max_triplegs:
            max_triplegs = tpls_this.shape[0]
        nn = NearestNeighbors(n_neighbors=max_triplegs, metric="chebyshev")
        nn.fit(tpls_this[["started_at_s", "finished_at_s"]])

        # find closest neighbours for timestamps in labels
        distances, candidates = nn.kneighbors(labels_this[["started_at_s", "finished_at_s"]])

        # filter anything above max_duration_tripleg (max distance start or end)
        pre_filter = distances > max_duration_tripleg

        candidates = pd.DataFrame(candidates, dtype="Int64")
        candidates[pre_filter] = np.nan
        candidates.dropna(how="all", inplace=True)

        # collect the tripleg - mode matches in the tpls_id_mode_list
        tpls_id_mode_list.extend(_calc_overlap_for_candidates(candidates, tpls_this, labels_this, ratio_threshold))

    if len(tpls_id_mode_list) == 0:
        tpls["mode"] = np.nan
    else:
        tpls_id_mode = pd.DataFrame(tpls_id_mode_list)
        # choose label with highest overlap
        tpls_id_mode = tpls_id_mode.sort_values(by=["id", "ratio"])
        # keep last (df sorted ascending)
        tpls_id_mode = tpls_id_mode.drop_duplicates(subset="id", keep="last").set_index("id")

        tpls = tpls.join(tpls_id_mode)
        tpls = tpls.astype({"label_id": "Int64"})

    tpls.drop(["started_at_s", "finished_at_s"], axis=1, inplace=True)
    try:
        tpls.drop(["ratio"], axis=1, inplace=True)
    except KeyError:
        pass

    return tpls

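# Minimal sketch of the matching criterion used above (the timestamps are made up): a label is assigned to a
# candidate tripleg if calc_temp_overlap(tripleg_start, tripleg_end, label_start, label_end) reaches
# ratio_threshold, i.e. the label covers a sufficient share of the tripleg duration.
#
# >>> t_start, t_end = pd.Timestamp("2008-04-02 11:00", tz="utc"), pd.Timestamp("2008-04-02 12:00", tz="utc")
# >>> l_start, l_end = pd.Timestamp("2008-04-02 11:30", tz="utc"), pd.Timestamp("2008-04-02 13:00", tz="utc")
# >>> calc_temp_overlap(t_start, t_end, l_start, l_end)
# Here the label covers half of the tripleg, which passes the default ratio_threshold of 0.5.
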
def _calc_overlap_for_candidates(candidates, tpls_this, labels_this, ratio_threshold):
    """
    Iterate all candidate triplegs and labels for a single user.

    Parameters
    ----------
    candidates : DataFrame
        A dataframe that has the following properties:
        index = Reference to position in the label_this dataframe
        columns: nb of neighbors, sorted by temporal distance
        values: Reference to position in the tpls_this dataframe

    tpls_this : Triplegs
        triplegs of a single user

    labels_this : DataFrame
        labels of a single user

    ratio_threshold : float, optional
        How much a label needs to overlap a tripleg to assign the label to this tripleg.

    Returns
    -------
    tpls_id_mode_list : list
        tpls_id_mode_list is used to collect tripleg-mode matches. It will be filled with dictionaries with the
        following keys: ['id', 'label_id', 'mode']

    Notes
    -----
    Candidates is a matrix with one row per label and where each column corresponds to a potential tripleg
    match. All potential tripleg matches that are overlapped (in time) by more than ratio_threshold by a label
    are assigned this label.
    """
    tpls_id_mode_list = []

    # iterate all rows
    for label_pos, row in candidates.iterrows():
        potential_label = labels_this.iloc[label_pos, :]

        # for every row, iterate all columns. Unused column index would indicate the nth column.
        for _, tpls_pos in row.items():
            # skip if tripleg was prefiltered and set to nan
            if pd.isna(tpls_pos):
                continue

            potential_tripleg = tpls_this.iloc[tpls_pos, :]
            ratio_this = calc_temp_overlap(
                potential_tripleg["started_at"],
                potential_tripleg["finished_at"],
                potential_label["started_at"],
                potential_label["finished_at"],
            )

            if ratio_this >= ratio_threshold:
                # assign label to tripleg (by storing matching in dictionary)
                tpls_id_mode_list.append(
                    {
                        "id": potential_tripleg.name,
                        "label_id": potential_label.name,
                        "mode": potential_label["mode"],
                        "ratio": ratio_this,
                    }
                )

    return tpls_id_mode_list


def read_mzmv(mzmv_path):
    """Read the data from Swiss "Mikrozensus Mobilität und Verkehr".

    Parameters
    ----------
    mzmv_path : str
        Path to unzipped data folder of MZMV (everything else should be left zipped).

    Returns
    -------
    trips : Trips
    sp : Staypoints
    tpls : Triplegs

    Notes
    -----
    !important! As geometry for triplegs we set the verification points (column `VP_XY`); the quality of these
    points is low but they are available for all entries (required by trackintel model). In the worst case
    there are only start and end points of the tripleg available. Prefer whenever possible the column
    `geometry`, as this geometry is created by a routing tool, but it is not available for all entries.

    To fit the trackintel model, we rename `HHNR` to `user_id` and rename [`f51100time`, `f51400time`] to
    [`started_at`, `finished_at`]. Since the MZMV data contains only time information and no date information,
    the date is set to 1970-01-01.
    Additional geometry available are the verification points in the columns "VP_XY" and "VP_XY_CH1903".
    """
    shp = os.path.join(mzmv_path, "5_Routen(Geometriefiles)\\CH_routen.zip")
    db_csv = os.path.join(mzmv_path, "4_DB_csv\\CH_CSV.zip")
    zf = ZipFile(db_csv)
    with zf.open("wege.csv") as f:
        trips = pd.read_csv(f, encoding=MZMV_encoding)
    rename_columns = {"HHNR": "user_id", "f51100time": "started_at", "f51400time": "finished_at"}
    trips.rename(columns=rename_columns, inplace=True)
    # string to time + added date
    trips["started_at"] = _mzmv_to_datetime(trips["started_at"])
    trips["finished_at"] = _mzmv_to_datetime(trips["finished_at"])
    # make copy to merge trip_id to triplegs
    trips.index.name = "trip_id"
    trip_id_merge = trips[["user_id", "WEGNR"]].reset_index()

    with zf.open("etappen.csv") as f:
        tpls = pd.read_csv(f, encoding=MZMV_encoding)
    tpls.rename(columns=rename_columns, inplace=True)
    tpls["started_at"] = _mzmv_to_datetime(tpls["started_at"])
    tpls["finished_at"] = _mzmv_to_datetime(tpls["finished_at"])
    tpls = pd.merge(tpls, trip_id_merge, on=["user_id", "WEGNR"])

    def treat_negative_durations(df):
        """
        Correct negative durations.

        Negative durations can occur if a trip was started on day 1 and finished on the next day.
        This is corrected by adding a day to the timestamp.
        """
        duration_neg = (df.finished_at - df.started_at).dt.total_seconds() < 0
        df.loc[duration_neg, "finished_at"] = df.loc[duration_neg, "finished_at"] + pd.Timedelta("1 days 00:00:00")

    treat_negative_durations(tpls)
    treat_negative_durations(trips)

    sp = _mzmv_generate_sp(tpls, zf)

    # Read Geometry:
    # possible to pass zip folder as argument as folder contains only one file
    geometry = gpd.read_file(shp)[["HHNR", "ETNR", "geometry"]]  # takes long
    geometry.rename(columns=rename_columns, inplace=True)
    tpls = pd.merge(tpls, geometry, on=["user_id", "ETNR"], how="left")
    vp = _mzmv_verification_points(zf, "verifikationspunkte.csv", tpls)
    tpls = pd.merge(tpls, vp, on=["user_id", "ETNR"], how="left")

    # with the geometry we can build GeoDataFrame
    tpls = Triplegs(tpls, geometry="geometry", crs=CRS_WGS84)
    tpls.index.name = "tripleg_id"

    # set invalid geometries as missing geometries
    tpls.loc[~tpls["geometry"].is_valid, "geometry"] = None
    tpls.loc[~tpls["VP_XY"].is_valid, "VP_XY"] = None

    # get the mandatory columns for trips
    prev_trip = sp.loc[sp["prev_trip_id"].notna(), ["prev_trip_id"]].reset_index(names="destination_staypoint_id")
    next_trip = sp.loc[sp["next_trip_id"].notna(), ["next_trip_id"]].reset_index(names="origin_staypoint_id")
    trips = trips.merge(prev_trip, left_index=True, right_on="prev_trip_id")
    trips = trips.merge(next_trip, left_index=True, right_on="next_trip_id")

    return trips, sp, tpls


def _mzmv_verification_points(zf, filepath, tpls):
    """Extract verification points as LineStrings.

    Start and end point of the tripleg are added from tpls to gain valid LineStrings.

    Parameters
    ----------
    zf : zipfile.ZipFile
        ZipFile with which we can open filepath
    filepath : str
        path to verification points file within zf.
    tpls : DataFrame
        tpls with renamed columns.

    Returns
    -------
    pd.DataFrame
        Filled with aggregated verification points per tripleg. Columns `VP_XY`, `VP_XY_CH1903` contain geometries.
    """
    # MZMV stores 6 points + possible border point in one row
    # this method aggregates up to 6 points (w/o border) into geometry
    num_points = 6
    with zf.open(filepath) as f:
        vp = pd.read_csv(f, encoding=MZMV_encoding)
    vp.rename(columns={"HHNR": "user_id"}, inplace=True)  # to be in line with tpls
    geom_cols = ["{}X", "{}Y", "{}X_CH1903", "{}Y_CH1903"]
    # e.g. point 2 has geometry R2_X, R2_Y, R2_X_CH1903, R2_Y_CH1903
    na_997 = [c.format(f"R{i}_") for i in range(1, num_points + 1) for c in geom_cols]
    # insert nan to later drop point w/o geometry
    for col in na_997:
        vp.loc[vp[col] == -997, col] = np.nan

    # we only keep information to join back + geometry
    cols = ["user_id", "ETNR"] + geom_cols
    group = [[c.format(f"R{i}_") for c in cols] for i in range(1, num_points + 1)]
    # rename target removes the number in the groups
    rename_target = [c.format("") for c in cols]
    rename_maps = [{g_col: r_col for (g_col, r_col) in zip(g, rename_target)} for g in group]
    # split by verification point number, rename for concat, drop verification point w/o geometry
    gcol = ["X", "Y", "X_CH1903", "Y_CH1903"]
    vps = [vp[g].rename(columns=rm).dropna(subset=gcol, how="all") for (g, rm) in zip(group, rename_maps)]
    vp = pd.concat(vps)  # inner order: R1, R2, ..., R6

    # gather start and end from tpls and put them into the same format
    sp = tpls[["user_id", "ETNR", "S_X", "S_Y", "S_X_CH1903", "S_Y_CH1903"]].copy()
    ep = tpls[["user_id", "ETNR", "Z_X", "Z_Y", "Z_X_CH1903", "Z_Y_CH1903"]].copy()
    sp.rename(columns={"S_X": "X", "S_Y": "Y", "S_X_CH1903": "X_CH1903", "S_Y_CH1903": "Y_CH1903"}, inplace=True)
    ep.rename(columns={"Z_X": "X", "Z_Y": "Y", "Z_X_CH1903": "X_CH1903", "Z_Y_CH1903": "Y_CH1903"}, inplace=True)
    vp = pd.concat((sp, vp, ep))  # right order is important (S, R1, ..., R6, E)

    vp["VP_XY"] = gpd.points_from_xy(x=vp["X"], y=vp["Y"], crs=CRS_WGS84)
    vp["VP_XY_CH1903"] = gpd.points_from_xy(x=vp["X_CH1903"], y=vp["Y_CH1903"], crs=CRS_CH1903)
    vp.drop(columns=["X", "Y", "X_CH1903", "Y_CH1903"], inplace=True)

    # aggregate points in the same etappe into a linestring
    aggfuncs = {"VP_XY": lambda xy: LineString(xy.to_list()), "VP_XY_CH1903": lambda xy: LineString(xy.to_list())}
    # groupby keeps inner order! -> (S, R1, ..., R6, E) are passed to LineString in right order
    vp = vp.groupby(["user_id", "ETNR"], as_index=False).agg(aggfuncs)
    return vp


def _mzmv_to_datetime(col):
    """Convert time from mzmv to pd.Timestamp on the fixed date 1970-01-01.

    Parameters
    ----------
    col : pd.Series
        Times stored as strings.

    Returns
    -------
    pd.Series
        Times stored as pd.Timestamp[ns]
    """
    postfix = [" 1970-01-01"] * len(col)  # no broadcasting possible
    midnight = col == "24:00:00"
    col = col.str.cat(postfix)
    # 24:00:00 is no valid time -> to keep everything on the same day we lose 1 sec
    col[midnight] = "23:59:59 1970-01-01"
    return pd.to_datetime(col, format="%H:%M:%S %Y-%m-%d", utc=True)


def _mzmv_generate_sp(tpls, zf):
    """Generate staypoints only from tpls.

    Parameters
    ----------
    tpls : pd.DataFrame
        DataFrame with renamed columns. Sorted by "user_id" and "WEGNR" ascending. Must contain column "trip_id".
    zf : zipfile.ZipFile
        zipfile with which we can open "haushalte.csv" and "zielpersonen.csv" to add column purpose to staypoints.

    Returns
    -------
    staypoints : Staypoints
        staypoints with mandatory columns, all the columns for staypoints from mzmv, and additionally
        "prev_trip_id", "trip_id", "next_trip_id", "is_activity", "purpose", "purpose_tpls"

    Notes
    -----
    Encoding for values of "purpose_tpls" can be looked up in the documentation of MZMV.
    """
    assert "trip_id" in tpls.columns  # small regression test
    tpls.sort_values(by=["user_id", "ETNR"], inplace=True)
    first_tpls = tpls["ETNR"] == 1  # first tripleg of user (ETNR is unique per user)
    last_tpls = first_tpls.shift(-1, fill_value=True)

    # create staypoints from starts of tpls
    # if previous staypoint is different user/trip -> staypoint is activity
    tpls["S_is_activity"] = (tpls[["user_id", "WEGNR"]] != tpls[["user_id", "WEGNR"]].shift(1)).any(axis=1)
    # quick and dirty: copy trip ids and delete most of them in the next step
    tpls["S_prev_trip_id"] = tpls["trip_id"].shift(1)
    tpls["S_next_trip_id"] = tpls["trip_id"]
    tpls["S_trip_id"] = tpls["trip_id"]  # to not overwrite it
    # set prev_trip_id of first trips to nan
    tpls.loc[first_tpls, "S_prev_trip_id"] = np.nan
    # staypoints that aren't activity are in a trip (don't have prev or next)
    tpls.loc[~tpls["S_is_activity"], "S_prev_trip_id"] = np.nan
    tpls.loc[~tpls["S_is_activity"], "S_next_trip_id"] = np.nan
    # activity is outside of trips -> no trip id
    tpls.loc[tpls["S_is_activity"], "S_trip_id"] = np.nan

    tpls["S_finished_at"] = tpls["started_at"]  # time you leave staypoint
    # end of next etappe within same trip is *always* identical to start in previous one.
    # end of trip within same user is *always* identical to start in previous one.
    tpls["S_started_at"] = tpls["finished_at"].shift(1, fill_value=pd.NaT)
    # first trip -> we only know finish time for staypoints --> create zero duration staypoint
    tpls.loc[first_tpls, "S_started_at"] = tpls.loc[first_tpls, "S_finished_at"]
    # add purpose of triplegs to staypoints; "f52900" is purpose column in MZMV at end of tripleg
    tpls["S_purpose_tpls"] = tpls["f52900"].shift(1)
    tpls.loc[first_tpls, "S_purpose_tpls"] = None

    # **all** the columns that are associated with the staypoints
    col = [
        "X", "Y", "QAL", "BFS", "PLZ", "Ort", "Str", "hnr", "LND", "X_CH1903", "Y_CH1903",
        "SPRACHE", "REGION", "KANTON", "NUTS3", "AGGLO2000", "AGGLO_GROESSE2000", "STRUKTUR_2000",
        "STRUKTUR_AGG_2000", "struktur_bfs9_2000", "AGGLO2012", "AGGLO_GROESSE2012", "staedt_char_2012",
        "stat_stadt_2012", "DEGURBA", "is_activity", "started_at", "finished_at", "prev_trip_id",
        "next_trip_id", "trip_id", "purpose_tpls",
    ]
    # W_X_CH1903, X coordinate of home, CH1903 as integers are better to join
    s_col = ["user_id", "WEGNR", "ETNR", "W_X_CH1903", "W_Y_CH1903"] + ["S_" + c for c in col]
    sp = tpls[s_col].copy()
    sp.rename(columns={"S_" + c: c for c in col}, inplace=True)

    # what now is missing is the last staypoint of the last trip per user
    tpls["Z_is_activity"] = True  # we filter later
    tpls["Z_prev_trip_id"] = tpls["trip_id"]
    tpls["Z_next_trip_id"] = np.nan  # is always last trip
    tpls["Z_trip_id"] = np.nan  # outside of trips
    tpls["Z_started_at"] = tpls["finished_at"]
    tpls["Z_finished_at"] = pd.NaT
    tpls["Z_purpose_tpls"] = tpls["f52900"]
    z_col = ["user_id", "WEGNR", "ETNR", "W_X_CH1903", "W_Y_CH1903"] + ["Z_" + c for c in col]
    sp_last = tpls.loc[last_tpls, z_col]
    sp_last.rename(columns={"Z_" + c: c for c in col}, inplace=True)
    sp = pd.concat((sp, sp_last))

    # now we add column purpose to show if work or home
    # home we can easily get by comparing home-coordinates from triplegs
    with zf.open("zielpersonen.csv") as f:
        usecols = ["HHNR", "A_X_CH1903", "A_Y_CH1903", "AU_X_CH1903", "AU_Y_CH1903"]
        zielpersonen = pd.read_csv(f, encoding=MZMV_encoding, usecols=usecols).rename(columns={"HHNR": "user_id"})
    sp = pd.merge(sp, zielpersonen, how="left", on=["user_id"])
    # A_<...> coordinates for work, AU_<...> coordinates for education
    work = ((sp["A_X_CH1903"] == sp["X_CH1903"]) & (sp["A_Y_CH1903"] == sp["Y_CH1903"])) | (
        (sp["AU_X_CH1903"] == sp["X_CH1903"]) & (sp["AU_Y_CH1903"] == sp["Y_CH1903"])
    )
    home = (sp["W_X_CH1903"] == sp["X_CH1903"]) & (sp["W_Y_CH1903"] == sp["Y_CH1903"])
    sp.loc[work, "purpose"] = "work"
    sp.loc[home, "purpose"] = "home"  # potentially overwrite work

    sp.reset_index(drop=True, inplace=True)  # drop tpls index
    sp["XY"] = gpd.points_from_xy(sp["X"], sp["Y"], crs=CRS_WGS84)
    sp["XY_CH1904"] = gpd.points_from_xy(sp["X_CH1903"], sp["Y_CH1903"], crs=CRS_CH1903)
    sp = Staypoints(sp, geometry="XY", crs=CRS_WGS84)
    sp.index.name = "staypoint_id"
    # fix datetime dtype
    sp["finished_at"] = pd.to_datetime(sp.finished_at)

    # clean up
    sp_drop = ["X", "Y", "X_CH1903", "Y_CH1903"]
    sp_drop += ["W_X_CH1903", "W_Y_CH1903", "A_X_CH1903", "A_Y_CH1903", "AU_X_CH1903", "AU_Y_CH1903"]
    sp.drop(columns=sp_drop, inplace=True)
    added_cols = [b + c for b in ("S_", "Z_") for c in col[-7:]]
    tpls.drop(columns=added_cols, inplace=True)
    return sp

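# Hedged usage sketch for the MZMV reader (the folder name is an assumption): `mzmv_path` must contain the
# unzipped top-level structure with "4_DB_csv\CH_CSV.zip" and "5_Routen(Geometriefiles)\CH_routen.zip", from
# which "wege.csv", "etappen.csv", "verifikationspunkte.csv" and "zielpersonen.csv" are read.
#
# >>> trips, sp, tpls = read_mzmv("mzmv_2015")
# >>> tpls.geometry.name  # active geometry is the routed "geometry" column; "VP_XY" holds verification points
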
def read_gpx(path):
    """
    Read gpx data and return it as Positionfixes of a single user.

    Parameters
    ----------
    path : str
        Path to directory of gpx files. Non gpx files are ignored.

    Returns
    -------
    Positionfixes
    """
    pattern = os.path.join(path, "*.gpx")
    # sorted to make result deterministic
    files = sorted(glob.glob(pattern))
    if not files:
        raise FileNotFoundError(f'Found no gpx files in path "{path}"')

    pfs_list = []
    track_fid_offset = 0
    for file in files:
        pfs = _read_single_gpx_file(file)
        # give each track a unique ID
        pfs["track_fid"] += track_fid_offset
        track_fid_offset = pfs["track_fid"].max() + 1
        pfs_list.append(pfs)
    return pd.concat(pfs_list, ignore_index=True)

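# Hedged usage sketch (the directory name is an assumption): all *.gpx files in the directory are read in
# sorted order, merged into a single Positionfixes frame for user_id 0, and "track_fid" is offset so every
# track keeps a unique id across files.
#
# >>> pfs = read_gpx(os.path.join("downloads", "gpx_tracks"))
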
def _read_single_gpx_file(path):
    """
    Read track points from a single gpx file.

    Extension types are not supported by fiona and are therefore dropped.

    Parameters
    ----------
    path : str
        Path to `.gpx` file with track

    Returns
    -------
    Positionfixes
    """
    gdf = gpd.read_file(path, layer="track_points")
    gdf = gdf.dropna(axis="columns", how="all")  # drop empty columns
    gdf["user_id"] = 0  # maybe we have a smarter way for this
    return read_positionfixes_gpd(gdf, tracked_at="time")