updating to latest

This commit is contained in:
root
2021-11-04 01:18:18 -04:00
parent f92b773514
commit 7eadb4c49c
153 changed files with 19015 additions and 5168 deletions

View File

@@ -0,0 +1,74 @@
"""HACS Configuration Schemas."""
# pylint: disable=dangerous-default-value
import voluptuous as vol
from custom_components.hacs.const import LOCALE
# Configuration:
TOKEN = "token"
SIDEPANEL_TITLE = "sidepanel_title"
SIDEPANEL_ICON = "sidepanel_icon"
FRONTEND_REPO = "frontend_repo"
FRONTEND_REPO_URL = "frontend_repo_url"
APPDAEMON = "appdaemon"
NETDAEMON = "netdaemon"
# Options:
COUNTRY = "country"
DEBUG = "debug"
RELEASE_LIMIT = "release_limit"
EXPERIMENTAL = "experimental"
# Config group
PATH_OR_URL = "frontend_repo_path_or_url"
def hacs_base_config_schema(config: dict = {}) -> dict:
"""Return a shcema configuration dict for HACS."""
if not config:
config = {
TOKEN: "xxxxxxxxxxxxxxxxxxxxxxxxxxx",
}
return {
vol.Required(TOKEN, default=config.get(TOKEN)): str,
}
def hacs_config_option_schema(options: dict = {}) -> dict:
"""Return a shcema for HACS configuration options."""
if not options:
options = {
APPDAEMON: False,
COUNTRY: "ALL",
DEBUG: False,
EXPERIMENTAL: False,
NETDAEMON: False,
RELEASE_LIMIT: 5,
SIDEPANEL_ICON: "hacs:hacs",
SIDEPANEL_TITLE: "HACS",
FRONTEND_REPO: "",
FRONTEND_REPO_URL: "",
}
return {
vol.Optional(SIDEPANEL_TITLE, default=options.get(SIDEPANEL_TITLE)): str,
vol.Optional(SIDEPANEL_ICON, default=options.get(SIDEPANEL_ICON)): str,
vol.Optional(RELEASE_LIMIT, default=options.get(RELEASE_LIMIT)): int,
vol.Optional(COUNTRY, default=options.get(COUNTRY)): vol.In(LOCALE),
vol.Optional(APPDAEMON, default=options.get(APPDAEMON)): bool,
vol.Optional(NETDAEMON, default=options.get(NETDAEMON)): bool,
vol.Optional(DEBUG, default=options.get(DEBUG)): bool,
vol.Optional(EXPERIMENTAL, default=options.get(EXPERIMENTAL)): bool,
vol.Exclusive(FRONTEND_REPO, PATH_OR_URL): str,
vol.Exclusive(FRONTEND_REPO_URL, PATH_OR_URL): str,
}
def hacs_config_combined() -> dict:
"""Combine the configuration options."""
base = hacs_base_config_schema()
options = hacs_config_option_schema()
for option in options:
base[option] = options[option]
return base

View File

@@ -0,0 +1,234 @@
"""Helpers to download repository content."""
import os
import pathlib
import tempfile
import zipfile
import async_timeout
import backoff
from queueman import QueueManager, concurrent
from custom_components.hacs.exceptions import HacsException
from custom_components.hacs.helpers.functions.filters import (
filter_content_return_one_of_type,
)
from custom_components.hacs.helpers.functions.save import async_save_file
from custom_components.hacs.share import get_hacs
from custom_components.hacs.utils.logger import getLogger
_LOGGER = getLogger()
class FileInformation:
def __init__(self, url, path, name):
self.download_url = url
self.path = path
self.name = name
@backoff.on_exception(backoff.expo, Exception, max_tries=5)
async def async_download_file(url):
"""Download files, and return the content."""
hacs = get_hacs()
if url is None:
return
if "tags/" in url:
url = url.replace("tags/", "")
_LOGGER.debug("Downloading %s", url)
result = None
with async_timeout.timeout(60, loop=hacs.hass.loop):
request = await hacs.session.get(url)
# Make sure that we got a valid result
if request.status == 200:
result = await request.read()
else:
raise HacsException(f"Got status code {request.status} when trying to download {url}")
return result
def should_try_releases(repository):
"""Return a boolean indicating whether to download releases or not."""
if repository.data.zip_release:
if repository.data.filename.endswith(".zip"):
if repository.ref != repository.data.default_branch:
return True
if repository.ref == repository.data.default_branch:
return False
if repository.data.category not in ["plugin", "theme"]:
return False
if not repository.data.releases:
return False
return True
def gather_files_to_download(repository):
"""Return a list of file objects to be downloaded."""
files = []
tree = repository.tree
ref = f"{repository.ref}".replace("tags/", "")
releaseobjects = repository.releases.objects
category = repository.data.category
remotelocation = repository.content.path.remote
if should_try_releases(repository):
for release in releaseobjects or []:
if ref == release.tag_name:
for asset in release.assets or []:
files.append(asset)
if files:
return files
if repository.content.single:
for treefile in tree:
if treefile.filename == repository.data.file_name:
files.append(
FileInformation(treefile.download_url, treefile.full_path, treefile.filename)
)
return files
if category == "plugin":
for treefile in tree:
if treefile.path in ["", "dist"]:
if remotelocation == "dist" and not treefile.filename.startswith("dist"):
continue
if not remotelocation:
if not treefile.filename.endswith(".js"):
continue
if treefile.path != "":
continue
if not treefile.is_directory:
files.append(
FileInformation(
treefile.download_url, treefile.full_path, treefile.filename
)
)
if files:
return files
if repository.data.content_in_root:
if not repository.data.filename:
if category == "theme":
tree = filter_content_return_one_of_type(repository.tree, "", "yaml", "full_path")
for path in tree:
if path.is_directory:
continue
if path.full_path.startswith(repository.content.path.remote):
files.append(FileInformation(path.download_url, path.full_path, path.filename))
return files
async def download_zip_files(repository, validate):
"""Download ZIP archive from repository release."""
contents = []
queue = QueueManager()
try:
for release in repository.releases.objects:
repository.logger.info(f"ref: {repository.ref} --- tag: {release.tag_name}")
if release.tag_name == repository.ref.split("/")[1]:
contents = release.assets
if not contents:
return validate
for content in contents or []:
queue.add(async_download_zip_file(repository, content, validate))
await queue.execute()
except (Exception, BaseException) as exception: # pylint: disable=broad-except
validate.errors.append(f"Download was not completed [{exception}]")
return validate
async def async_download_zip_file(repository, content, validate):
"""Download ZIP archive from repository release."""
try:
filecontent = await async_download_file(content.download_url)
if filecontent is None:
validate.errors.append(f"[{content.name}] was not downloaded.")
return
result = await async_save_file(
f"{tempfile.gettempdir()}/{repository.data.filename}", filecontent
)
with zipfile.ZipFile(
f"{tempfile.gettempdir()}/{repository.data.filename}", "r"
) as zip_file:
zip_file.extractall(repository.content.path.local)
os.remove(f"{tempfile.gettempdir()}/{repository.data.filename}")
if result:
repository.logger.info(f"Download of {content.name} completed")
return
validate.errors.append(f"[{content.name}] was not downloaded.")
except (Exception, BaseException) as exception: # pylint: disable=broad-except
validate.errors.append(f"Download was not completed [{exception}]")
return validate
async def download_content(repository):
"""Download the content of a directory."""
queue = QueueManager()
contents = gather_files_to_download(repository)
repository.logger.debug(repository.data.filename)
if not contents:
raise HacsException("No content to download")
for content in contents:
if repository.data.content_in_root and repository.data.filename:
if content.name != repository.data.filename:
continue
queue.add(dowload_repository_content(repository, content))
await queue.execute()
return repository.validate
@concurrent(10)
async def dowload_repository_content(repository, content):
"""Download content."""
try:
repository.logger.debug(f"Downloading {content.name}")
filecontent = await async_download_file(content.download_url)
if filecontent is None:
repository.validate.errors.append(f"[{content.name}] was not downloaded.")
return
# Save the content of the file.
if repository.content.single or content.path is None:
local_directory = repository.content.path.local
else:
_content_path = content.path
if not repository.data.content_in_root:
_content_path = _content_path.replace(f"{repository.content.path.remote}", "")
local_directory = f"{repository.content.path.local}/{_content_path}"
local_directory = local_directory.split("/")
del local_directory[-1]
local_directory = "/".join(local_directory)
# Check local directory
pathlib.Path(local_directory).mkdir(parents=True, exist_ok=True)
local_file_path = (f"{local_directory}/{content.name}").replace("//", "/")
result = await async_save_file(local_file_path, filecontent)
if result:
repository.logger.info(f"Download of {content.name} completed")
return
repository.validate.errors.append(f"[{content.name}] was not downloaded.")
except (Exception, BaseException) as exception: # pylint: disable=broad-except
repository.validate.errors.append(f"Download was not completed [{exception}]")

View File

@@ -0,0 +1,53 @@
"""Filter functions."""
def filter_content_return_one_of_type(content, namestartswith, filterfiltype, attr="name"):
"""Only match 1 of the filter."""
contents = []
filetypefound = False
for filename in content:
if isinstance(filename, str):
if filename.startswith(namestartswith):
if filename.endswith(f".{filterfiltype}"):
if not filetypefound:
contents.append(filename)
filetypefound = True
continue
else:
contents.append(filename)
else:
if getattr(filename, attr).startswith(namestartswith):
if getattr(filename, attr).endswith(f".{filterfiltype}"):
if not filetypefound:
contents.append(filename)
filetypefound = True
continue
else:
contents.append(filename)
return contents
def find_first_of_filetype(content, filterfiltype, attr="name"):
"""Find the first of the file type."""
filename = ""
for _filename in content:
if isinstance(_filename, str):
if _filename.endswith(f".{filterfiltype}"):
filename = _filename
break
else:
if getattr(_filename, attr).endswith(f".{filterfiltype}"):
filename = getattr(_filename, attr)
break
return filename
def get_first_directory_in_directory(content, dirname):
"""Return the first directory in dirname or None."""
directory = None
for path in content:
if path.full_path.startswith(dirname) and path.full_path != dirname:
if path.is_directory:
directory = path.filename
break
return directory

View File

@@ -0,0 +1,37 @@
"""Helper to get default repositories."""
from typing import List
from aiogithubapi import (
GitHubAuthenticationException,
GitHubNotModifiedException,
GitHubRatelimitException,
)
from custom_components.hacs.const import REPOSITORY_HACS_DEFAULT
from custom_components.hacs.enums import HacsCategory, HacsDisabledReason
from custom_components.hacs.share import get_hacs
async def async_get_list_from_default(default: HacsCategory) -> List:
"""Get repositories from default list."""
hacs = get_hacs()
repositories = []
try:
repositories = await hacs.async_github_get_hacs_default_file(default)
hacs.log.debug("Got %s elements for %s", len(repositories), default)
except GitHubNotModifiedException:
hacs.log.debug("Content did not change for %s/%s", REPOSITORY_HACS_DEFAULT, default)
except GitHubRatelimitException as exception:
hacs.log.error(exception)
hacs.disable_hacs(HacsDisabledReason.RATE_LIMIT)
except GitHubAuthenticationException as exception:
hacs.log.error(exception)
hacs.disable_hacs(HacsDisabledReason.INVALID_TOKEN)
except BaseException as exception: # pylint: disable=broad-except
hacs.log.error(exception)
return repositories

View File

@@ -0,0 +1,230 @@
"""Return repository information if any."""
import json
from aiogithubapi import AIOGitHubAPIException, AIOGitHubAPINotModifiedException, GitHub
from aiogithubapi.const import ACCEPT_HEADERS
from custom_components.hacs.exceptions import HacsException, HacsNotModifiedException
from custom_components.hacs.helpers.functions.template import render_template
from custom_components.hacs.share import get_hacs
def info_file(repository):
"""get info filename."""
if repository.data.render_readme:
for filename in ["readme", "readme.md", "README", "README.md", "README.MD"]:
if filename in repository.treefiles:
return filename
return ""
for filename in ["info", "info.md", "INFO", "INFO.md", "INFO.MD"]:
if filename in repository.treefiles:
return filename
return ""
async def get_info_md_content(repository):
"""Get the content of info.md"""
filename = info_file(repository)
if not filename:
return ""
try:
info = await repository.repository_object.get_contents(filename, repository.ref)
if info is None:
return ""
info = info.content.replace("<svg", "<disabled").replace("</svg", "</disabled")
return render_template(info, repository)
except (
ValueError,
AIOGitHubAPIException,
Exception, # pylint: disable=broad-except
):
if repository.hacs.system.action:
raise HacsException("::error:: No info file found")
return ""
async def get_repository(session, token, repository_full_name, etag=None):
"""Return a repository object or None."""
hacs = get_hacs()
try:
github = GitHub(
token,
session,
headers={
"User-Agent": f"HACS/{hacs.version}",
"Accept": ACCEPT_HEADERS["preview"],
},
)
repository = await github.get_repo(repository_full_name, etag)
return repository, github.client.last_response.etag
except AIOGitHubAPINotModifiedException as exception:
raise HacsNotModifiedException(exception) from exception
except (ValueError, AIOGitHubAPIException, Exception) as exception:
raise HacsException(exception) from exception
async def get_tree(repository, ref):
"""Return the repository tree."""
try:
tree = await repository.get_tree(ref)
return tree
except (ValueError, AIOGitHubAPIException) as exception:
raise HacsException(exception)
async def get_releases(repository, prerelease=False, returnlimit=5):
"""Return the repository releases."""
try:
releases = await repository.get_releases(prerelease, returnlimit)
return releases
except (ValueError, AIOGitHubAPIException) as exception:
raise HacsException(exception)
def get_frontend_version():
"""get the frontend version from the manifest."""
manifest = read_hacs_manifest()
frontend = 0
for requirement in manifest.get("requirements", []):
if requirement.startswith("hacs_frontend"):
frontend = requirement.split("==")[1]
break
return frontend
def read_hacs_manifest():
"""Reads the HACS manifest file and returns the contents."""
hacs = get_hacs()
content = {}
with open(f"{hacs.core.config_path}/custom_components/hacs/manifest.json") as manifest:
content = json.loads(manifest.read())
return content
async def get_integration_manifest(repository):
"""Return the integration manifest."""
if repository.data.content_in_root:
manifest_path = "manifest.json"
else:
manifest_path = f"{repository.content.path.remote}/manifest.json"
if not manifest_path in [x.full_path for x in repository.tree]:
raise HacsException(f"No file found '{manifest_path}'")
try:
manifest = await repository.repository_object.get_contents(manifest_path, repository.ref)
manifest = json.loads(manifest.content)
except (Exception, BaseException) as exception: # pylint: disable=broad-except
raise HacsException(f"Could not read manifest.json [{exception}]")
try:
repository.integration_manifest = manifest
repository.data.authors = manifest["codeowners"]
repository.data.domain = manifest["domain"]
repository.data.manifest_name = manifest["name"]
repository.data.config_flow = manifest.get("config_flow", False)
if repository.hacs.system.action:
if manifest.get("documentation") is None:
raise HacsException("::error:: manifest.json is missing documentation")
if manifest.get("homeassistant") is not None:
raise HacsException(
"::error:: The homeassistant key in manifest.json is no longer valid"
)
# if manifest.get("issue_tracker") is None:
# raise HacsException("The 'issue_tracker' is missing in manifest.json")
# Set local path
repository.content.path.local = repository.localpath
except KeyError as exception:
raise HacsException(f"Missing expected key {exception} in '{manifest_path}'")
def find_file_name(repository):
"""Get the filename to target."""
if repository.data.category == "plugin":
get_file_name_plugin(repository)
elif repository.data.category == "integration":
get_file_name_integration(repository)
elif repository.data.category == "theme":
get_file_name_theme(repository)
elif repository.data.category == "appdaemon":
get_file_name_appdaemon(repository)
elif repository.data.category == "python_script":
get_file_name_python_script(repository)
if repository.hacs.system.action:
repository.logger.info(f"filename {repository.data.file_name}")
repository.logger.info(f"location {repository.content.path.remote}")
def get_file_name_plugin(repository):
"""Get the filename to target."""
tree = repository.tree
releases = repository.releases.objects
if repository.data.content_in_root:
possible_locations = [""]
else:
possible_locations = ["release", "dist", ""]
# Handler for plug requirement 3
if repository.data.filename:
valid_filenames = [repository.data.filename]
else:
valid_filenames = [
f"{repository.data.name.replace('lovelace-', '')}.js",
f"{repository.data.name}.js",
f"{repository.data.name}.umd.js",
f"{repository.data.name}-bundle.js",
]
for location in possible_locations:
if location == "release":
if not releases:
continue
release = releases[0]
if not release.assets:
continue
asset = release.assets[0]
for filename in valid_filenames:
if filename == asset.name:
repository.data.file_name = filename
repository.content.path.remote = "release"
break
else:
for filename in valid_filenames:
if f"{location+'/' if location else ''}{filename}" in [x.full_path for x in tree]:
repository.data.file_name = filename.split("/")[-1]
repository.content.path.remote = location
break
def get_file_name_integration(repository):
"""Get the filename to target."""
def get_file_name_theme(repository):
"""Get the filename to target."""
tree = repository.tree
for treefile in tree:
if treefile.full_path.startswith(
repository.content.path.remote
) and treefile.full_path.endswith(".yaml"):
repository.data.file_name = treefile.filename
def get_file_name_appdaemon(repository):
"""Get the filename to target."""
def get_file_name_python_script(repository):
"""Get the filename to target."""
tree = repository.tree
for treefile in tree:
if treefile.full_path.startswith(
repository.content.path.remote
) and treefile.full_path.endswith(".py"):
repository.data.file_name = treefile.filename

View File

@@ -0,0 +1,10 @@
"""Helper to check if path is safe to remove."""
from custom_components.hacs.share import get_hacs
from ...utils.path import is_safe
def is_safe_to_remove(path: str) -> bool:
"""Helper to check if path is safe to remove."""
hacs = get_hacs()
return is_safe(hacs, path)

View File

@@ -0,0 +1,35 @@
"""Helper functions: misc"""
import re
from ...utils import version
RE_REPOSITORY = re.compile(
r"(?:(?:.*github.com.)|^)([A-Za-z0-9-]+\/[\w.-]+?)(?:(?:\.git)?|(?:[^\w.-].*)?)$"
)
def get_repository_name(repository) -> str:
"""Return the name of the repository for use in the frontend."""
if repository.repository_manifest.name is not None:
return repository.repository_manifest.name
if repository.data.category == "integration":
if repository.integration_manifest:
if "name" in repository.integration_manifest:
return repository.integration_manifest["name"]
return repository.data.full_name.split("/")[-1].replace("-", " ").replace("_", " ").title()
def version_left_higher_then_right(left: str, right: str) -> bool:
"""Return a bool if source is newer than target, will also be true if identical."""
return version.version_left_higher_then_right(left, right)
def extract_repository_from_url(url: str) -> str or None:
"""Extract the owner/repo part form a URL."""
match = re.match(RE_REPOSITORY, url)
if not match:
return None
return match.group(1).lower()

View File

@@ -0,0 +1,13 @@
# pylint: disable=missing-class-docstring,missing-module-docstring,missing-function-docstring,no-member
import os
from custom_components.hacs.share import get_hacs
def path_exsist(path) -> bool:
return os.path.exists(path)
async def async_path_exsist(path) -> bool:
hass = get_hacs().hass
return await hass.async_add_executor_job(path_exsist, path)

View File

@@ -0,0 +1,68 @@
"""Register a repository."""
from __future__ import annotations
from typing import TYPE_CHECKING
from aiogithubapi import AIOGitHubAPIException
from custom_components.hacs.exceptions import (
HacsException,
HacsExpectedException,
HacsRepositoryExistException,
)
from custom_components.hacs.share import get_hacs
from ...repositories import RERPOSITORY_CLASSES
if TYPE_CHECKING:
from ..classes.repository import HacsRepository
# @concurrent(15, 5)
async def register_repository(full_name, category, check=True, ref=None):
"""Register a repository."""
hacs = get_hacs()
if full_name in hacs.common.skip:
if full_name != "hacs/integration":
raise HacsExpectedException(f"Skipping {full_name}")
if category not in RERPOSITORY_CLASSES:
raise HacsException(f"{category} is not a valid repository category.")
repository: HacsRepository = RERPOSITORY_CLASSES[category](full_name)
if check:
try:
await repository.async_registration(ref)
if hacs.status.new:
repository.data.new = False
if repository.validate.errors:
hacs.common.skip.append(repository.data.full_name)
if not hacs.status.startup:
hacs.log.error("Validation for %s failed.", full_name)
if hacs.system.action:
raise HacsException(f"::error:: Validation for {full_name} failed.")
return repository.validate.errors
if hacs.system.action:
repository.logger.info("%s Validation completed", repository)
else:
repository.logger.info("%s Registration completed", repository)
except HacsRepositoryExistException:
return
except AIOGitHubAPIException as exception:
hacs.common.skip.append(repository.data.full_name)
raise HacsException(f"Validation for {full_name} failed with {exception}.") from None
if str(repository.data.id) != "0" and (exists := hacs.get_by_id(repository.data.id)):
hacs.async_remove_repository(exists)
else:
if hacs.hass is not None and ((check and repository.data.new) or hacs.status.new):
hacs.hass.bus.async_fire(
"hacs/repository",
{
"action": "registration",
"repository": repository.data.full_name,
"repository_id": repository.data.id,
},
)
hacs.async_add_repository(repository)

View File

@@ -0,0 +1,50 @@
"""Download."""
import gzip
import os
import shutil
import aiofiles
from custom_components.hacs.utils.logger import getLogger
_LOGGER = getLogger()
async def async_save_file(location, content):
"""Save files."""
_LOGGER.debug("Saving %s", location)
mode = "w"
encoding = "utf-8"
errors = "ignore"
if not isinstance(content, str):
mode = "wb"
encoding = None
errors = None
try:
async with aiofiles.open(location, mode=mode, encoding=encoding, errors=errors) as outfile:
await outfile.write(content)
outfile.close()
# Create gz for .js files
if os.path.isfile(location):
if location.endswith(".js") or location.endswith(".css"):
with open(location, "rb") as f_in:
with gzip.open(location + ".gz", "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
# Remove with 2.0
if "themes" in location and location.endswith(".yaml"):
filename = location.split("/")[-1]
base = location.split("/themes/")[0]
combined = f"{base}/themes/{filename}"
if os.path.exists(combined):
_LOGGER.info("Removing old theme file %s", combined)
os.remove(combined)
except (Exception, BaseException) as error: # pylint: disable=broad-except
_LOGGER.error("Could not write data to %s - %s", location, error)
return False
return os.path.exists(location)

View File

@@ -0,0 +1,79 @@
"""Storage handers."""
# pylint: disable=import-outside-toplevel
from homeassistant.helpers.json import JSONEncoder
from homeassistant.helpers.storage import Store
from homeassistant.util import json as json_util
from custom_components.hacs.const import VERSION_STORAGE
from ...utils.logger import getLogger
_LOGGER = getLogger()
class HACSStore(Store):
"""A subclass of Store that allows multiple loads in the executor."""
def load(self):
"""Load the data from disk if version matches."""
data = json_util.load_json(self.path)
if data == {} or data["version"] != self.version:
return None
return data["data"]
def get_store_key(key):
"""Return the key to use with homeassistant.helpers.storage.Storage."""
return key if "/" in key else f"hacs.{key}"
def _get_store_for_key(hass, key, encoder):
"""Create a Store object for the key."""
return HACSStore(hass, VERSION_STORAGE, get_store_key(key), encoder=encoder)
def get_store_for_key(hass, key):
"""Create a Store object for the key."""
return _get_store_for_key(hass, key, JSONEncoder)
async def async_load_from_store(hass, key):
"""Load the retained data from store and return de-serialized data."""
return await get_store_for_key(hass, key).async_load() or {}
async def async_save_to_store_default_encoder(hass, key, data):
"""Generate store json safe data to the filesystem.
The data is expected to be encodable with the default
python json encoder. It should have already been passed through
JSONEncoder if needed.
"""
await _get_store_for_key(hass, key, None).async_save(data)
async def async_save_to_store(hass, key, data):
"""Generate dynamic data to store and save it to the filesystem.
The data is only written if the content on the disk has changed
by reading the existing content and comparing it.
If the data has changed this will generate two executor jobs
If the data has not changed this will generate one executor job
"""
current = await async_load_from_store(hass, key)
if current is None or current != data:
await get_store_for_key(hass, key).async_save(data)
return
_LOGGER.debug(
"Did not store data for '%s'. Content did not change",
get_store_key(key),
)
async def async_remove_store(hass, key):
"""Remove a store element that should no longer be used."""
if "/" not in key:
return
await get_store_for_key(hass, key).async_remove()

View File

@@ -0,0 +1,32 @@
"""Custom template support."""
# pylint: disable=broad-except
from jinja2 import Template
from custom_components.hacs.utils.logger import getLogger
_LOGGER = getLogger()
def render_template(content, context):
"""Render templates in content."""
# Fix None issues
if context.releases.last_release_object is not None:
prerelease = context.releases.last_release_object.prerelease
else:
prerelease = False
# Render the template
try:
render = Template(content)
render = render.render(
installed=context.data.installed,
pending_update=context.pending_upgrade,
prerelease=prerelease,
selected_tag=context.data.selected_tag,
version_available=context.releases.last_release,
version_installed=context.display_installed_version,
)
return render
except (Exception, BaseException) as exception:
_LOGGER.debug(exception)
return content

View File

@@ -0,0 +1,124 @@
"""Helper to do common validation for repositories."""
from __future__ import annotations
from typing import TYPE_CHECKING
from aiogithubapi import AIOGitHubAPIException
from custom_components.hacs.exceptions import (
HacsException,
HacsNotModifiedException,
HacsRepositoryArchivedException,
HacsRepositoryExistException,
)
from custom_components.hacs.helpers.functions.information import (
get_releases,
get_repository,
get_tree,
)
from custom_components.hacs.helpers.functions.version_to_install import (
version_to_install,
)
from custom_components.hacs.share import get_hacs, is_removed
if TYPE_CHECKING:
from custom_components.hacs.helpers.classes.repository import HacsRepository
async def common_validate(repository, ignore_issues=False):
"""Common validation steps of the repository."""
repository.validate.errors = []
# Make sure the repository exist.
repository.logger.debug("%s Checking repository.", repository)
await common_update_data(repository, ignore_issues)
# Step 6: Get the content of hacs.json
await repository.get_repository_manifest_content()
async def common_update_data(repository: HacsRepository, ignore_issues=False, force=False):
"""Common update data."""
hacs = get_hacs()
releases = []
try:
repository_object, etag = await get_repository(
hacs.session,
hacs.configuration.token,
repository.data.full_name,
etag=None if force or repository.data.installed else repository.data.etag_repository,
)
repository.repository_object = repository_object
if repository.data.full_name.lower() != repository_object.full_name.lower():
hacs.common.renamed_repositories[
repository.data.full_name
] = repository_object.full_name
if str(repository_object.id) not in hacs.common.default:
hacs.common.default.append(str(repository_object.id))
raise HacsRepositoryExistException
repository.data.update_data(repository_object.attributes)
repository.data.etag_repository = etag
except HacsNotModifiedException:
return
except HacsRepositoryExistException:
raise HacsRepositoryExistException from None
except (AIOGitHubAPIException, HacsException) as exception:
if not hacs.status.startup:
repository.logger.error("%s %s", repository, exception)
if not ignore_issues:
repository.validate.errors.append("Repository does not exist.")
raise HacsException(exception) from None
# Make sure the repository is not archived.
if repository.data.archived and not ignore_issues:
repository.validate.errors.append("Repository is archived.")
hacs.common.archived_repositories.append(repository.data.full_name)
raise HacsRepositoryArchivedException("Repository is archived.")
# Make sure the repository is not in the blacklist.
if is_removed(repository.data.full_name) and not ignore_issues:
repository.validate.errors.append("Repository is in the blacklist.")
raise HacsException("Repository is in the blacklist.")
# Get releases.
try:
releases = await get_releases(
repository.repository_object,
repository.data.show_beta,
hacs.configuration.release_limit,
)
if releases:
repository.data.releases = True
repository.releases.objects = [x for x in releases if not x.draft]
repository.data.published_tags = [x.tag_name for x in repository.releases.objects]
repository.data.last_version = next(iter(repository.data.published_tags))
except (AIOGitHubAPIException, HacsException):
repository.data.releases = False
if not repository.force_branch:
repository.ref = version_to_install(repository)
if repository.data.releases:
for release in repository.releases.objects or []:
if release.tag_name == repository.ref:
assets = release.assets
if assets:
downloads = next(iter(assets)).attributes.get("download_count")
repository.data.downloads = downloads
repository.logger.debug(
"%s Running checks against %s", repository, repository.ref.replace("tags/", "")
)
try:
repository.tree = await get_tree(repository.repository_object, repository.ref)
if not repository.tree:
raise HacsException("No files in tree")
repository.treefiles = []
for treefile in repository.tree:
repository.treefiles.append(treefile.full_path)
except (AIOGitHubAPIException, HacsException) as exception:
if not hacs.status.startup:
repository.logger.error("%s %s", repository, exception)
if not ignore_issues:
raise HacsException(exception) from None

View File

@@ -0,0 +1,20 @@
"""Install helper for repositories."""
def version_to_install(repository):
"""Determine which version to isntall."""
if repository.data.last_version is not None:
if repository.data.selected_tag is not None:
if repository.data.selected_tag == repository.data.last_version:
repository.data.selected_tag = None
return repository.data.last_version
return repository.data.selected_tag
return repository.data.last_version
if repository.data.selected_tag is not None:
if repository.data.selected_tag == repository.data.default_branch:
return repository.data.default_branch
if repository.data.selected_tag in repository.data.published_tags:
return repository.data.selected_tag
if repository.data.default_branch is None:
return "main"
return repository.data.default_branch