Back Alive!

This commit is contained in:
Mahasri Kalavala
2019-04-17 18:46:06 -04:00
parent 929e0b336e
commit 2a0dd39795
255 changed files with 137143 additions and 0 deletions

View File

@@ -0,0 +1,295 @@
"""
Support for Google Geocode sensors.
For more details about this platform, please refer to the documentation at
https://github.com/michaelmcarthur/GoogleGeocode-HASS
Written By Michael McArthur
https://github.com/michaelmcarthur/GoogleGeocode-HASS
"""
from datetime import datetime
from datetime import timedelta
import logging
import json
import requests
from requests import get
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_API_KEY, CONF_NAME, CONF_SCAN_INTERVAL, ATTR_ATTRIBUTION, ATTR_LATITUDE, ATTR_LONGITUDE)
import homeassistant.helpers.location as location
from homeassistant.util import Throttle
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
CONF_ORIGIN = 'origin'
CONF_OPTIONS = 'options'
CONF_DISPLAY_ZONE = 'display_zone'
CONF_ATTRIBUTION = "Data provided by maps.google.com"
ATTR_STREET_NUMBER = 'Street Number'
ATTR_STREET = 'Street'
ATTR_CITY = 'City'
ATTR_POSTAL_TOWN = 'Postal Town'
ATTR_POSTAL_CODE = 'Postal Code'
ATTR_REGION = 'State'
ATTR_COUNTRY = 'Country'
ATTR_COUNTY = 'County'
ATTR_FORMATTED_ADDRESS = 'Formatted Address'
DEFAULT_NAME = 'Google Geocode'
DEFAULT_OPTION = 'street, city'
DEFAULT_DISPLAY_ZONE = 'display'
DEFAULT_KEY = 'no key'
current = '0,0'
zone_check = 'a'
SCAN_INTERVAL = timedelta(seconds=60)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_ORIGIN): cv.string,
vol.Optional(CONF_API_KEY, default=DEFAULT_KEY): cv.string,
vol.Optional(CONF_OPTIONS, default=DEFAULT_OPTION): cv.string,
vol.Optional(CONF_DISPLAY_ZONE, default=DEFAULT_DISPLAY_ZONE): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_SCAN_INTERVAL, default=SCAN_INTERVAL):
cv.time_period,
})
TRACKABLE_DOMAINS = ['device_tracker', 'sensor']
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Setup the sensor platform."""
name = config.get(CONF_NAME)
api_key = config.get(CONF_API_KEY)
origin = config.get(CONF_ORIGIN)
options = config.get(CONF_OPTIONS)
display_zone = config.get(CONF_DISPLAY_ZONE)
add_devices([GoogleGeocode(hass, origin, name, api_key, options, display_zone)])
class GoogleGeocode(Entity):
"""Representation of a Google Geocode Sensor."""
def __init__(self, hass, origin, name, api_key, options, display_zone):
"""Initialize the sensor."""
self._hass = hass
self._name = name
self._api_key = api_key
self._options = options.lower()
self._display_zone = display_zone.lower()
self._state = "Awaiting Update"
self._street_number = None
self._street = None
self._city = None
self._postal_town = None
self._postal_code = None
self._city = None
self._region = None
self._country = None
self._county = None
self._formatted_address = None
self._zone_check_current = None
# Check if origin is a trackable entity
if origin.split('.', 1)[0] in TRACKABLE_DOMAINS:
self._origin_entity_id = origin
else:
self._origin = origin
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def state(self):
"""Return the state of the sensor."""
return self._state
@property
def device_state_attributes(self):
"""Return the state attributes."""
return{
ATTR_STREET_NUMBER: self._street_number,
ATTR_STREET: self._street,
ATTR_CITY: self._city,
ATTR_POSTAL_TOWN: self._postal_town,
ATTR_POSTAL_CODE: self._postal_code,
ATTR_REGION: self._region,
ATTR_COUNTRY: self._country,
ATTR_COUNTY: self._county,
ATTR_ATTRIBUTION: CONF_ATTRIBUTION,
ATTR_FORMATTED_ADDRESS: self._formatted_address,
}
@Throttle(SCAN_INTERVAL)
def update(self):
"""Get the latest data and updates the states."""
if hasattr(self, '_origin_entity_id'):
self._origin = self._get_location_from_entity(
self._origin_entity_id
)
"""Update if location has changed."""
global current
global zone_check_count
global zone_check
global user_display
zone_check = self.hass.states.get(self._origin_entity_id).state
zone_check_count = 2
if zone_check == self._zone_check_current:
zone_check_count = 1
if zone_check == 'not_home':
zone_check_count = 2
if zone_check_count == 1:
pass
elif self._origin == None:
pass
elif current == self._origin:
pass
else:
_LOGGER.info("google request sent")
self._zone_check_current = self.hass.states.get(self._origin_entity_id).state
zone_check_count = 2
lat = self._origin
current = lat
self._reset_attributes()
if self._api_key == 'no key':
url = "https://maps.google.com/maps/api/geocode/json?latlng=" + lat
else:
url = "https://maps.googleapis.com/maps/api/geocode/json?latlng=" + lat + "&key=" + self._api_key
response = get(url)
json_input = response.text
decoded = json.loads(json_input)
street_number = ''
street = 'Unnamed Road'
alt_street = 'Unnamed Road'
city = ''
postal_town = ''
formatted_address = ''
state = ''
county = ''
country = ''
for result in decoded["results"]:
for component in result["address_components"]:
if 'street_number' in component["types"]:
street_number = component["long_name"]
self._street_number = street_number
if 'route' in component["types"]:
street = component["long_name"]
self._street = street
if 'sublocality_level_1' in component["types"]:
alt_street = component["long_name"]
if 'postal_town' in component["types"]:
postal_town = component["long_name"]
self._postal_town = postal_town
if 'locality' in component["types"]:
city = component["long_name"]
self._city = city
if 'administrative_area_level_1' in component["types"]:
state = component["long_name"]
self._region = state
if 'administrative_area_level_2' in component["types"]:
county = component["long_name"]
self._county = county
if 'country' in component["types"]:
country = component["long_name"]
self._country = country
if 'postal_code' in component["types"]:
postal_code = component["long_name"]
self._postal_code = postal_code
if 'formatted_address' in decoded['results'][0]:
formatted_address = decoded['results'][0]['formatted_address']
self._formatted_address = formatted_address
if 'error_message' in decoded:
self._state = decoded['error_message']
_LOGGER.error("You have exceded your daily requests plase create an api key.")
elif self._display_zone == 'hide' or zone_check == "not_home":
if street == 'Unnamed Road':
street = alt_street
self._street = alt_street
if city == '':
city = postal_town
if city == '':
city = county
display_options = self._options
user_display = []
if "street_number" in display_options:
user_display.append(street_number)
if "street" in display_options:
user_display.append(street)
if "city" in display_options:
self._append_to_user_display(city)
if "county" in display_options:
self._append_to_user_display(county)
if "state" in display_options:
self._append_to_user_display(state)
if "postal_code" in display_options:
self._append_to_user_display(postal_code)
if "country" in display_options:
self._append_to_user_display(country)
if "formatted_address" in display_options:
self._append_to_user_display(formatted_address)
user_display = ', '.join( x for x in user_display )
if user_display == '':
user_display = street
self._state = user_display
else:
self._state = zone_check[0].upper() + zone_check[1:]
def _get_location_from_entity(self, entity_id):
"""Get the origin from the entity state or attributes."""
entity = self._hass.states.get(entity_id)
if entity is None:
_LOGGER.error("Unable to find entity %s", entity_id)
return None
# Check if the entity has origin attributes
if location.has_location(entity):
return self._get_location_from_attributes(entity)
# When everything fails just return nothing
return None
def _reset_attributes(self):
"""Resets attributes."""
self._street = None
self._street_number = None
self._city = None
self._postal_town = None
self._postal_code = None
self._region = None
self._country = None
self._county = None
self._formatted_address = None
def _append_to_user_display(self, append_check):
"""Appends attribute to state if false."""
if append_check == "":
pass
else:
user_display.append(append_check)
@staticmethod
def _get_location_from_attributes(entity):
"""Get the lat/long string from an entities attributes."""
attr = entity.attributes
return "%s,%s" % (attr.get(ATTR_LATITUDE), attr.get(ATTR_LONGITUDE))

View File

@@ -0,0 +1,267 @@
"""
@ Author : Suresh Kalavala
@ Date : 05/24/2017
@ Description : Life360 Sensor - It queries Life360 API and retrieves
data at a specified interval and dumps into MQTT
@ Notes: Copy this file and place it in your
"Home Assistant Config folder\custom_components\sensor\" folder
Copy corresponding Life360 Package frommy repo,
and make sure you have MQTT installed and Configured
Make sure the life360 password doesn't contain '#' or '$' symbols
"""
from datetime import timedelta
import logging
import subprocess
import json
import voluptuous as vol
import homeassistant.components.mqtt as mqtt
from io import StringIO
from homeassistant.components.mqtt import (CONF_STATE_TOPIC, CONF_COMMAND_TOPIC, CONF_QOS, CONF_RETAIN)
from homeassistant.helpers import template
from homeassistant.exceptions import TemplateError
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_VALUE_TEMPLATE, CONF_UNIT_OF_MEASUREMENT,
STATE_UNKNOWN)
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['mqtt']
DEFAULT_NAME = 'Life360 Sensor'
CONST_MQTT_TOPIC = "mqtt_topic"
CONST_STATE_ERROR = "error"
CONST_STATE_RUNNING = "running"
CONST_USERNAME = "username"
CONST_PASSWORD = "password"
COMMAND1 = "curl -s -X POST -H \"Authorization: Basic cFJFcXVnYWJSZXRyZTRFc3RldGhlcnVmcmVQdW1hbUV4dWNyRUh1YzptM2ZydXBSZXRSZXN3ZXJFQ2hBUHJFOTZxYWtFZHI0Vg==\" -F \"grant_type=password\" -F \"username=USERNAME360\" -F \"password=PASSWORD360\" https://api.life360.com/v3/oauth2/token.json | grep -Po '(?<=\"access_token\":\")\\w*'"
COMMAND2 = "curl -s -X GET -H \"Authorization: Bearer ACCESS_TOKEN\" https://api.life360.com/v3/circles.json | grep -Po '(?<=\"id\":\")[\\w-]*'"
COMMAND3 = "curl -s -X GET -H \"Authorization: Bearer ACCESS_TOKEN\" https://api.life360.com/v3/circles/ID"
SCAN_INTERVAL = timedelta(seconds=60)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONST_USERNAME): cv.string,
vol.Required(CONST_PASSWORD): cv.string,
vol.Required(CONST_MQTT_TOPIC): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the Life360 Sensor."""
name = config.get(CONF_NAME)
username = config.get(CONST_USERNAME)
password = config.get(CONST_PASSWORD)
mqtt_topic = config.get(CONST_MQTT_TOPIC)
unit = config.get(CONF_UNIT_OF_MEASUREMENT)
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None:
value_template.hass = hass
data = Life360SensorData(username, password, COMMAND1, COMMAND2, COMMAND3, mqtt_topic, hass)
add_devices([Life360Sensor(hass, data, name, unit, value_template)])
class Life360Sensor(Entity):
"""Representation of a sensor."""
def __init__(self, hass, data, name, unit_of_measurement, value_template):
"""Initialize the sensor."""
self._hass = hass
self.data = data
self._name = name
self._state = STATE_UNKNOWN
self._unit_of_measurement = unit_of_measurement
self._value_template = value_template
self.update()
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def unit_of_measurement(self):
"""Return the unit the value is expressed in."""
return self._unit_of_measurement
@property
def state(self):
"""Return the state of the device."""
return self._state
def update(self):
"""Get the latest data and updates the state."""
self.data.update()
value = self.data.value
if value is None:
value = STATE_UNKNOWN
elif self._value_template is not None:
self._state = self._value_template.render_with_possible_json_value(
value, STATE_UNKNOWN)
else:
self._state = value
class Life360SensorData(object):
"""The class for handling the data retrieval."""
def __init__(self, username, password, command1, command2, command3, mqtt_topic, hass):
"""Initialize the data object."""
self.username = username
self.password = password
self.COMMAND_ACCESS_TOKEN = command1
self.COMMAND_ID = command2
self.COMMAND_MEMBERS = command3
self.hass = hass
self.value = None
self.mqtt_topic = mqtt_topic
self.mqtt_retain = True
self.mqtt_qos = 0
def update(self):
try:
""" Prepare and Execute Commands """
self.COMMAND_ACCESS_TOKEN = self.COMMAND_ACCESS_TOKEN.replace("USERNAME360", self.username)
self.COMMAND_ACCESS_TOKEN = self.COMMAND_ACCESS_TOKEN.replace("PASSWORD360", self.password)
access_token = self.exec_shell_command( self.COMMAND_ACCESS_TOKEN )
if access_token == None:
self.value = CONST_STATE_ERROR
return None
self.COMMAND_ID = self.COMMAND_ID.replace("ACCESS_TOKEN", access_token)
id = self.exec_shell_command( self.COMMAND_ID )
if id == None:
self.value = CONST_STATE_ERROR
return None
self.COMMAND_MEMBERS = self.COMMAND_MEMBERS.replace("ACCESS_TOKEN", access_token)
self.COMMAND_MEMBERS = self.COMMAND_MEMBERS.replace("ID", id)
payload = self.exec_shell_command( self.COMMAND_MEMBERS )
if payload != None:
self.save_payload_to_mqtt ( self.mqtt_topic, payload )
data = json.loads ( payload )
for member in data["members"]:
topic = StringBuilder()
topic.Append("owntracks/")
topic.Append(member["firstName"].lower())
topic.Append("/")
topic.Append(member["firstName"].lower())
topic = topic
msgPayload = StringBuilder()
msgPayload.Append("{")
msgPayload.Append("\"t\":\"p\"")
msgPayload.Append(",")
msgPayload.Append("\"tst\":")
msgPayload.Append(member['location']['timestamp'])
msgPayload.Append(",")
msgPayload.Append("\"acc\":")
msgPayload.Append(member['location']['accuracy'])
msgPayload.Append(",")
msgPayload.Append("\"_type\":\"location\"")
msgPayload.Append(",")
msgPayload.Append("\"alt\":\"0\"")
msgPayload.Append(",")
msgPayload.Append("\"_cp\":\"false\"")
msgPayload.Append(",")
msgPayload.Append("\"lon\":")
msgPayload.Append(member['location']['longitude'])
msgPayload.Append(",")
msgPayload.Append("\"lat\":")
msgPayload.Append(member['location']['latitude'])
msgPayload.Append(",")
msgPayload.Append("\"batt\":")
msgPayload.Append(member['location']['battery'])
msgPayload.Append(",")
if str(member['location']['wifiState']) == "1":
msgPayload.Append("\"conn\":\"w\"")
msgPayload.Append(",")
msgPayload.Append("\"vel\":")
msgPayload.Append(str(member['location']['speed']))
msgPayload.Append(",")
msgPayload.Append("\"charging\":")
msgPayload.Append(member['location']['charge'])
msgPayload.Append("}")
self.save_payload_to_mqtt ( str(topic), str(msgPayload) )
self.value = CONST_STATE_RUNNING
else:
self.value = CONST_STATE_ERROR
except Exception as e:
self.value = CONST_STATE_ERROR
def exec_shell_command( self, command ):
output = None
try:
output = subprocess.check_output( command, shell=True, timeout=50 )
output = output.strip().decode('utf-8')
except subprocess.CalledProcessError:
""" _LOGGER.error("Command failed: %s", command)"""
self.value = CONST_STATE_ERROR
output = None
except subprocess.TimeoutExpired:
""" _LOGGER.error("Timeout for command: %s", command)"""
self.value = CONST_STATE_ERROR
output = None
if output == None:
_LOGGER.error( "Life360 has not responsed well. Nothing to worry, will try again!" )
self.value = CONST_STATE_ERROR
return None
else:
return output
def save_payload_to_mqtt( self, topic, payload ):
try:
"""mqtt.async_publish ( self.hass, topic, payload, self.mqtt_qos, self.mqtt_retain )"""
_LOGGER.info("topic: %s", topic)
_LOGGER.info("payload: %s", payload)
mqtt.publish ( self.hass, topic, payload, self.mqtt_qos, self.mqtt_retain )
except:
_LOGGER.error( "Error saving Life360 data to mqtt." )
class StringBuilder:
_file_str = None
def __init__(self):
self._file_str = StringIO()
def Append(self, str):
self._file_str.write(str)
def __str__(self):
return self._file_str.getvalue()

View File

@@ -0,0 +1,269 @@
"""
@Author: Suresh Kalavala
@Date: 03/03/2018
Custom Sensor: Palo Alto device integration with Home Assistant.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.paloalto/
"""
import ssl
import logging
import urllib.request
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
import xml.etree.ElementTree as ET
from enum import Enum
from datetime import timedelta
from homeassistant.helpers.entity import Entity
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (CONF_NAME, CONF_API_KEY, CONF_IP_ADDRESS,
CONF_SSL, CONF_VERIFY_SSL,
CONF_MONITORED_CONDITIONS)
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = 'PaloAlto'
DEFAULT_SSL = False
DEFAULT_VERIFY_SSL = True
CONST_COMMAND = "COMMAND"
CONST_OPS_ENDPOINT = '/api/?type=op&cmd=COMMAND'
CONST_CONFIG_ENDPOINT = '/api/?type=config&action=get&xpath=COMMAND'
PA_OPS_ACTIVE_USERS = "<show><admins></admins></show>"
PA_CONF_SYS_INFO = "<show><system><info></info></system></show>"
PA_CONF_GP_USERS = "<show><global-protect-portal><current-user>" \
"</current-user></global-protect-portal></show>"
PA_CONF_TEMPERATURE = "<show><system><environmentals><thermal>" \
"</thermal></environmentals></system></show>"
SCAN_INTERVAL = timedelta(seconds=120)
MONITORED_CONDITIONS = {
'host_name': ['Host Name', 'x', 'mdi:fire'],
'up_time': ['Up Time', 'x', 'mdi:clock'],
'serial_no': ['Serial Number', 'x', 'mdi:counter'],
'sw_version': ['Software Version', 'x', 'mdi:counter'],
'gp_version': ['Global protect Version', 'x', 'mdi:counter'],
'logdb_version': ['LogDB Version', 'x', 'mdi:book-open'],
'operation_mode': ['Operation Mode', 'x', 'mdi:book-open'],
'core_temp': ['Core Temperature', 'x', 'mdi:oil-temperature'],
'sys_temp': ['System Temperature', 'x', 'mdi:oil-temperature'],
'gp_user_count': ['Global Protect User Count', 'vpn users', 'mdi:counter'],
'gp_users': ['Global Protect Users', 'vpn users', 'mdi:account-multiple'],
'loggedin_user_count': ['Loggedin User Count', 'users', 'mdi:counter'],
'loggedin_users': ['Loggedin Users', 'users', 'mdi:account-multiple'],
}
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Required(CONF_API_KEY): cv.string,
vol.Required(CONF_IP_ADDRESS): cv.string,
vol.Optional(CONF_SSL, default=DEFAULT_SSL): cv.boolean,
vol.Optional(CONF_MONITORED_CONDITIONS,
default=list(MONITORED_CONDITIONS)):
vol.All(cv.ensure_list, [vol.In(MONITORED_CONDITIONS)]),
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): cv.boolean,
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the Palo Alto VPN User Sensor."""
name = config.get(CONF_NAME)
host = config.get(CONF_IP_ADDRESS)
use_ssl = config.get(CONF_SSL)
verify_ssl = config.get(CONF_VERIFY_SSL)
api_key = config.get(CONF_API_KEY)
sensors = []
try:
api = PaloAltoApi(host, use_ssl, verify_ssl, api_key)
for condition in config[CONF_MONITORED_CONDITIONS]:
sensor = PaloAltoSensor(hass, api, name, condition)
sensors.append(sensor)
add_devices(sensors, True)
except Exception as err:
_LOGGER.error("Failed to setup Palo Alto Sensor. Error: " + str(err))
class PaloAltoSensor(Entity):
"""Representation of a sensor."""
def __init__(self, hass, api, name, variable):
"""Initialize the sensor."""
self._hass = hass
self._api = api
self._name = name
self._var_id = variable
variable_info = MONITORED_CONDITIONS[variable]
self._var_name = variable_info[0]
self._var_units = variable_info[1]
self._var_icon = variable_info[2]
@property
def name(self):
"""Return the name of the sensor."""
return "{} {}".format(self._name, self._var_name)
@property
def icon(self):
"""Icon to use in the frontend, if any."""
return self._var_icon
@property
def state(self):
"""Return the state of the device."""
return self._api.data[self._var_id]
@property
def available(self):
"""Could the device be accessed during the last update call."""
return self._api.available
def update(self):
"""Get the latest data and updates the state."""
self._api.update()
class PaloAltoApi(object):
"""The class for handling the data retrieval from Palo Alto Device."""
def __init__(self, host, use_ssl, verify_ssl, api_key):
"""Initialize the Palo Alto API."""
self._host = host
self._use_ssl = use_ssl
self._verify_ssl = verify_ssl
self._api_key = api_key
self._usersdata = None
self._sysinfo = None
self._gp_users = None
self._temperature = None
self.available = True
self._sensors = {}
@property
def data(self):
"""Return data."""
return self._sensors
def get_uri_scheme(self, use_ssl):
"""Return proper uril scheme based on config setting."""
return 'https://' if use_ssl else 'http://'
def get_resource(self, use_ssl, host, api_key, endpoint):
"""Prepare the URL."""
uri_scheme = self.get_uri_scheme(use_ssl)
if endpoint == EndPointType.Operational:
return "{}{}{}&key={}".format(uri_scheme, self._host,
CONST_OPS_ENDPOINT, self._api_key)
else:
return "{}{}{}&key={}".format(uri_scheme, self._host,
CONST_CONFIG_ENDPOINT, self._api_key)
def http_request(self, url):
"""HTTP request to the Palo Alto device."""
content = None
context = None
try:
if self._use_ssl and not self._verify_ssl:
context = ssl._create_unverified_context()
response = urllib.request.urlopen(url, context=context)
content = response.read()
except Exception as ex:
_LOGGER.error(str(ex))
content = None
return content
def update(self):
"""Get Operational and Configuration urls."""
ops_url = self.get_resource(self._use_ssl, self._host,
self._api_key, EndPointType.Operational)
users_url = ops_url.replace(CONST_COMMAND, PA_OPS_ACTIVE_USERS)
self._usersdata = self.http_request(users_url)
sysinfo_url = ops_url.replace(CONST_COMMAND, PA_CONF_SYS_INFO)
self._sysinfo = self.http_request(sysinfo_url)
gp_users_url = ops_url.replace(CONST_COMMAND, PA_CONF_GP_USERS)
self._gp_users = self.http_request(gp_users_url)
temperature_url = ops_url.replace(CONST_COMMAND, PA_CONF_TEMPERATURE)
self._temperature = self.http_request(temperature_url)
"""parse the xml data"""
self.parse_data()
def parse_globalprotect_users(self):
"""Parses global protect users xml."""
user_count = 0
vpn_users = []
root = ET.fromstring(self._gp_users)
nodes = root.findall('result/gp-portal-users/user')
for user in nodes:
user_count += 1
vpn_users.append(user.find('username').text)
if user_count != 0:
self._sensors["gp_users"] = ', '.join(vpn_users)
else:
self._sensors["gp_users"] = "None"
self._sensors["gp_user_count"] = user_count
def parse_temperature(self):
"""Parses environment/temperature values."""
root = ET.fromstring(self._temperature)
nodes = root.findall('result/thermal/Slot1/entry/DegreesC')
self._sensors["core_temp"] = round(float(nodes[0].text), 2)
self._sensors["sys_temp"] = round(float(nodes[1].text), 2)
def parse_system_info(self):
"""Parses System Information."""
root = ET.fromstring(self._sysinfo)
sys_node = root.findall('result/system')
self._sensors["up_time"] = sys_node[0].find('uptime').text
self._sensors["serial_no"] = sys_node[0].find('serial').text
self._sensors["host_name"] = sys_node[0].find('hostname').text
self._sensors["sw_version"] = sys_node[0].find('sw-version').text
self._sensors["logdb_version"] = sys_node[0].find(
'logdb-version').text
self._sensors["operation_mode"] = sys_node[0].find(
'operational-mode').text
self._sensors["gp_version"] = sys_node[0].find(
'global-protect-client-package-version').text
def parse_active_users(self):
"""Parses Active Users XML."""
root = ET.fromstring(self._usersdata)
nodes = root.findall('result/admins/entry')
count = 0
users = []
for item in nodes:
count += 1
users.append(item.find('admin').text)
if count > 0:
self._sensors["loggedin_users"] = ', '.join(users)
else:
self._sensors["loggedin_users"] = "None"
self._sensors["loggedin_user_count"] = count
def parse_data(self):
"""Parses data and populates sensors."""
self.parse_globalprotect_users()
self.parse_temperature()
self.parse_system_info()
self.parse_active_users()
class EndPointType(Enum):
"""Enum that indicates that type of endpoint that is."""
Operational = "operational"
Configuration = "configuration"

141
custom_components/sensor/udp.py Executable file
View File

@@ -0,0 +1,141 @@
"""
Custom Component, written by @skalavala - based on the existing TCP component.
Support for UDP socket based sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.udp/
"""
import logging
import socket
import select
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_HOST, CONF_PORT, CONF_PAYLOAD, CONF_TIMEOUT,
CONF_UNIT_OF_MEASUREMENT, CONF_VALUE_TEMPLATE)
from homeassistant.exceptions import TemplateError
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
CONF_BUFFER_SIZE = 'buffer_size'
CONF_VALUE_ON = 'value_on'
DEFAULT_BUFFER_SIZE = 1024
DEFAULT_NAME = 'UDP Sensor'
DEFAULT_TIMEOUT = 10
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.port,
vol.Required(CONF_PAYLOAD): cv.string,
vol.Optional(CONF_BUFFER_SIZE, default=DEFAULT_BUFFER_SIZE):
cv.positive_int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_VALUE_ON): cv.string,
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the UDP Sensor."""
add_entities([UdpSensor(hass, config)])
class UdpSensor(Entity):
"""Implementation of a UDP socket based sensor."""
required = tuple()
def __init__(self, hass, config):
"""Set all the config values if they exist and get initial state."""
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None:
value_template.hass = hass
self._hass = hass
self._config = {
CONF_NAME: config.get(CONF_NAME),
CONF_HOST: config.get(CONF_HOST),
CONF_PORT: config.get(CONF_PORT),
CONF_TIMEOUT: config.get(CONF_TIMEOUT),
CONF_PAYLOAD: config.get(CONF_PAYLOAD),
CONF_UNIT_OF_MEASUREMENT: config.get(CONF_UNIT_OF_MEASUREMENT),
CONF_VALUE_TEMPLATE: value_template,
CONF_VALUE_ON: config.get(CONF_VALUE_ON),
CONF_BUFFER_SIZE: config.get(CONF_BUFFER_SIZE),
}
self._state = None
self.update()
@property
def name(self):
"""Return the name of this sensor."""
name = self._config[CONF_NAME]
if name is not None:
return name
return super(UdpSensor, self).name
@property
def state(self):
"""Return the state of the device."""
return self._state
@property
def unit_of_measurement(self):
"""Return the unit of measurement of this entity."""
return self._config[CONF_UNIT_OF_MEASUREMENT]
def update(self):
"""Get the latest value for this sensor."""
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.settimeout(self._config[CONF_TIMEOUT])
try:
sock.connect(
(self._config[CONF_HOST], self._config[CONF_PORT]))
except socket.error as err:
_LOGGER.error(
"Unable to connect to %s on port %s: %s",
self._config[CONF_HOST], self._config[CONF_PORT], err)
return
try:
sock.send(self._config[CONF_PAYLOAD].encode())
except socket.error as err:
_LOGGER.error(
"Unable to send payload %r to %s on port %s: %s",
self._config[CONF_PAYLOAD], self._config[CONF_HOST],
self._config[CONF_PORT], err)
return
readable, _, _ = select.select(
[sock], [], [], self._config[CONF_TIMEOUT])
if not readable:
_LOGGER.warning(
"Timeout (%s second(s)) waiting for a response after "
"sending %r to %s on port %s.",
self._config[CONF_TIMEOUT], self._config[CONF_PAYLOAD],
self._config[CONF_HOST], self._config[CONF_PORT])
return
value = sock.recv(self._config[CONF_BUFFER_SIZE]).decode()
if self._config[CONF_VALUE_TEMPLATE] is not None:
try:
self._state = self._config[CONF_VALUE_TEMPLATE].render(
value=value)
return
except TemplateError:
_LOGGER.error(
"Unable to render template of %r with value: %r",
self._config[CONF_VALUE_TEMPLATE], value)
return
self._state = value