User:EmericusPetro/sandbox/RFC OpenStreetMap RDF 2023 conventions (proof of concept)
< User:EmericusPetro | sandbox(Redirected from User:EmericusPetro/sandbox/RFC OpenStreetMap RDF 2022 conventions (proof of concept))
Jump to navigation
Jump to search
RFC: OpenStreetMap RDF 2022 conventions (proof of concept)
The software on this page implements the algorithm behind an RFC (Request for Comments) about conventions for OpenStreetMap data encoded in RDF. Not only may the software have bugs with respect to the conventions (which attempt to find consensus among existing good practices), but the conventions used by the code on this page are not normative.
Script
This page contains a public domain proof of concept of a proxy over the production OpenStreetMap API v0.6. It serves the existing formats (e.g. XML, JSON) unchanged, but when asked to generate Turtle it will, under the hood, request the canonical XML output and convert it for you. By default, all requests to the de facto API are cached so that you do not accidentally overload the backend servers.
Python
osmapi2rdfproxy.py
#!/usr/bin/env python3
# ==============================================================================
#
# FILE: osmapi2rdfproxy.py
#
# USAGE: hug -f ./osmapi2rdfproxy.py --help
# # Expose http://localhost:8000/ as API endpoint:
# hug --port 8000 -f ./osmapi2rdfproxy.py
# # Disable cache of de facto API calls
# RDFPROXY_TTL="-1" hug -f ./osmapi2rdfproxy.py
#
# DESCRIPTION: Proxy over de facto public OpenStreetMap v0.6 API that adds
# turtle format as additional output intended to be used for
# local testing and/or feedback on the conventions.
# This is not intended for production or near-production use,
# however it implements python Hug (https://www.hug.rest/)
# (great at benchmarks, yet less code) and requests-cache
# (https://requests-cache.readthedocs.io/) so it is out of the
# box a great starting point, even if you don't know python.
#
# OPTIONS: ---
#
# REQUIREMENTS: - python3
# - hug (pip install hug -U)
# - requests-cache (pip install requests-cache)
# - osmrdf2023.py (Python library)
# BUGS: ---
# NOTES: ---
# AUTHORS: Emerson Rocha <rocha[at]ieee.org>
# COLLABORATORS: ---
# LICENSE: Public Domain dedication or Zero-Clause BSD
# SPDX-License-Identifier: Unlicense OR 0BSD
# VERSION: v0.3.0
# CREATED: 2022-11-25 15:53:00Z v0.1.0 started
# REVISION: 2022-11-26 20:47:00Z v0.2.0 node, way, relation basic turtle,
# only attached tags (no <nd> <member> yet)
# 2022-12-21 01:46:00Z v0.3.0 osmrdf2022.py -> osmrdf2023.py
# ==============================================================================
# from poc.osmrdf2023 import (
from osmrdf2023 import (
osmrdf_node_xml2ttl,
osmrdf_relation_xml2ttl,
osmrdf_way_xml2ttl
)
import json
import os
import requests
import requests_cache
import hug
# @TODO Some way to generate metadata from Tags
# Taginfo?
# - https://taginfo.openstreetmap.org/taginfo/apidoc
# maybe parse infoboxes directly. Example:
# wiki.openstreetmap.org/w/index.php?action=raw&title=Tag:highway%3Dbusway
# or https://github.com/earwig/mwparserfromhell
# user configuration ________________________________________________________
# TIP: the backend base URL can be customized with the environment variable
#      DE_FACTO_API_BASE. The name DE_FACTO_API (the one this script used to
#      read) is still honoured as a fallback so existing setups keep working.
#      A trailing "/" is removed because the endpoints below append "/node/..."
DE_FACTO_API_BASE = (
    os.getenv('DE_FACTO_API_BASE')
    or os.getenv('DE_FACTO_API')
    or 'https://www.openstreetmap.org/api/0.6'
).rstrip('/')
# @see https://requests-cache.readthedocs.io/en/stable/
# File osmapi_cache.sqlite will cache backend calls
# os.getenv() always yields a string, so convert the TTL to an integer number
# of seconds before handing it to requests-cache (a non-numeric value fails
# here, at start-up, with a ValueError instead of at the first request).
# NOTE(review): per the requests-cache expiration docs, -1 means "never
# expire" and 0 means "do not cache" -- confirm against the usage notes in
# the file header, which describe "-1" as disabling the cache.
_rdfproxy_ttl = int(os.getenv('RDFPROXY_TTL', '604800'))  # default: 7 days
requests_cache.install_cache(
    'osmapi_cache',
    # https://requests-cache.readthedocs.io/en/stable/user_guide/backends.html
    backend=os.getenv('RDFPROXY_CACHE', 'sqlite'),
    # https://requests-cache.readthedocs.io/en/stable/user_guide/expiration.html
    expire_after=_rdfproxy_ttl,
)
# ### Hug overrides ____________________________________________________________
@hug.format.content_type('application/xml')
def format_as_xml(data, request=None, response=None):
    """Hug output formatter registered for ``application/xml``.

    The payload is stringified and returned as UTF-8 bytes.

    @FIXME make Hug return XML formatting instead of text
    """
    text = str(data)
    return text.encode('utf8')
@hug.format.content_type('text/turtle')
def format_as_turtle(data, request=None, response=None):
    """Hug output formatter registered for ``text/turtle``.

    The payload is stringified and returned as UTF-8 bytes.
    """
    text = str(data)
    return text.encode('utf8')
# ### Tell Hug suffix can be used to infer output strategy _____________________
# Maps the suffix of the requested path to the Hug formatter used for the
# response body.
_FORMAT_BY_SUFFIX = {
    '.json': hug.output_format.json,
    '.xml': format_as_xml,
    '.ttl': hug.output_format.text,
    '': hug.output_format.text,  # @FIXME use format_as_xml()
}
suffix_output = hug.output_format.suffix(_FORMAT_BY_SUFFIX)
# ### Logic for API endpoints __________________________________________________
# @NOTE: This section mostly:
# - Request+cache openstreetmap.org/api/0.6/ API calls and serve as it is
# - If user ask .ttl, uses osmrdf2023.py to generate
# @BUG While osmapi2rdfproxy.py is intended to only generate Turtle for
# the discussed types of data, the actual OpenStreetMap API has more
# endpoints than the ones here.
@hug.get('/changeset/{changeset_uid}', output=suffix_output)
def api_changeset(changeset_uid):
    """Proxy (and cache) /api/0.6/changeset/ without altering the payload.

    The upstream body is decoded with ``json.loads`` when the requested id
    ends in ``.json``; otherwise the raw text is returned.

    @example http://localhost:8000/changeset/1.json
    """
    upstream = requests.get(
        f'{DE_FACTO_API_BASE}/changeset/{changeset_uid}')
    return (
        json.loads(upstream.text)
        if changeset_uid.endswith('.json')
        else upstream.text
    )
# http://localhost:8000/node/1.json
@hug.get('/node/{node_uid}', output=suffix_output)
def api_node(node_uid):
    """Proxy (and cache) /api/0.6/node/ without altering the payload.

    The upstream body is decoded with ``json.loads`` when the requested id
    ends in ``.json``; otherwise the raw text is returned.
    """
    upstream = requests.get(f'{DE_FACTO_API_BASE}/node/{node_uid}')
    return (
        json.loads(upstream.text)
        if node_uid.endswith('.json')
        else upstream.text
    )
# http://localhost:8000/node/1.ttl
# rdfpipe --input-format='ttl' --output-format=longturtle http://localhost:8000/node/2.ttl
@hug.get('/node/{node_uid}.ttl', output=format_as_turtle)
def api_node_ttl(node_uid):
    """Fetch the node as XML from the backend and return it as Turtle."""
    xml_url = f'{DE_FACTO_API_BASE}/node/{node_uid}.xml'
    return osmrdf_node_xml2ttl(requests.get(xml_url).text)
# @see https://wiki.openstreetmap.org/wiki/Relation
# @see https://wiki.openstreetmap.org/wiki/Category:Relations
# http://localhost:8000/relation/10000.json
@hug.get('/relation/{relation_uid}', output=suffix_output)
def api_relation(relation_uid):
    """Proxy (and cache) /api/0.6/relation/ without altering the payload.

    The upstream body is decoded with ``json.loads`` when the requested id
    ends in ``.json``; otherwise the raw text is returned.
    """
    upstream = requests.get(f'{DE_FACTO_API_BASE}/relation/{relation_uid}')
    return (
        json.loads(upstream.text)
        if relation_uid.endswith('.json')
        else upstream.text
    )
# http://localhost:8000/relation/10000.ttl
# rdfpipe --input-format='ttl' --output-format=longturtle http://localhost:8000/relation/10000.ttl
@hug.get('/relation/{relation_uid}.ttl', output=format_as_turtle)
def api_relation_ttl(relation_uid):
    """Fetch the relation as XML from the backend and return it as Turtle."""
    xml_url = f'{DE_FACTO_API_BASE}/relation/{relation_uid}.xml'
    return osmrdf_relation_xml2ttl(requests.get(xml_url).text)
# http://localhost:8000/way/100.json
@hug.get('/way/{way_uid}', output=suffix_output)
def api_way(way_uid):
    """Proxy (and cache) /api/0.6/way/ without altering the payload.

    The upstream body is decoded with ``json.loads`` when the requested id
    ends in ``.json``; otherwise the raw text is returned.
    """
    upstream = requests.get(f'{DE_FACTO_API_BASE}/way/{way_uid}')
    return (
        json.loads(upstream.text)
        if way_uid.endswith('.json')
        else upstream.text
    )
# http://localhost:8000/way/100.ttl
# rdfpipe --input-format='ttl' --output-format=longturtle http://localhost:8000/way/100.ttl
@hug.get('/way/{way_id}.ttl', output=format_as_turtle)
def api_way_ttl(way_id):
    """Fetch the way as XML from the backend and return it as Turtle.

    This handler used to be named ``api_node_ttl``, which silently rebound
    the module-level name of the node handler defined earlier in the file.
    The route itself (``/way/{way_id}.ttl``) is unchanged.
    """
    content = requests.get(
        DE_FACTO_API_BASE + '/way/' + way_id + '.xml')
    return osmrdf_way_xml2ttl(content.text)
osmdump2rdfcli.py
#!/usr/bin/env python3
# ==============================================================================
#
# FILE: osmdump2rdfcli.py
#
# USAGE: ./osmdump2rdfcli.py --help
#
# DESCRIPTION: ---
# OPTIONS: ---
#
# REQUIREMENTS: - python3
# BUGS: ---
# NOTES: ---
# AUTHORS: Emerson Rocha <rocha[at]ieee.org>
# COLLABORATORS: ---
# LICENSE: Public Domain dedication or Zero-Clause BSD
# SPDX-License-Identifier: Unlicense OR 0BSD
# VERSION: v0.3.0
# CREATED: 2022-11-27 03:14 UTC v0.2.0 started
# REVISION: 2022-12-21 01:46:00Z v0.3.0 osmrdf2022.py -> osmrdf2023.py
# ==============================================================================
import argparse
import sys
# from poc.osmrdf2023 import (
from osmrdf2023 import (
# osmrdf_xmldump2_ttl,
osmrdf_xmldump2_ttl_v2,
OSMElementFilter
)
# Program name shown by argparse and path of this script (used in help text).
NOMEN = 'osmdump2rdfcli'
PROGRAM_EXE = __file__
# Binary view of standard input; default input stream of Cli.execute_cli().
STDIN = sys.stdin.buffer
# One-paragraph description shown at the top of --help.
DESCRIPTION = (
    f"\n{PROGRAM_EXE} Proof of concept for OSM RDF 2022, CLI alternative "
    "(intended to run on dumps) to the proxy version.\n"
)
# Help epilogue printed by argparse after the option list (the parser uses
# RawDescriptionHelpFormatter, so the layout below is shown verbatim).
#
# Backslashes that must be *displayed* (shell line continuations and escaped
# quotes inside the curl URLs) are doubled: a single backslash followed by a
# newline is a string line-continuation in Python and a single \" collapses
# to a bare quote, so the printed examples would lose them.
# The text is an f-string only; it is not passed through str.format() a
# second time, which would raise if the script path contained "{" or "}".
__EPILOGUM__ = f"""
------------------------------------------------------------------------------
EXEMPLŌRUM GRATIĀ
------------------------------------------------------------------------------
Read from file on disk . . . . . . . . . . . . . . . . . . . . . . . . . . . .
{PROGRAM_EXE} tmp/STP.osm
{PROGRAM_EXE} tmp/STP.osm > tmp/STP.osm.ttl
{PROGRAM_EXE} --filter-xml-tag='way' tmp/STP.osm > tmp/STP~ways.osm.ttl
Pipe from other commands . . . . . . . . . . . . . . . . . . . . . . . . . . .
cat tmp/STP.osm | {PROGRAM_EXE}
bzcat tmp/BRA-north.osm.bz2 | {PROGRAM_EXE}
bzcat tmp/BRA-north.osm.bz2 | {PROGRAM_EXE} --filter-xml-tag='relation'
Re-tag (without full inference) . . . . . . . . . . . . . . . . . . . . . . .
{PROGRAM_EXE} --retagger-file=tmp/tagger.rdfstxt.tsv tmp/STP.osm
Download external data examples ______________________________________________
Geofabrik download + decompress . . . . . . . . . . . . . . . . . . . . . . .
curl --output tmp/STP.osm.bz2 \\
https://download.geofabrik.de/africa/sao-tome-and-principe-latest.osm.bz2
bunzip2 tmp/STP.osm.bz2
Geofabrik download (but not decompress) . . . . . . . . . . . . . . . . . . .
curl --output tmp/BRA-north.osm.bz2 \\
https://download.geofabrik.de/south-america/brazil/norte-latest.osm.bz2
Overpass download examples . . . . . . . . . . . . . . . . . . . . . . . . . .
(See http://overpass-api.de/command_line.html)
curl --output tmp/target.osm --silent --globoff \\
"https://overpass-api.de/api/interpreter?data=node[name=\\"Gielgen\\"];out;"
curl --output tmp/speed-200.osm --silent --globoff \\
"https://overpass-api.de/api/interpreter?data=way[maxspeed=\\"200\\"];out;"
Other ________________________________________________________________________
Create not-so-smart-inference (hardcoded auto tagger) . . . . . . . . . . . . .
echo "-<TAB>*<TAB>*<TAB>created_by=*<NEWLINE>\\
+<TAB><way>|<relation><TAB>*<TAB>ISO3166-1:alpha3=BRA<TAB>" \\
> tmp/tagger.rdfstxt.tsv
Mapping . . . . . . . . . . . . . . . . . . . . . . . .
echo "-<TAB>*<TAB>*<TAB>created_by=*<NEWLINE>\\
+<TAB><way>|<relation><TAB>*<TAB>ISO3166-1:alpha3=BRA<TAB>" \\
> tmp/tagger.rdfstxt.tsv
------------------------------------------------------------------------------
EXEMPLŌRUM GRATIĀ
------------------------------------------------------------------------------
"""
# https://overpass-turbo.eu/s/1ohl
# way({{bbox}})[highway=residential]
# [maxspeed](if:t["maxspeed"]>120);
# out geom;
# @see also https://gis.stackexchange.com/questions/127315/filtering-overpass-api-by-country
# https://overpass-turbo.eu/s/1ohm
# area["ISO3166-1:alpha3"="BRA"]->.boundaryarea;
# (
# way(area.boundaryarea)[maxspeed](if:t["maxspeed"]>120);
# );
# out meta;
class Cli:
    """Command line front-end that converts an OSM XML dump to Turtle.

    ``make_args()`` parses ``sys.argv`` and ``execute_cli()`` streams the
    selected input through ``osmrdf_xmldump2_ttl_v2``, which prints the
    result to standard output.
    """

    EXIT_OK = 0
    EXIT_ERROR = 1
    EXIT_SYNTAX = 2

    # NOTE(review): not read anywhere in this class; presumably a debug
    #               switch -- confirm before relying on it.
    venandum_insectum: bool = False  # noqa: E701

    def __init__(self):
        """
        Constructs all the necessary attributes for the Cli object.
        """

    def make_args(self, hxl_output=True):
        """Build the argument parser and parse the process arguments.

        Args:
            hxl_output: accepted for backward compatibility; not used.

        Returns:
            argparse.Namespace with ``infile``, ``real_infile``,
            ``retagger_file``, ``xml_tags`` and ``xml_tags_not``.
        """
        parser = argparse.ArgumentParser(
            prog=NOMEN,
            description=DESCRIPTION,
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog=__EPILOGUM__
        )

        parser.add_argument(
            'infile',
            help='Input file',
            nargs='?'
        )

        parser.add_argument(
            '--real-infile-path',
            help='(Quick workaround for edge cases) in case infile becomes '
            'ambiguous on shell scripting, use this to force real source '
            'path',
            dest='real_infile',
            nargs='?',
            default=None,
            required=False,
        )

        parser.add_argument(
            '--retagger-file',
            help='(Poor man\'s inference) Path to a TSV file to apply OSM '
            'Tags before output RDF',
            dest='retagger_file',
            nargs='?',
            default=None,
            required=False,
        )

        # Both filters take a comma separated list, e.g. "way,relation".
        parser.add_argument(
            '--filter-xml-tag',
            help='Filter XML tags',
            dest='xml_tags',
            nargs='?',
            type=lambda x: x.split(','),
            default=None
        )

        parser.add_argument(
            '--filter-xml-tag-not',
            help='Filter XML tags (not)',
            dest='xml_tags_not',
            nargs='?',
            type=lambda x: x.split(','),
            default=None
        )

        return parser.parse_args()

    def execute_cli(
            self, pyargs, stdin=STDIN, stdout=sys.stdout,
            stderr=sys.stderr):
        """Run the conversion described by the parsed arguments.

        Input selection: ``--real-infile-path`` wins; otherwise the
        positional ``infile`` is used when *stdin* is a terminal, and *stdin*
        itself is parsed when something is piped in.

        Args:
            pyargs: namespace returned by ``make_args()``.
            stdin: binary input stream used when data is piped in.
            stdout: unused (the converter prints directly).
            stderr: stream that receives the usage error message.

        Returns:
            ``EXIT_OK`` on success, ``EXIT_SYNTAX`` when no input was given.
        """
        if pyargs.real_infile is not None:
            source = pyargs.real_infile
        elif stdin.isatty():
            source = pyargs.infile
        else:
            source = stdin

        if source is None:
            # Interactive terminal and no file argument: nothing to parse.
            print(
                f'{NOMEN}: no input file given and nothing piped on stdin',
                file=stderr)
            return self.EXIT_SYNTAX

        element_filter = OSMElementFilter()
        if pyargs.xml_tags:
            element_filter.set_filter_xml_tags(pyargs.xml_tags)
        if pyargs.xml_tags_not:
            element_filter.set_filter_xml_tags_not(pyargs.xml_tags_not)

        retagger = None
        if pyargs.retagger_file:
            with open(pyargs.retagger_file, mode='r',
                      encoding='utf-8') as _file:
                retagger = _file.read()

        # The retagger rules apply to piped input as well as to files
        # (previously they were dropped when reading from stdin).
        osmrdf_xmldump2_ttl_v2(source, element_filter, retagger)
        return self.EXIT_OK
if __name__ == "__main__":
    # Script entry point: parse the command line, then run the conversion.
    est_cli = Cli()
    est_cli.execute_cli(est_cli.make_args())
# osmrdf_xmldump2_ttl('./tmp/sao-tome-and-principe-latest.osm')
# ./osmdump2rdfcli.py --help
# ./osmdump2rdfcli.py > ./tmp/sao-tome-and-principe-latest.osm.ttl
# rdfpipe --input-format='ttl' --output-format=longturtle ./tmp/sao-tome-and-principe-latest.osm.ttl > ./tmp/sao-tome-and-principe-latest~longturtle.osm.ttl
osmrdf2023.py
#!/usr/bin/env python3
# ==============================================================================
#
# FILE: osmrdf2023.py
#
# USAGE: # this is a library. Import into your code:
# from osmrdf2022 import *
#
# DESCRIPTION: ---
#
# OPTIONS: ---
#
# REQUIREMENTS: - python3
# - lxml
# BUGS: - No big XML dumps output format support (not yet)
# - No support for PBF Format (...not yet)
# NOTES: ---
# AUTHORS: Emerson Rocha <rocha[at]ieee.org>
# COLLABORATORS: ---
# LICENSE: Public Domain dedication or Zero-Clause BSD
# SPDX-License-Identifier: Unlicense OR 0BSD
# VERSION: v0.3.0
# CREATED: 2022-11-25 19:22:00Z v0.1.0 started
# REVISION: 2022-11-26 20:47:00Z v0.2.0 node, way, relation basic turtle,
# only attached tags (no <nd> <member> yet)
# 2022-12-21 01:46:00Z v0.3.0 osmrdf2022.py -> osmrdf2023.py
# ==============================================================================
import sys
from typing import List, Type
import xml.etree.ElementTree as XMLElementTree
from lxml import etree
# See also: https://wiki.openstreetmap.org/wiki/Sophox#How_OSM_data_is_stored
# See also https://wiki.openstreetmap.org/wiki/Elements
# PREFIX declarations written at the top of every generated Turtle document
# (the converter functions in this module emit them before any element).
# The osmm: and osmx: namespaces still point to example.org "todo" URIs.
RDF_TURTLE_PREFIXES = [
    'PREFIX geo: <http://www.opengis.net/ont/geosparql#>',
    'PREFIX osmnode: <https://www.openstreetmap.org/node/>',
    'PREFIX osmrel: <https://www.openstreetmap.org/relation/>',
    'PREFIX osmway: <https://www.openstreetmap.org/way/>',
    'PREFIX osmm: <https://example.org/todo-meta/>',
    'PREFIX osmt: <https://wiki.openstreetmap.org/wiki/Key:>',
    'PREFIX osmx: <https://example.org/todo-xref/>',
    'PREFIX wikidata: <http://www.wikidata.org/entity/>',
    'PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>',
]
# Maps an OSM XML element name to the Turtle prefix used for its subject
# (OSMElement builds "<prefix><id>") and for relation member references.
OSM_ELEMENT_PREFIX = {
    'node': 'osmnode:',
    'relation': 'osmrel:',
    'tag': 'osmt:',
    'way': 'osmway:'
}
# Using Sophox
# One-letter literal written as osmm:type, looked up by XML element name.
# The XML element is called "relation" (see OSM_ELEMENT_PREFIX); the table
# used to contain only the key 'rel', so relations never got an osmm:type.
# 'rel' is kept for backward compatibility.
OSM_ELEMENT_TYPE_LITERAL = {
    'node': 'n',
    'way': 'w',
    'relation': 'r',
    'rel': 'r',
}
# Undocumented
# - osmx:hasnodes
# - osmx:hasmembers
# - osmx:hasrole{CUSTOM}
# @SEE blank nodes https://www.w3.org/TR/turtle/#h2_sec-examples
class OSMApiv06Xml:
    """OSMApiv06Xml

    Not so optimized quick parser for XML documents that can fit into memory.
    The whole document is parsed eagerly in the constructor.
    """

    iterator: None
    xmlroot: None  # xml.etree.ElementTree.Element
    root_tag: str  # Example: osm
    root_attrib: dict
    child_1_tag: str
    child_1_attr: dict
    # @TODO for full dumps, will have 1-n child items

    # Children of the root that are OSM primitives; anything else
    # (e.g. <bounds>) is skipped by node().
    _PRIMITIVES = ('node', 'way', 'relation')

    def __init__(self, file_or_string: str) -> None:
        """Parse *file_or_string* (an XML document given as a string).

        @TODO maybe eventually implement from file (the large ones), e.g.
              with XMLElementTree.iterparse(..., events=('start', 'end'))
        """
        root = XMLElementTree.fromstring(file_or_string)
        self.xmlroot = root
        self.root_tag = root.tag
        self.root_attrib = root.attrib

    def node(self):
        """Return the first node/way/relation of the document.

        Returns:
            An OSMElement built from the first primitive child of the root,
            with its <tag>, <nd> and <member> children attached (each one is
            None when the element has no such children), or None when the
            document contains no primitive at all.
        """
        for child in self.xmlroot:
            if child.tag not in self._PRIMITIVES:
                # Not a primitive (OSMElement has no prefix for it).
                continue

            xml_tags = [
                (item.attrib['k'], item.attrib['v'])
                for item in child.findall("tag")
            ] or None

            xml_nds = [
                int(item.attrib['ref']) for item in child.findall("nd")
            ] or None

            # @FIXME this is incomplete
            xml_members = [
                (
                    item.attrib['type'],
                    int(item.attrib['ref']),
                    item.attrib.get('role'),
                )
                for item in child.findall("member")
            ] or None

            # @TODO for full dumps there would be more than one element;
            #       only the first one is returned.
            return OSMElement(
                child.tag,
                dict(child.attrib),
                xml_tags,
                xml_nds,
                xml_members
            )
        return None
class OSMElement:
    """OSMElement generic container for primitives

    Holds the metadata attributes of one node/way/relation plus its tags,
    node references and members, and serializes them as Turtle lines.

    Note: this will not do additional checks if input data is valid
    """

    _basegroup: str
    _tag: str
    _el_osm_tags: List[tuple]
    _el_osm_nds: List[int]
    _el_osm_members: List[tuple]
    _xml_filter: 'OSMElementFilter'
    _tagcaster: 'OSMElementTagValueCast'
    id: int
    changeset: int
    timestamp: str  # maybe change later
    user: str
    userid: str
    version: str
    visible: bool
    lat: float
    lon: float

    # Characters that must be escaped inside a Turtle "..." string literal.
    _TTL_ESCAPES = str.maketrans({
        '\\': '\\\\',
        '"': '\\"',
        '\n': '\\n',
        '\r': '\\r',
        '\t': '\\t',
    })

    def __init__(
        self, tag: str, meta: dict,
        xml_tags: List[tuple] = None,
        xml_nds: List[int] = None,
        xml_members: List[tuple] = None,
        xml_filter: 'OSMElementFilter' = None,
        tagcaster: 'OSMElementTagValueCast' = None,
    ):
        """Build the element from its XML name and attributes.

        Args:
            tag: XML element name; must be a key of OSM_ELEMENT_PREFIX.
            meta: XML attributes of the element (id, version, lat, ...).
            xml_tags: (key, value) pairs of the <tag> children.
            xml_nds: node ids referenced by <nd> children.
            xml_members: (type, ref, role) triples of <member> children.
            xml_filter: optional filter consulted by can_output().
            tagcaster: optional helper that emits extra typed statements.

        Raises:
            KeyError: when *tag* has no entry in OSM_ELEMENT_PREFIX.
        """
        if not isinstance(meta, dict):
            meta = dict(meta)

        self.id = int(meta['id']) if 'id' in meta else None
        # Kept as the raw string (not cast to a number).
        self.version = meta.get('version')
        self.changeset = int(
            meta['changeset']) if 'changeset' in meta else None
        self.timestamp = meta.get('timestamp')
        self.user = meta.get('user')
        # uid = userid
        self.userid = int(meta['uid']) if 'uid' in meta else None
        self.lat = float(meta['lat']) if 'lat' in meta else None
        self.lon = float(meta['lon']) if 'lon' in meta else None
        self._tag = tag
        self._basegroup = '{0}{1}'.format(
            OSM_ELEMENT_PREFIX[tag], str(self.id))
        self._el_osm_tags = xml_tags
        self._el_osm_nds = xml_nds
        self._el_osm_members = xml_members
        self._xml_filter = xml_filter
        self._tagcaster = tagcaster

    @staticmethod
    def _ttl_string(value) -> str:
        """Escape *value* for use inside a double-quoted Turtle literal."""
        return str(value).translate(OSMElement._TTL_ESCAPES)

    def can_output(self) -> bool:
        """Tell whether the element passes the filter (True when no filter)."""
        if self._xml_filter is None:
            return True
        return bool(self._xml_filter.can_tag(self._tag))

    def to_ttl(self) -> list:
        """Serialize the element as a list of Turtle lines.

        The first line is the subject, the last one the closing ".";
        every predicate line in between ends with " ;".
        """
        data = [self._basegroup]

        if self.changeset is not None:
            data.append(f' osmm:changeset {self.changeset} ;')

        # "is not None": 0.0 is a valid latitude/longitude.
        if self.lat is not None and self.lon is not None:
            # WKT axis order is "x y", i.e. longitude first.
            data.append(
                f' osmm:loc "Point({self.lon} {self.lat})"^^geo:wktLiteral ;')

        if self.timestamp:
            data.append(
                f' osmm:timestamp "{self.timestamp}"^^xsd:dateTime ;')

        if self._tag in OSM_ELEMENT_TYPE_LITERAL:
            data.append(
                f' osmm:type "{OSM_ELEMENT_TYPE_LITERAL[self._tag]}" ;')

        if self.user:
            data.append(
                f' osmm:user "{self._ttl_string(self.user)}" ;')

        if self.userid is not None:
            data.append(
                f' osmm:userid {self.userid} ;')

        if self.version:
            data.append(
                f' osmm:version {self.version} ;')

        if self._el_osm_tags:
            for key, value in self._el_osm_tags:
                _escp_key = osmrdf_tagkey_encode(key)
                data.append(
                    f' osmt:{_escp_key} "{self._ttl_string(value)}" ;')
                # Optional extra statement with a typed/linked value.
                if self._tagcaster and self._tagcaster.can_cast(_escp_key):
                    data.append(self._tagcaster.to_ttl(_escp_key, value))

        if self._el_osm_nds:
            _parts = [f'osmnode:{ref}' for ref in self._el_osm_nds]
            data.append(
                f' osmx:hasnodes ({" ".join(_parts)}) ;')

        if self._el_osm_members:
            _parts = []
            for _type, _ref, _role in self._el_osm_members:
                _prefix = OSM_ELEMENT_PREFIX[_type]
                # A missing role yields the bare "osmx:hasrole" predicate
                # (same as an empty role) instead of "osmx:hasroleNone".
                _role_key = osmrdf_tagkey_encode(_role) if _role else ''
                _parts.append(f'[osmx:hasrole{_role_key} {_prefix}{_ref}]')
            data.append(
                f' osmx:hasmembers ({" ".join(_parts)}) ;')

        data.append('.')

        return data
class OSMElementFilter:
    """Decides which XML element names an OSMElement may output.

    Two optional lists are kept: names to include and names to exclude.
    Both setters return the filter itself so calls can be chained.
    """

    xml_tags: List = None
    xml_tags_not: List = None

    def __init__(self) -> None:
        pass

    def set_filter_xml_tags(self, tags: list):
        """Set the include list and return self."""
        self.xml_tags = tags
        return self

    def set_filter_xml_tags_not(self, tags: list):
        """Set the exclude list and return self."""
        self.xml_tags_not = tags
        return self

    def can_tag(self, tag: str) -> bool:
        """Return True when *tag* is included (or no include list is set)
        and is not excluded."""
        included = not self.xml_tags or tag in self.xml_tags
        excluded = bool(self.xml_tags_not) and tag in self.xml_tags_not
        return included and not excluded
class OSMElementTagger:
    """Poor man's tagger (no external inference required)

    Rules are given as TSV text, one rule per line, with the columns
    ``<op> <xml elements> <xml attributes> <key>=<value>``:

    @example
    + <way> * is_in=BRA
    - <node>|<way>|<relation> * created_by=*
    + <way>|<relation> * shacl:lessThanOrEquals:maxspeed=120

    "-" removes every tag with the given key; "+" sets the tag to the given
    value (replacing an existing one or adding a new one). The attributes
    column is parsed but not evaluated yet.
    """

    rules: list = None

    def __init__(self, rules=None) -> None:
        if rules:
            self.parse_rules(rules)

    def parse_rules(self, rules_tsv: str):
        """Parse the TSV rule text into ``self.rules``.

        Blank lines are ignored. On a malformed line the problem and the
        whole rule text are reported on stderr and the process exits with
        status 1 (SystemExit), so the Turtle on stdout is not polluted.
        """
        rules = []
        try:
            for line in rules_tsv.splitlines():
                if not line.strip():
                    continue
                fields = line.split("\t")
                _op = fields[0]
                _xml_tag = fields[1].replace(
                    '<', '').replace('>', '').split('|')
                _xml_attrs = fields[2]
                # Split on the first "=" only: values may contain "=".
                _xml_c_attr_key, _xml_c_attr_value = fields[3].split('=', 1)
                if _xml_tag[0] == '*':
                    _xml_tag = None
                if not _xml_attrs or _xml_attrs.startswith('*'):
                    _xml_attrs = None
                rules.append({
                    'i': len(rules),  # position of the rule in the list
                    'op': _op,
                    'xt': _xml_tag,
                    'xa': _xml_attrs,
                    'xack': _xml_c_attr_key,
                    'xacv': _xml_c_attr_value,
                })
        except (IndexError, ValueError) as err:
            print(f"ERROR OSMElementTagger: {err}", file=sys.stderr)
            print('--- start of file ---', file=sys.stderr)
            print(rules_tsv, file=sys.stderr)
            print('--- end of file ---', file=sys.stderr)
            sys.exit(1)
        if rules:
            self.rules = rules

    def retag(self, element: str, de_facto_tags: List[tuple] = None):
        """Apply the rules to the tags of one element.

        Args:
            element: XML element name (e.g. ``way``).
            de_facto_tags: (key, value) pairs as found in the data.

        Returns:
            *de_facto_tags* itself when no rules are loaded; otherwise a new
            list (the input is not modified). Tags created by "+" rules for
            keys that were absent are appended at the end, after all rules
            ran, so "-" rules never remove them.
        """
        if not self.rules:
            return de_facto_tags

        current = list(de_facto_tags or [])
        additions = []
        for rule in self.rules:
            if rule['xt'] is not None and element not in rule['xt']:
                continue  # rule does not target this kind of element
            # TODO implement attribute check
            key, value = rule['xack'], rule['xacv']
            if rule['op'] == '-':
                # Exact key match (a substring test would also drop e.g.
                # "by" for a "created_by" rule).
                current = [pair for pair in current if pair[0] != key]
            elif rule['op'] == '+':
                if any(tag_key == key for tag_key, _ in current):
                    current = [
                        (tag_key, value if tag_key == key else tag_value)
                        for tag_key, tag_value in current
                    ]
                else:
                    additions.append((key, value))
        current.extend(additions)
        return current
class OSMElementTagValueCast:
    """Coerce the value of vanilla OpenStreetMap tags

    Only the ``wikidata`` key is handled for now.
    """

    rules: list = None

    def __init__(self, rules=None) -> None:
        if not rules:
            return
        self.parse_rules(rules)

    def parse_rules(self, rules_tsv: str):
        """Placeholder: the rule text is split into lines and discarded."""
        # @TODO make it
        _lines = rules_tsv.splitlines()

    def can_cast(self, tagkey: str) -> bool:
        """Tell whether to_ttl() can produce a statement for *tagkey*."""
        return tagkey in ['wikidata']

    def to_ttl(self, tagkey: str, tagvalue: str) -> str:
        """Return the Turtle predicate line linking the value to Wikidata."""
        return ' osmx:{0} wikidata:{1} ;'.format(tagkey, tagvalue)
def osmrdf_node_xml2ttl(data_xml: str):
    """Convert the XML of a single node (API v0.6 response) to Turtle.

    Returns the prefixes, a blank line, the element statements and a
    trailing empty line, joined with newlines.
    """
    element = OSMApiv06Xml(data_xml).node()
    lines = [*RDF_TURTLE_PREFIXES, '', *element.to_ttl(), '']
    return "\n".join(lines)
def osmrdf_relation_xml2ttl(data_xml: str, debug: bool = False):
    """Convert the XML of a single relation (API v0.6 response) to Turtle.

    Args:
        data_xml: XML document as a string.
        debug: when True, the source XML is appended as "# " comment lines.
            This debug echo used to be always on for relations only (it is
            commented out in the node and way converters); it is now opt-in
            so the three converters produce the same kind of output.

    Returns:
        The prefixes, a blank line, the element statements and a trailing
        empty line, joined with newlines.
    """
    osmx = OSMApiv06Xml(data_xml)
    osmnode = osmx.node()
    output = []
    output.extend(RDF_TURTLE_PREFIXES)
    output.append('')
    output.extend(osmnode.to_ttl())
    output.append('')

    if debug:
        output.append("# " + "\n# ".join(data_xml.split("\n")))
    return "\n".join(output)
def osmrdf_tagkey_encode(raw_tag: str) -> str:
    """Encode an OSM tag key so it can be used as a Turtle local name.

    Word characters (letters, digits, "_", including non-ASCII letters),
    ":", "-" and "." are kept; every other character (spaces, quotes, "/",
    "%", ...) is replaced by the percent-encoding of its UTF-8 bytes.
    A leading "-" or "." and a trailing "." are percent-encoded as well,
    since they are not kept as-is in those positions.

    Common keys such as ``name`` or ``addr:street`` are returned unchanged
    and a space still becomes ``%20``.
    """
    import re  # local import: only this helper needs it

    def _percent(text: str) -> str:
        return ''.join(f'%{byte:02X}' for byte in text.encode('utf-8'))

    encoded = re.sub(
        r'[^\w:.\-]', lambda match: _percent(match.group(0)), raw_tag)
    if encoded[:1] in ('-', '.'):
        encoded = _percent(encoded[0]) + encoded[1:]
    if encoded.endswith('.'):
        encoded = encoded[:-1] + '%2E'
    return encoded
def osmrdf_way_xml2ttl(data_xml: str):
    """Convert the XML of a single way (API v0.6 response) to Turtle.

    Returns the prefixes, a blank line, the element statements and a
    trailing empty line, joined with newlines.
    """
    element = OSMApiv06Xml(data_xml).node()
    lines = [*RDF_TURTLE_PREFIXES, '', *element.to_ttl(), '']
    return "\n".join(lines)
def osmrdf_xmldump2_ttl(xml_file_path, xml_filter: OSMElementFilter = None):
    """Print an OSM XML dump as Turtle on standard output.

    @deprecated will be replaced later (see osmrdf_xmldump2_ttl_v2)

    The file is parsed incrementally with the standard library parser
    (``xml.etree.cElementTree``, which this function used to import, was
    removed in Python 3.9). Elements are handled on their "end" event, when
    all <tag>, <nd> and <member> children are guaranteed to be present.

    Args:
        xml_file_path: path (or file object) of the XML dump.
        xml_filter (OSMElementFilter, optional): limits which element kinds
            are printed. Defaults to None (print everything).
    """
    print('\n'.join(RDF_TURTLE_PREFIXES))
    print('')

    for _event, elem in XMLElementTree.iterparse(
            xml_file_path, events=('end',)):
        # Only primitives are converted; <osm>, <bounds>, <tag>, <nd>,
        # <member> and any other element are skipped.
        if elem.tag not in ('node', 'way', 'relation'):
            continue

        if xml_filter is None or xml_filter.can_tag(elem.tag):
            xml_tags = [
                (item.attrib['k'], item.attrib['v'])
                for item in elem.findall("tag")
            ]
            xml_nds = [
                int(item.attrib['ref']) for item in elem.findall("nd")
            ]
            # @FIXME this is incomplete
            xml_members = [
                (
                    item.attrib['type'],
                    int(item.attrib['ref']),
                    item.attrib.get('role'),
                )
                for item in elem.findall("member")
            ]
            el = OSMElement(
                elem.tag,
                dict(elem.attrib),
                xml_tags=xml_tags,
                xml_nds=xml_nds,
                xml_members=xml_members,
                xml_filter=xml_filter
            )
            print('\n'.join(el.to_ttl()) + '\n')

        # Drop the children of the processed element to limit memory use.
        elem.clear()
def osmrdf_xmldump2_ttl_v2(
        xml_file_path,
        xml_filter: OSMElementFilter = None, retagger: str = None):
    """Print an OSM XML dump as Turtle on standard output (lxml based).

    Args:
        xml_file_path: path or binary file object of the XML dump.
        xml_filter: limits which element kinds are printed; None prints all.
        retagger: TSV rule text for OSMElementTagger; None disables it.

    Only <node>, <way> and <relation> elements are converted. Other
    elements (for example <note> and <meta> found in Overpass output) are
    skipped; they have no entry in OSM_ELEMENT_PREFIX and used to abort the
    run with a KeyError.
    """
    print('\n'.join(RDF_TURTLE_PREFIXES))
    print('')

    tagger = OSMElementTagger(retagger)
    tagcaster = OSMElementTagValueCast(None)

    context = etree.iterparse(xml_file_path, events=('end',))

    for _event, elem in context:
        if elem.tag not in ('node', 'way', 'relation'):
            continue

        if xml_filter is None or xml_filter.can_tag(elem.tag):
            xml_tags = [
                (item.attrib['k'], item.attrib['v'])
                for item in elem.findall("tag")
            ]
            # "Implicit" tag "<tagi />"
            xml_tags.extend(
                (item.attrib['k'], item.attrib['v'])
                for item in elem.findall("tagi")
            )
            # As before, the rules only run for elements that carry tags,
            # but now once over the combined list instead of once per kind.
            if xml_tags:
                xml_tags = tagger.retag(elem.tag, xml_tags)

            xml_nds = [
                int(item.attrib['ref']) for item in elem.findall("nd")
            ]
            # @FIXME this is incomplete
            xml_members = [
                (
                    item.attrib['type'],
                    int(item.attrib['ref']),
                    item.attrib.get('role'),
                )
                for item in elem.findall("member")
            ]

            el = OSMElement(
                elem.tag,
                dict(elem.attrib),
                xml_tags=xml_tags,
                xml_nds=xml_nds,
                xml_members=xml_members,
                xml_filter=xml_filter,
                tagcaster=tagcaster,
            )
            print('\n'.join(el.to_ttl()) + '\n')

        # Free the processed element and the siblings before it so memory
        # stays bounded on big dumps.
        elem.clear()
        while elem.getprevious() is not None:
            del elem.getparent()[0]
# @TODO after Protobuf, maybe try some alternative which could allow
# search specific parts. See:
# - https://wiki.openstreetmap.org/wiki/SQLite
# - https://wiki.openstreetmap.org/wiki/SpatiaLite
# - https://github.com/osmzoso/osm2sqlite
#
Algorithm
This page might contain only snippets of the proof of concept, not the conventions themselves or their discussions.
References
- Original repository (Python version): https://github.com/fititnt/openstreetmap-rdf-schema-rfc