mysmarthome/custom_components/telegram_bot/__init__.py

988 lines
37 KiB
Python

"""Support to send and receive Telegram messages."""
from functools import partial
import importlib
import io
from ipaddress import ip_network
import logging
import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from telegram import (
Bot,
InlineKeyboardButton,
InlineKeyboardMarkup,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
)
from telegram.error import TelegramError
from telegram.parsemode import ParseMode
from telegram.utils.request import Request
import voluptuous as vol
from homeassistant.const import (
ATTR_COMMAND,
ATTR_LATITUDE,
ATTR_LONGITUDE,
CONF_API_KEY,
CONF_PLATFORM,
CONF_URL,
HTTP_BEARER_AUTHENTICATION,
HTTP_DIGEST_AUTHENTICATION,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import TemplateError
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType
_LOGGER = logging.getLogger(__name__)
ATTR_DATA = "data"
ATTR_MESSAGE = "message"
ATTR_TITLE = "title"
ATTR_ARGS = "args"
ATTR_AUTHENTICATION = "authentication"
ATTR_CALLBACK_QUERY = "callback_query"
ATTR_CALLBACK_QUERY_ID = "callback_query_id"
ATTR_CAPTION = "caption"
ATTR_CHAT_ID = "chat_id"
ATTR_CHAT_INSTANCE = "chat_instance"
ATTR_DISABLE_NOTIF = "disable_notification"
ATTR_DISABLE_WEB_PREV = "disable_web_page_preview"
ATTR_EDITED_MSG = "edited_message"
ATTR_FILE = "file"
ATTR_FROM_FIRST = "from_first"
ATTR_FROM_LAST = "from_last"
ATTR_KEYBOARD = "keyboard"
ATTR_KEYBOARD_INLINE = "inline_keyboard"
ATTR_MESSAGEID = "message_id"
ATTR_MSG = "message"
ATTR_MSGID = "id"
ATTR_PARSER = "parse_mode"
ATTR_PASSWORD = "password"
ATTR_REPLY_TO_MSGID = "reply_to_message_id"
ATTR_REPLYMARKUP = "reply_markup"
ATTR_SHOW_ALERT = "show_alert"
ATTR_STICKER_ID = "sticker_id"
ATTR_TARGET = "target"
ATTR_TEXT = "text"
ATTR_URL = "url"
ATTR_USER_ID = "user_id"
ATTR_USERNAME = "username"
ATTR_VERIFY_SSL = "verify_ssl"
ATTR_TIMEOUT = "timeout"
ATTR_MESSAGE_TAG = "message_tag"
ATTR_CHANNEL_POST = "channel_post"
CONF_ALLOWED_CHAT_IDS = "allowed_chat_ids"
CONF_PROXY_URL = "proxy_url"
CONF_PROXY_PARAMS = "proxy_params"
CONF_TRUSTED_NETWORKS = "trusted_networks"
DOMAIN = "telegram_bot"
SERVICE_SEND_MESSAGE = "send_message"
SERVICE_SEND_PHOTO = "send_photo"
SERVICE_SEND_STICKER = "send_sticker"
SERVICE_SEND_ANIMATION = "send_animation"
SERVICE_SEND_VIDEO = "send_video"
SERVICE_SEND_VOICE = "send_voice"
SERVICE_SEND_DOCUMENT = "send_document"
SERVICE_SEND_LOCATION = "send_location"
SERVICE_EDIT_MESSAGE = "edit_message"
SERVICE_EDIT_CAPTION = "edit_caption"
SERVICE_EDIT_REPLYMARKUP = "edit_replymarkup"
SERVICE_ANSWER_CALLBACK_QUERY = "answer_callback_query"
SERVICE_DELETE_MESSAGE = "delete_message"
SERVICE_LEAVE_CHAT = "leave_chat"
EVENT_TELEGRAM_CALLBACK = "telegram_callback"
EVENT_TELEGRAM_COMMAND = "telegram_command"
EVENT_TELEGRAM_TEXT = "telegram_text"
EVENT_TELEGRAM_SENT = "telegram_sent"
PARSER_HTML = "html"
PARSER_MD = "markdown"
DEFAULT_TRUSTED_NETWORKS = [ip_network("149.154.160.0/20"), ip_network("91.108.4.0/22")]
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.All(
cv.ensure_list,
[
vol.Schema(
{
vol.Required(CONF_PLATFORM): vol.In(
("broadcast", "polling", "webhooks")
),
vol.Required(CONF_API_KEY): cv.string,
vol.Required(CONF_ALLOWED_CHAT_IDS): vol.All(
cv.ensure_list, [vol.Coerce(int)]
),
vol.Optional(ATTR_PARSER, default=PARSER_MD): cv.string,
vol.Optional(CONF_PROXY_URL): cv.string,
vol.Optional(CONF_PROXY_PARAMS): dict,
# webhooks
vol.Optional(CONF_URL): cv.url,
vol.Optional(
CONF_TRUSTED_NETWORKS, default=DEFAULT_TRUSTED_NETWORKS
): vol.All(cv.ensure_list, [ip_network]),
}
)
],
)
},
extra=vol.ALLOW_EXTRA,
)
BASE_SERVICE_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_TARGET): vol.All(cv.ensure_list, [vol.Coerce(int)]),
vol.Optional(ATTR_PARSER): cv.string,
vol.Optional(ATTR_DISABLE_NOTIF): cv.boolean,
vol.Optional(ATTR_DISABLE_WEB_PREV): cv.boolean,
vol.Optional(ATTR_KEYBOARD): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list,
vol.Optional(ATTR_TIMEOUT): cv.positive_int,
vol.Optional(ATTR_MESSAGE_TAG): cv.string,
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_SEND_MESSAGE = BASE_SERVICE_SCHEMA.extend(
{vol.Required(ATTR_MESSAGE): cv.template, vol.Optional(ATTR_TITLE): cv.template}
)
SERVICE_SCHEMA_SEND_FILE = BASE_SERVICE_SCHEMA.extend(
{
vol.Optional(ATTR_URL): cv.template,
vol.Optional(ATTR_FILE): cv.template,
vol.Optional(ATTR_CAPTION): cv.template,
vol.Optional(ATTR_USERNAME): cv.string,
vol.Optional(ATTR_PASSWORD): cv.string,
vol.Optional(ATTR_AUTHENTICATION): cv.string,
vol.Optional(ATTR_VERIFY_SSL): cv.boolean,
}
)
SERVICE_SCHEMA_SEND_STICKER = SERVICE_SCHEMA_SEND_FILE.extend(
{vol.Optional(ATTR_STICKER_ID): cv.string}
)
SERVICE_SCHEMA_SEND_LOCATION = BASE_SERVICE_SCHEMA.extend(
{
vol.Required(ATTR_LONGITUDE): cv.template,
vol.Required(ATTR_LATITUDE): cv.template,
}
)
SERVICE_SCHEMA_EDIT_MESSAGE = SERVICE_SCHEMA_SEND_MESSAGE.extend(
{
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
}
)
SERVICE_SCHEMA_EDIT_CAPTION = vol.Schema(
{
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
vol.Required(ATTR_CAPTION): cv.template,
vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list,
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_EDIT_REPLYMARKUP = vol.Schema(
{
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
vol.Required(ATTR_KEYBOARD_INLINE): cv.ensure_list,
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY = vol.Schema(
{
vol.Required(ATTR_MESSAGE): cv.template,
vol.Required(ATTR_CALLBACK_QUERY_ID): vol.Coerce(int),
vol.Optional(ATTR_SHOW_ALERT): cv.boolean,
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_DELETE_MESSAGE = vol.Schema(
{
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_LEAVE_CHAT = vol.Schema({vol.Required(ATTR_CHAT_ID): vol.Coerce(int)})
SERVICE_MAP = {
SERVICE_SEND_MESSAGE: SERVICE_SCHEMA_SEND_MESSAGE,
SERVICE_SEND_PHOTO: SERVICE_SCHEMA_SEND_FILE,
SERVICE_SEND_STICKER: SERVICE_SCHEMA_SEND_STICKER,
SERVICE_SEND_ANIMATION: SERVICE_SCHEMA_SEND_FILE,
SERVICE_SEND_VIDEO: SERVICE_SCHEMA_SEND_FILE,
SERVICE_SEND_VOICE: SERVICE_SCHEMA_SEND_FILE,
SERVICE_SEND_DOCUMENT: SERVICE_SCHEMA_SEND_FILE,
SERVICE_SEND_LOCATION: SERVICE_SCHEMA_SEND_LOCATION,
SERVICE_EDIT_MESSAGE: SERVICE_SCHEMA_EDIT_MESSAGE,
SERVICE_EDIT_CAPTION: SERVICE_SCHEMA_EDIT_CAPTION,
SERVICE_EDIT_REPLYMARKUP: SERVICE_SCHEMA_EDIT_REPLYMARKUP,
SERVICE_ANSWER_CALLBACK_QUERY: SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY,
SERVICE_DELETE_MESSAGE: SERVICE_SCHEMA_DELETE_MESSAGE,
SERVICE_LEAVE_CHAT: SERVICE_SCHEMA_LEAVE_CHAT,
}
def load_data(
hass,
url=None,
filepath=None,
username=None,
password=None,
authentication=None,
num_retries=5,
verify_ssl=None,
):
"""Load data into ByteIO/File container from a source."""
try:
if url is not None:
# Load data from URL
params = {"timeout": 15}
if authentication == HTTP_BEARER_AUTHENTICATION and password is not None:
params["headers"] = {"Authorization": f"Bearer {password}"}
elif username is not None and password is not None:
if authentication == HTTP_DIGEST_AUTHENTICATION:
params["auth"] = HTTPDigestAuth(username, password)
else:
params["auth"] = HTTPBasicAuth(username, password)
if verify_ssl is not None:
params["verify"] = verify_ssl
retry_num = 0
while retry_num < num_retries:
req = requests.get(url, **params)
if not req.ok:
_LOGGER.warning(
"Status code %s (retry #%s) loading %s",
req.status_code,
retry_num + 1,
url,
)
else:
data = io.BytesIO(req.content)
if data.read():
data.seek(0)
data.name = url
return data
_LOGGER.warning("Empty data (retry #%s) in %s)", retry_num + 1, url)
retry_num += 1
_LOGGER.warning("Can't load data in %s after %s retries", url, retry_num)
elif filepath is not None:
if hass.config.is_allowed_path(filepath):
return open(filepath, "rb")
_LOGGER.warning("'%s' are not secure to load data from!", filepath)
else:
_LOGGER.warning("Can't load data. No data found in params!")
except (OSError, TypeError) as error:
_LOGGER.error("Can't load data into ByteIO: %s", error)
return None
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Telegram bot component."""
if not config[DOMAIN]:
return False
for p_config in config[DOMAIN]:
p_type = p_config.get(CONF_PLATFORM)
platform = importlib.import_module(f".{p_config[CONF_PLATFORM]}", __name__)
_LOGGER.info("Setting up %s.%s", DOMAIN, p_type)
try:
receiver_service = await platform.async_setup_platform(hass, p_config)
if receiver_service is False:
_LOGGER.error("Failed to initialize Telegram bot %s", p_type)
return False
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error setting up platform %s", p_type)
return False
bot = initialize_bot(p_config)
notify_service = TelegramNotificationService(
hass, bot, p_config.get(CONF_ALLOWED_CHAT_IDS), p_config.get(ATTR_PARSER)
)
async def async_send_telegram_message(service: ServiceCall) -> None:
"""Handle sending Telegram Bot message service calls."""
def _render_template_attr(data, attribute):
if attribute_templ := data.get(attribute):
if any(
isinstance(attribute_templ, vtype) for vtype in (float, int, str)
):
data[attribute] = attribute_templ
else:
attribute_templ.hass = hass
try:
data[attribute] = attribute_templ.async_render(
parse_result=False
)
except TemplateError as exc:
_LOGGER.error(
"TemplateError in %s: %s -> %s",
attribute,
attribute_templ.template,
exc,
)
data[attribute] = attribute_templ.template
msgtype = service.service
kwargs = dict(service.data)
for attribute in (
ATTR_MESSAGE,
ATTR_TITLE,
ATTR_URL,
ATTR_FILE,
ATTR_CAPTION,
ATTR_LONGITUDE,
ATTR_LATITUDE,
):
_render_template_attr(kwargs, attribute)
_LOGGER.debug("New telegram message %s: %s", msgtype, kwargs)
if msgtype == SERVICE_SEND_MESSAGE:
await hass.async_add_executor_job(
partial(notify_service.send_message, **kwargs)
)
elif msgtype in [
SERVICE_SEND_PHOTO,
SERVICE_SEND_ANIMATION,
SERVICE_SEND_VIDEO,
SERVICE_SEND_VOICE,
SERVICE_SEND_DOCUMENT,
]:
await hass.async_add_executor_job(
partial(notify_service.send_file, msgtype, **kwargs)
)
elif msgtype == SERVICE_SEND_STICKER:
await hass.async_add_executor_job(
partial(notify_service.send_sticker, **kwargs)
)
elif msgtype == SERVICE_SEND_LOCATION:
await hass.async_add_executor_job(
partial(notify_service.send_location, **kwargs)
)
elif msgtype == SERVICE_ANSWER_CALLBACK_QUERY:
await hass.async_add_executor_job(
partial(notify_service.answer_callback_query, **kwargs)
)
elif msgtype == SERVICE_DELETE_MESSAGE:
await hass.async_add_executor_job(
partial(notify_service.delete_message, **kwargs)
)
else:
await hass.async_add_executor_job(
partial(notify_service.edit_message, msgtype, **kwargs)
)
# Register notification services
for service_notif, schema in SERVICE_MAP.items():
hass.services.async_register(
DOMAIN, service_notif, async_send_telegram_message, schema=schema
)
return True
def initialize_bot(p_config):
"""Initialize telegram bot with proxy support."""
api_key = p_config.get(CONF_API_KEY)
proxy_url = p_config.get(CONF_PROXY_URL)
proxy_params = p_config.get(CONF_PROXY_PARAMS)
if proxy_url is not None:
request = Request(
con_pool_size=8, proxy_url=proxy_url, urllib3_proxy_kwargs=proxy_params
)
else:
request = Request(con_pool_size=8)
return Bot(token=api_key, request=request)
class TelegramNotificationService:
"""Implement the notification services for the Telegram Bot domain."""
def __init__(self, hass, bot, allowed_chat_ids, parser):
"""Initialize the service."""
self.allowed_chat_ids = allowed_chat_ids
self._default_user = self.allowed_chat_ids[0]
self._last_message_id = {user: None for user in self.allowed_chat_ids}
self._parsers = {PARSER_HTML: ParseMode.HTML, PARSER_MD: ParseMode.MARKDOWN}
self._parse_mode = self._parsers.get(parser)
self.bot = bot
self.hass = hass
def _get_msg_ids(self, msg_data, chat_id):
"""Get the message id to edit.
This can be one of (message_id, inline_message_id) from a msg dict,
returning a tuple.
**You can use 'last' as message_id** to edit
the message last sent in the chat_id.
"""
message_id = inline_message_id = None
if ATTR_MESSAGEID in msg_data:
message_id = msg_data[ATTR_MESSAGEID]
if (
isinstance(message_id, str)
and (message_id == "last")
and (self._last_message_id[chat_id] is not None)
):
message_id = self._last_message_id[chat_id]
else:
inline_message_id = msg_data["inline_message_id"]
return message_id, inline_message_id
def _get_target_chat_ids(self, target):
"""Validate chat_id targets or return default target (first).
:param target: optional list of integers ([12234, -12345])
:return list of chat_id targets (integers)
"""
if target is not None:
if isinstance(target, int):
target = [target]
chat_ids = [t for t in target if t in self.allowed_chat_ids]
if chat_ids:
return chat_ids
_LOGGER.warning(
"Disallowed targets: %s, using default: %s", target, self._default_user
)
return [self._default_user]
def _get_msg_kwargs(self, data):
"""Get parameters in message data kwargs."""
def _make_row_inline_keyboard(row_keyboard):
"""Make a list of InlineKeyboardButtons.
It can accept:
- a list of tuples like:
`[(text_b1, data_callback_b1),
(text_b2, data_callback_b2), ...]
- a string like: `/cmd1, /cmd2, /cmd3`
- or a string like: `text_b1:/cmd1, text_b2:/cmd2`
"""
buttons = []
if isinstance(row_keyboard, str):
for key in row_keyboard.split(","):
if ":/" in key:
# commands like: 'Label:/cmd' become ('Label', '/cmd')
label = key.split(":/")[0]
command = key[len(label) + 1 :]
buttons.append(
InlineKeyboardButton(label, callback_data=command)
)
else:
# commands like: '/cmd' become ('CMD', '/cmd')
label = key.strip()[1:].upper()
buttons.append(InlineKeyboardButton(label, callback_data=key))
elif isinstance(row_keyboard, list):
for entry in row_keyboard:
text_btn, data_btn = entry
buttons.append(
InlineKeyboardButton(text_btn, callback_data=data_btn)
)
else:
raise ValueError(str(row_keyboard))
return buttons
# Defaults
params = {
ATTR_PARSER: self._parse_mode,
ATTR_DISABLE_NOTIF: False,
ATTR_DISABLE_WEB_PREV: None,
ATTR_REPLY_TO_MSGID: None,
ATTR_REPLYMARKUP: None,
ATTR_TIMEOUT: None,
ATTR_MESSAGE_TAG: None,
}
if data is not None:
if ATTR_PARSER in data:
params[ATTR_PARSER] = self._parsers.get(
data[ATTR_PARSER], self._parse_mode
)
if ATTR_TIMEOUT in data:
params[ATTR_TIMEOUT] = data[ATTR_TIMEOUT]
if ATTR_DISABLE_NOTIF in data:
params[ATTR_DISABLE_NOTIF] = data[ATTR_DISABLE_NOTIF]
if ATTR_DISABLE_WEB_PREV in data:
params[ATTR_DISABLE_WEB_PREV] = data[ATTR_DISABLE_WEB_PREV]
if ATTR_REPLY_TO_MSGID in data:
params[ATTR_REPLY_TO_MSGID] = data[ATTR_REPLY_TO_MSGID]
if ATTR_MESSAGE_TAG in data:
params[ATTR_MESSAGE_TAG] = data[ATTR_MESSAGE_TAG]
# Keyboards:
if ATTR_KEYBOARD in data:
keys = data.get(ATTR_KEYBOARD)
keys = keys if isinstance(keys, list) else [keys]
if keys:
params[ATTR_REPLYMARKUP] = ReplyKeyboardMarkup(
[[key.strip() for key in row.split(",")] for row in keys]
)
else:
params[ATTR_REPLYMARKUP] = ReplyKeyboardRemove(True)
elif ATTR_KEYBOARD_INLINE in data:
keys = data.get(ATTR_KEYBOARD_INLINE)
keys = keys if isinstance(keys, list) else [keys]
params[ATTR_REPLYMARKUP] = InlineKeyboardMarkup(
[_make_row_inline_keyboard(row) for row in keys]
)
return params
def _send_msg(self, func_send, msg_error, message_tag, *args_msg, **kwargs_msg):
"""Send one message."""
try:
out = func_send(*args_msg, **kwargs_msg)
if not isinstance(out, bool) and hasattr(out, ATTR_MESSAGEID):
chat_id = out.chat_id
message_id = out[ATTR_MESSAGEID]
self._last_message_id[chat_id] = message_id
_LOGGER.debug(
"Last message ID: %s (from chat_id %s)",
self._last_message_id,
chat_id,
)
event_data = {
ATTR_CHAT_ID: chat_id,
ATTR_MESSAGEID: message_id,
}
if message_tag is not None:
event_data[ATTR_MESSAGE_TAG] = message_tag
self.hass.bus.fire(EVENT_TELEGRAM_SENT, event_data)
elif not isinstance(out, bool):
_LOGGER.warning(
"Update last message: out_type:%s, out=%s", type(out), out
)
return out
except TelegramError as exc:
_LOGGER.error(
"%s: %s. Args: %s, kwargs: %s", msg_error, exc, args_msg, kwargs_msg
)
def send_message(self, message="", target=None, **kwargs):
"""Send a message to one or multiple pre-allowed chat IDs."""
title = kwargs.get(ATTR_TITLE)
text = f"{title}\n{message}" if title else message
params = self._get_msg_kwargs(kwargs)
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug("Send message in chat ID %s with params: %s", chat_id, params)
self._send_msg(
self.bot.send_message,
"Error sending message",
params[ATTR_MESSAGE_TAG],
chat_id,
text,
parse_mode=params[ATTR_PARSER],
disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV],
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
def delete_message(self, chat_id=None, **kwargs):
"""Delete a previously sent message."""
chat_id = self._get_target_chat_ids(chat_id)[0]
message_id, _ = self._get_msg_ids(kwargs, chat_id)
_LOGGER.debug("Delete message %s in chat ID %s", message_id, chat_id)
deleted = self._send_msg(
self.bot.delete_message, "Error deleting message", None, chat_id, message_id
)
# reduce message_id anyway:
if self._last_message_id[chat_id] is not None:
# change last msg_id for deque(n_msgs)?
self._last_message_id[chat_id] -= 1
return deleted
def edit_message(self, type_edit, chat_id=None, **kwargs):
"""Edit a previously sent message."""
chat_id = self._get_target_chat_ids(chat_id)[0]
message_id, inline_message_id = self._get_msg_ids(kwargs, chat_id)
params = self._get_msg_kwargs(kwargs)
_LOGGER.debug(
"Edit message %s in chat ID %s with params: %s",
message_id or inline_message_id,
chat_id,
params,
)
if type_edit == SERVICE_EDIT_MESSAGE:
message = kwargs.get(ATTR_MESSAGE)
title = kwargs.get(ATTR_TITLE)
text = f"{title}\n{message}" if title else message
_LOGGER.debug("Editing message with ID %s", message_id or inline_message_id)
return self._send_msg(
self.bot.edit_message_text,
"Error editing text message",
params[ATTR_MESSAGE_TAG],
text,
chat_id=chat_id,
message_id=message_id,
inline_message_id=inline_message_id,
parse_mode=params[ATTR_PARSER],
disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
if type_edit == SERVICE_EDIT_CAPTION:
return self._send_msg(
self.bot.edit_message_caption,
"Error editing message attributes",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
message_id=message_id,
inline_message_id=inline_message_id,
caption=kwargs.get(ATTR_CAPTION),
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
)
return self._send_msg(
self.bot.edit_message_reply_markup,
"Error editing message attributes",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
message_id=message_id,
inline_message_id=inline_message_id,
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
def answer_callback_query(
self, message, callback_query_id, show_alert=False, **kwargs
):
"""Answer a callback originated with a press in an inline keyboard."""
params = self._get_msg_kwargs(kwargs)
_LOGGER.debug(
"Answer callback query with callback ID %s: %s, alert: %s",
callback_query_id,
message,
show_alert,
)
self._send_msg(
self.bot.answer_callback_query,
"Error sending answer callback query",
params[ATTR_MESSAGE_TAG],
callback_query_id,
text=message,
show_alert=show_alert,
timeout=params[ATTR_TIMEOUT],
)
def send_file(self, file_type=SERVICE_SEND_PHOTO, target=None, **kwargs):
"""Send a photo, sticker, video, or document."""
params = self._get_msg_kwargs(kwargs)
file_content = load_data(
self.hass,
url=kwargs.get(ATTR_URL),
filepath=kwargs.get(ATTR_FILE),
username=kwargs.get(ATTR_USERNAME),
password=kwargs.get(ATTR_PASSWORD),
authentication=kwargs.get(ATTR_AUTHENTICATION),
verify_ssl=kwargs.get(ATTR_VERIFY_SSL),
)
if file_content:
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug("Sending file to chat ID %s", chat_id)
if file_type == SERVICE_SEND_PHOTO:
self._send_msg(
self.bot.send_photo,
"Error sending photo",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
photo=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
)
elif file_type == SERVICE_SEND_STICKER:
self._send_msg(
self.bot.send_sticker,
"Error sending sticker",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
sticker=file_content,
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
elif file_type == SERVICE_SEND_VIDEO:
self._send_msg(
self.bot.send_video,
"Error sending video",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
video=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
)
elif file_type == SERVICE_SEND_DOCUMENT:
self._send_msg(
self.bot.send_document,
"Error sending document",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
document=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
)
elif file_type == SERVICE_SEND_VOICE:
self._send_msg(
self.bot.send_voice,
"Error sending voice",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
voice=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
elif file_type == SERVICE_SEND_ANIMATION:
self._send_msg(
self.bot.send_animation,
"Error sending animation",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
animation=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
)
file_content.seek(0)
else:
_LOGGER.error("Can't send file with kwargs: %s", kwargs)
def send_sticker(self, target=None, **kwargs):
"""Send a sticker from a telegram sticker pack."""
params = self._get_msg_kwargs(kwargs)
stickerid = kwargs.get(ATTR_STICKER_ID)
if stickerid:
for chat_id in self._get_target_chat_ids(target):
self._send_msg(
self.bot.send_sticker,
"Error sending sticker",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
sticker=stickerid,
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
else:
self.send_file(SERVICE_SEND_STICKER, target, **kwargs)
def send_location(self, latitude, longitude, target=None, **kwargs):
"""Send a location."""
latitude = float(latitude)
longitude = float(longitude)
params = self._get_msg_kwargs(kwargs)
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug(
"Send location %s/%s to chat ID %s", latitude, longitude, chat_id
)
self._send_msg(
self.bot.send_location,
"Error sending location",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
latitude=latitude,
longitude=longitude,
disable_notification=params[ATTR_DISABLE_NOTIF],
timeout=params[ATTR_TIMEOUT],
)
def leave_chat(self, chat_id=None):
"""Remove bot from chat."""
chat_id = self._get_target_chat_ids(chat_id)[0]
_LOGGER.debug("Leave from chat ID %s", chat_id)
leaved = self._send_msg(
self.bot.leave_chat, "Error leaving chat", None, chat_id
)
return leaved
class BaseTelegramBotEntity:
"""The base class for the telegram bot."""
def __init__(self, hass, allowed_chat_ids):
"""Initialize the bot base class."""
self.allowed_chat_ids = allowed_chat_ids
self.hass = hass
def _get_message_data(self, msg_data):
"""Return boolean msg_data_is_ok and dict msg_data."""
if not msg_data:
return False, None
bad_fields = (
"text" not in msg_data and "data" not in msg_data and "chat" not in msg_data
)
if bad_fields or "from" not in msg_data:
# Message is not correct.
_LOGGER.error("Incoming message does not have required data (%s)", msg_data)
return False, None
if (
msg_data["from"].get("id") not in self.allowed_chat_ids
and msg_data["chat"].get("id") not in self.allowed_chat_ids
# and msg_data["message"]["chat"].get("id") not in self.allowed_chat_ids
):
# Neither from id nor chat id was in allowed_chat_ids,
# origin is not allowed.
_LOGGER.error("Incoming message is not allowed (%s)", msg_data)
return True, None
data = {
ATTR_USER_ID: msg_data["from"]["id"],
ATTR_FROM_FIRST: msg_data["from"]["first_name"],
}
if "message_id" in msg_data:
data[ATTR_MSGID] = msg_data["message_id"]
if "last_name" in msg_data["from"]:
data[ATTR_FROM_LAST] = msg_data["from"]["last_name"]
if "chat" in msg_data:
data[ATTR_CHAT_ID] = msg_data["chat"]["id"]
elif ATTR_MESSAGE in msg_data and "chat" in msg_data[ATTR_MESSAGE]:
data[ATTR_CHAT_ID] = msg_data[ATTR_MESSAGE]["chat"]["id"]
return True, data
def _get_channel_post_data(self, msg_data):
"""Return boolean msg_data_is_ok and dict msg_data."""
if not msg_data:
return False, None
if "sender_chat" in msg_data and "chat" in msg_data and "text" in msg_data:
if (
msg_data["sender_chat"].get("id") not in self.allowed_chat_ids
and msg_data["chat"].get("id") not in self.allowed_chat_ids
):
# Neither sender_chat id nor chat id was in allowed_chat_ids,
# origin is not allowed.
_LOGGER.error("Incoming message is not allowed (%s)", msg_data)
return True, None
data = {
ATTR_MSGID: msg_data["message_id"],
ATTR_CHAT_ID: msg_data["chat"]["id"],
ATTR_TEXT: msg_data["text"],
}
return True, data
_LOGGER.error("Incoming message does not have required data (%s)", msg_data)
return False, None
def process_message(self, data):
"""Check for basic message rules and fire an event if message is ok."""
if ATTR_MSG in data or ATTR_EDITED_MSG in data:
event = EVENT_TELEGRAM_COMMAND
if ATTR_MSG in data:
data = data.get(ATTR_MSG)
else:
data = data.get(ATTR_EDITED_MSG)
message_ok, event_data = self._get_message_data(data)
if event_data is None:
return message_ok
if ATTR_MSGID in data:
event_data[ATTR_MSGID] = data[ATTR_MSGID]
if "text" in data:
if data["text"][0] == "/":
pieces = data["text"].split(" ")
event_data[ATTR_COMMAND] = pieces[0]
event_data[ATTR_ARGS] = pieces[1:]
else:
event_data[ATTR_TEXT] = data["text"]
event = EVENT_TELEGRAM_TEXT
else:
_LOGGER.warning("Message without text data received: %s", data)
event_data[ATTR_TEXT] = str(data)
event = EVENT_TELEGRAM_TEXT
self.hass.bus.async_fire(event, event_data)
return True
if ATTR_CALLBACK_QUERY in data:
event = EVENT_TELEGRAM_CALLBACK
data = data.get(ATTR_CALLBACK_QUERY)
message_ok, event_data = self._get_message_data(data)
if event_data is None:
return message_ok
query_data = event_data[ATTR_DATA] = data[ATTR_DATA]
if query_data[0] == "/":
pieces = query_data.split(" ")
event_data[ATTR_COMMAND] = pieces[0]
event_data[ATTR_ARGS] = pieces[1:]
event_data[ATTR_MSG] = data[ATTR_MSG]
event_data[ATTR_CHAT_INSTANCE] = data[ATTR_CHAT_INSTANCE]
event_data[ATTR_MSGID] = data[ATTR_MSGID]
self.hass.bus.async_fire(event, event_data)
return True
if ATTR_CHANNEL_POST in data:
event = EVENT_TELEGRAM_TEXT
data = data.get(ATTR_CHANNEL_POST)
message_ok, event_data = self._get_channel_post_data(data)
if event_data is None:
return message_ok
self.hass.bus.async_fire(event, event_data)
return True
_LOGGER.warning("Message with unknown data received: %s", data)
return True