Files
ifc-weather-mapping-mechanism/meteorogical_data/device_data_rdf.py

97 lines
4.6 KiB
Python

from rdflib import Graph, Namespace, URIRef, Literal, RDF, RDFS, XSD
from SPARQLWrapper import SPARQLWrapper, POST, BASIC, URLENCODED
class device():
def __init__(self, device_data_json, database, device_id):
self.device_data_json = device_data_json
self.database = database
self.device_id = device_id
def read_ttl_and_store_in_graphdb(self, ttl_file_path, graphdb_endpoint, graphdb_repository, username=None,
password=None):
"""
Reads RDF data from a Turtle file and stores it in a GraphDB repository.
:param ttl_file_path: Path to the Turtle file.
:param graphdb_endpoint: URL of the GraphDB endpoint.
:param graphdb_repository: Name of the GraphDB repository.
:param username: (Optional) Username for GraphDB authentication.
:param password: (Optional) Password for GraphDB authentication.
"""
# Create a graph
graph = Graph()
# Parse the Turtle file
graph.parse(ttl_file_path, format="turtle")
# Serialize the graph to a string in N-Triples format
ntriples_data = graph.serialize(format='nt')
# If ntriples_data is bytes, decode it to string
if isinstance(ntriples_data, bytes):
ntriples_data = ntriples_data.decode('utf-8')
# Create a SPARQLWrapper instance for the GraphDB endpoint
sparql = SPARQLWrapper(f"{graphdb_endpoint}/repositories/{graphdb_repository}/statements")
sparql.setMethod(POST)
sparql.setRequestMethod(URLENCODED)
sparql.addParameter('update', f"""
INSERT DATA {{
{ntriples_data}
}}
""")
# Set credentials if provided
if username and password:
sparql.setHTTPAuth(BASIC)
sparql.setCredentials(username, password)
# Send the request to the GraphDB repository
sparql.query()
def parse_device_data(self):
# Create a new RDF graph
g = Graph()
# Define a custom namespace
ontology = Namespace("http://sincere.org/s1#")
resource = Namespace("http://sincere.org/s1/resource/")
g.bind('sincere', ontology)
g.add((resource["device_" + self.device_id], RDF.type, ontology.Device))
g.add((resource["device_" + self.device_id], ontology.database, Literal(self.database, datatype=XSD.string)))
g.add((resource["device_" + self.device_id], RDFS.label, Literal(self.device_id, datatype=XSD.string)))
for dev in self.device_data_json:
channel = dev["channel"]
channel_id = dev["channel_id"]
constant = dev["constant"]
device_id = dev["device_id"]
item = dev["item"]
unit = dev["unit"]
writable = dev["writable"]
g.add((resource["device_" + self.device_id], ontology.hasChannel, resource["channel_" + channel_id]))
g.add((resource["channel_" + channel_id], RDF.type, ontology.Channel))
g.add((resource["channel_" + channel_id], ontology.number, Literal(channel, datatype=XSD.integer)))
g.add((resource["channel_" + channel_id], ontology.number, Literal(channel, datatype=XSD.integer)))
g.add((resource["channel_" + channel_id], ontology.constant, Literal(constant, datatype=XSD.string)))
g.add((resource["channel_" + channel_id], RDFS.label, Literal(channel_id, datatype=XSD.string)))
g.add((resource["channel_" + channel_id], ontology.device_id, Literal(device_id, datatype=XSD.string)))
g.add((resource["channel_" + channel_id], ontology.item, Literal(item, datatype=XSD.string)))
g.add((resource["channel_" + channel_id], ontology.unit, Literal(unit, datatype=XSD.string)))
g.add((resource["channel_" + channel_id], ontology.writable, Literal(writable, datatype=XSD.boolean)))
# Create RDF triples
# Serialize and save the RDF graph to a TTL file
file_path = "output_device.ttl"
g.serialize(destination=file_path, format="turtle")
graphdb_endpoint = 'http://localhost:7200' # Replace with the URL of your GraphDB endpoint
graphdb_repository = 'weather_measurement' # Replace with the name of your GraphDB repository
username = 'admin' # Replace with your GraphDB username (if required)
password = 'root' # Replace with your GraphDB password (if required)
self.read_ttl_and_store_in_graphdb(file_path, graphdb_endpoint, graphdb_repository, username, password)
print("Data was mapped in GraphDB")
return 1