97 lines
4.6 KiB
Python
97 lines
4.6 KiB
Python
from rdflib import Graph, Namespace, URIRef, Literal, RDF, RDFS, XSD
|
|
from SPARQLWrapper import SPARQLWrapper, POST, BASIC, URLENCODED
|
|
|
|
class device():
|
|
|
|
|
|
def __init__(self, device_data_json, database, device_id):
|
|
self.device_data_json = device_data_json
|
|
self.database = database
|
|
self.device_id = device_id
|
|
|
|
def read_ttl_and_store_in_graphdb(self, ttl_file_path, graphdb_endpoint, graphdb_repository, username=None,
|
|
password=None):
|
|
"""
|
|
Reads RDF data from a Turtle file and stores it in a GraphDB repository.
|
|
|
|
:param ttl_file_path: Path to the Turtle file.
|
|
:param graphdb_endpoint: URL of the GraphDB endpoint.
|
|
:param graphdb_repository: Name of the GraphDB repository.
|
|
:param username: (Optional) Username for GraphDB authentication.
|
|
:param password: (Optional) Password for GraphDB authentication.
|
|
"""
|
|
# Create a graph
|
|
graph = Graph()
|
|
|
|
# Parse the Turtle file
|
|
graph.parse(ttl_file_path, format="turtle")
|
|
|
|
# Serialize the graph to a string in N-Triples format
|
|
ntriples_data = graph.serialize(format='nt')
|
|
|
|
# If ntriples_data is bytes, decode it to string
|
|
if isinstance(ntriples_data, bytes):
|
|
ntriples_data = ntriples_data.decode('utf-8')
|
|
|
|
# Create a SPARQLWrapper instance for the GraphDB endpoint
|
|
sparql = SPARQLWrapper(f"{graphdb_endpoint}/repositories/{graphdb_repository}/statements")
|
|
sparql.setMethod(POST)
|
|
sparql.setRequestMethod(URLENCODED)
|
|
sparql.addParameter('update', f"""
|
|
INSERT DATA {{
|
|
{ntriples_data}
|
|
}}
|
|
""")
|
|
|
|
# Set credentials if provided
|
|
if username and password:
|
|
sparql.setHTTPAuth(BASIC)
|
|
sparql.setCredentials(username, password)
|
|
|
|
# Send the request to the GraphDB repository
|
|
sparql.query()
|
|
|
|
def parse_device_data(self):
|
|
# Create a new RDF graph
|
|
g = Graph()
|
|
# Define a custom namespace
|
|
ontology = Namespace("http://sincere.org/s1#")
|
|
resource = Namespace("http://sincere.org/s1/resource/")
|
|
g.bind('sincere', ontology)
|
|
|
|
g.add((resource["device_" + self.device_id], RDF.type, ontology.Device))
|
|
g.add((resource["device_" + self.device_id], ontology.database, Literal(self.database, datatype=XSD.string)))
|
|
g.add((resource["device_" + self.device_id], RDFS.label, Literal(self.device_id, datatype=XSD.string)))
|
|
|
|
for dev in self.device_data_json:
|
|
channel = dev["channel"]
|
|
channel_id = dev["channel_id"]
|
|
constant = dev["constant"]
|
|
device_id = dev["device_id"]
|
|
item = dev["item"]
|
|
unit = dev["unit"]
|
|
writable = dev["writable"]
|
|
g.add((resource["device_" + self.device_id], ontology.hasChannel, resource["channel_" + channel_id]))
|
|
g.add((resource["channel_" + channel_id], RDF.type, ontology.Channel))
|
|
g.add((resource["channel_" + channel_id], ontology.number, Literal(channel, datatype=XSD.integer)))
|
|
g.add((resource["channel_" + channel_id], ontology.number, Literal(channel, datatype=XSD.integer)))
|
|
g.add((resource["channel_" + channel_id], ontology.constant, Literal(constant, datatype=XSD.string)))
|
|
g.add((resource["channel_" + channel_id], RDFS.label, Literal(channel_id, datatype=XSD.string)))
|
|
g.add((resource["channel_" + channel_id], ontology.device_id, Literal(device_id, datatype=XSD.string)))
|
|
g.add((resource["channel_" + channel_id], ontology.item, Literal(item, datatype=XSD.string)))
|
|
g.add((resource["channel_" + channel_id], ontology.unit, Literal(unit, datatype=XSD.string)))
|
|
g.add((resource["channel_" + channel_id], ontology.writable, Literal(writable, datatype=XSD.boolean)))
|
|
|
|
# Create RDF triples
|
|
# Serialize and save the RDF graph to a TTL file
|
|
file_path = "output_device.ttl"
|
|
g.serialize(destination=file_path, format="turtle")
|
|
|
|
graphdb_endpoint = 'http://localhost:7200' # Replace with the URL of your GraphDB endpoint
|
|
graphdb_repository = 'weather_measurement' # Replace with the name of your GraphDB repository
|
|
username = 'admin' # Replace with your GraphDB username (if required)
|
|
password = 'root' # Replace with your GraphDB password (if required)
|
|
|
|
self.read_ttl_and_store_in_graphdb(file_path, graphdb_endpoint, graphdb_repository, username, password)
|
|
print("Data was mapped in GraphDB")
|
|
return 1 |