first commit

This commit is contained in:
Alvaro Garcia Molino
2024-12-17 14:42:57 +01:00
commit c16f0b53e0
4 changed files with 359 additions and 0 deletions

41
core/config.py Normal file
View File

@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""
class from config file
"""
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from rich.console import Console
console = Console()
class Settings(BaseSettings):
model_config = SettingsConfigDict()
timezone: str = Field(alias="TIMEZONE", default="Europe/Madrid")
agilent_ip: str = '192.168.2.26'
loop_time: int = 5
# Finot API platform
api_finot: bool = Field(alias="API_FINOT", default=False)
api_finot_loginId: str = "xxxxxxxxxxxxxx-in.eu" # login, please change
api_finot_password: str = "XXXXXXXXXXX" # password, please change
api_finot_refresh_token: str = "RhWyBEf_xxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxx" # refresh token
# Agilent channels mapping
agilent_output: dict = {
'TC1':101,
'TC2':102,
'VDC1':103
}
agilent_requests: dict = {
101: 'MEAS:TEMP?',
102: 'MEAS:TEMP?',
103: 'MEAS:VOLT:DC?'
}

53
core/formatter.py Normal file
View File

@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
import logging
class CustomFormatter(logging.Formatter):
blue = "\x1b[38;5;39m"
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
green = "\x1b[;32m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
format_error = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
format = "%(asctime)s - %(name)s "
FORMATS = {
logging.DEBUG: format + blue + "OUTPUT: " + reset + " %(message)s",
logging.INFO: format + green + "%(levelname)s: " + reset + " %(message)s",
logging.WARNING: format + yellow + "%(levelname)s: " + reset + " %(message)s",
logging.ERROR: red + format_error + reset,
logging.CRITICAL: bold_red + format_error + reset,
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
class Logger:
def __init__(self, name):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(CustomFormatter())
self.logger.addHandler(ch)
def info(self, msg):
return self.logger.info(msg)
def warning(self, msg):
return self.logger.warning(msg)
def error(self, msg):
return self.logger.error(msg)
def critical(self, msg):
return self.logger.critical(msg)
def debug(self, msg):
return self.logger.debug(msg)

89
main.py Normal file
View File

@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
import pyvisa
import time
import asyncio
from src.api_finot_requests import ApiFinotCredentials
from core.config import Settings
config: Settings = Settings()
api_finot_obj = ApiFinotCredentials()
def connect_instrument(rm):
try:
my_instrument = rm.open_resource(
f'TCPIP::{config.agilent_ip}::inst0::INSTR', open_timeout=5)
my_instrument.timeout = 2
time.sleep(1)
return my_instrument
except Exception as err:
print("Error de conexión, reconectar.", err)
return False
async def agilent_visa():
rm = pyvisa.ResourceManager()
rm.list_resources()
print(rm.list_resources())
# connect to instrument
my_instrument = connect_instrument(rm)
while my_instrument is False:
my_instrument = connect_instrument(rm)
print("Reconectar...")
await asyncio.sleep(.5)
return False
# resetea el equipo
my_instrument.write("*RST")
# inicia canales por defecto como vdc
my_instrument.write('ROUT:SCAN (@101, 102, 103)')
try:
my_instrument.write('*IDN?')
print("IDN?", my_instrument.read())
time.sleep(.5)
except Exception as e:
print(e)
await asyncio.sleep(.5)
return False
# check channels estan iniciados
channels = my_instrument.query('ROUT:SCAN?')
channels = channels.strip()
print("Channels:", channels[channels.find('@')+1:-1].split(','))
while True:
finot_output = {}
try:
for mag, channel in config.agilent_output.items():
finot_output[mag] = float(
my_instrument.query(f"{config.agilent_requests[channel]} (@{channel})"))
await api_finot_obj.insert_values_api_finot(finot_output)
time.sleep(config.loop_time)
except Exception as e:
print(e)
print("Reset connection...")
time.sleep(4)
my_instrument = connect_instrument(rm)
async def create_entitie():
api_finot_obj = ApiFinotCredentials()
await api_finot_obj.create_file_api_finot(config.agilent_output)
# main
if __name__ == "__main__":
# Create entitie if not exist
# asyncio.run(create_entitie())
try:
asyncio.run(agilent_visa())
except KeyboardInterrupt:
exit(0)

176
src/api_finot_requests.py Normal file
View File

@@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
import requests
from pydantic import BaseModel
from core.formatter import Logger
from datetime import datetime
import pytz
from core.config import Settings
config: Settings = Settings()
gmt = pytz.timezone('Europe/Madrid')
logger = Logger("api-finot")
base_url = 'https://api.panoptis.finot.cloud'
headers = {
'Content-Type': 'application/json',
'User-Agent': 'Apidog/1.0.0 (https://apidog.com)'
}
#CREATE A NEW ONE
ENTITIE_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
class ApiFinotCredentials(BaseModel):
loginId: str = config.api_finot_loginId
password: str = config.api_finot_password
token: str = None
refresh_token: str = config.api_finot_refresh_token
def getTokenWithCredentials(self):
logger.info("Get API Finot token w/ credentials")
r = requests.post(
f'{base_url}/auth/jwt/login',
json = {
"loginId": self.loginId,
"password": self.password
},
headers = headers
).json()
status = r.get('status')
if status == 200:
logger.info(r.get('result').get('token'))
self.token = r.get('result').get('token')
self.refresh_token = r.get('result').get('refreshToken')
logger.info(f"New refreshToken: {self.refresh_token}")
else:
logger.info(f"Error: {r}")
def getRefreshToken(self):
logger.info("Refreshing API Finot token")
r = requests.post(
f'{base_url}/auth/jwt/refresh',
json = {
"refreshToken": self.refresh_token
},
headers = headers
).json()
status = r.get('status')
if status == 200:
logger.info(status)
self.token = r.get('result').get('token')
elif status == 401:
logger.info(f"Invalid refreshToken: {r}")
self.getTokenWithCredentials()
else:
logger.info(f"Error: {r}")
async def create_file_api_finot(self, agilent_output: dict):
now = datetime.now(gmt)
fecha_hora_utc = now.astimezone(pytz.utc)
formatted_datetime = fecha_hora_utc.strftime('%Y-%m-%dT%H:%M:%SZ')
body = {
"id": ENTITIE_ID,
"type": "XXXXXXXXXXXXXX", # Name of entitie
"location": {
"type": "geo:json",
"value": {
"type": "Point",
"coordinates": [
-3.577428584591273,
40.59121368845003
] # Change the Coordinates
}
},
"measurement": {
"type": "Text",
"value": True
}
}
meta_channel = {
"type": "Number",
"value": 0,
"metadata": {
"isDynamic": {
"type": "Boolean",
"value": True
},
"unitCode": {
"type": "Text",
"value": "CEL"
}
}
}
for mag, channel in agilent_output.items():
body.update({
mag: meta_channel})
status = 0
while status != 200:
headers.update({
'X-Tenant': 'tenant1',
'X-TimeIndex-Attribute': formatted_datetime,
'Authorization': f'Bearer {self.token}'
})
logger.info(body)
r = requests.post(
f'{base_url}/inventory/v1.1/objects',
json = body,
headers = headers
).json()
if r.get('status') == 401:
status = 401
self.getRefreshToken()
else:
status = 200
async def insert_values_api_finot(self, finot_output):
fecha_hora_madrid_obj = datetime.now()
fecha_hora_utc = fecha_hora_madrid_obj.astimezone(
pytz.timezone('Europe/Madrid'))
formatted_datetime = fecha_hora_utc.strftime(
'%Y-%m-%dT%H:%M:%SZ')
logger.info(f"Insert values API Finot: {finot_output}")
finot_registers = {
key: {
'type': 'Number', 'value': value
} for key, value in finot_output.items()
}
if config.api_finot is True:
status = 0
while status != 200:
headers.update({
'X-Tenant': 'tenant1',
'X-TimeIndex-Attribute': formatted_datetime,
'Authorization': f'Bearer {self.token}'
})
r = requests.patch(
f'{base_url}/inventory/v1/objects/{ENTITIE_ID}',
json = finot_registers,
headers = headers
).json()
logger.info(r.get('status'))
if r.get('status') == 401:
status = 401
self.getRefreshToken()
else:
status = 200