gm/tools/lib/azure_container.py

75 lines
2.4 KiB
Python

import os
from datetime import datetime, timedelta
from functools import lru_cache
from pathlib import Path
from typing import IO, Union
TOKEN_PATH = Path("/data/azure_token")
@lru_cache
def get_azure_credential():
if "AZURE_TOKEN" in os.environ:
return os.environ["AZURE_TOKEN"]
elif TOKEN_PATH.is_file():
return TOKEN_PATH.read_text().strip()
else:
from azure.identity import AzureCliCredential
return AzureCliCredential()
@lru_cache
def get_container_sas(account_name: str, container_name: str):
from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas
start_time = datetime.utcnow()
expiry_time = start_time + timedelta(hours=1)
blob_service = BlobServiceClient(
account_url=f"https://{account_name}.blob.core.windows.net",
credential=get_azure_credential(),
)
return generate_container_sas(
account_name,
container_name,
user_delegation_key=blob_service.get_user_delegation_key(start_time, expiry_time),
permission=ContainerSasPermissions(read=True, write=True, list=True),
expiry=expiry_time,
)
class AzureContainer:
def __init__(self, account, container):
self.ACCOUNT = account
self.CONTAINER = container
@property
def ACCOUNT_URL(self) -> str:
return f"https://{self.ACCOUNT}.blob.core.windows.net"
@property
def BASE_URL(self) -> str:
return f"{self.ACCOUNT_URL}/{self.CONTAINER}/"
def get_client_and_key(self):
from azure.storage.blob import ContainerClient
client = ContainerClient(self.ACCOUNT_URL, self.CONTAINER, credential=get_azure_credential())
key = get_container_sas(self.ACCOUNT, self.CONTAINER)
return client, key
def get_url(self, route_name: str, segment_num, log_type="rlog") -> str:
ext = "hevc" if log_type.endswith('camera') else "bz2"
return self.BASE_URL + f"{route_name.replace('|', '/')}/{segment_num}/{log_type}.{ext}"
def upload_bytes(self, data: Union[bytes, IO], blob_name: str) -> str:
from azure.storage.blob import BlobClient
blob = BlobClient(
account_url=self.ACCOUNT_URL,
container_name=self.CONTAINER,
blob_name=blob_name,
credential=get_azure_credential(),
overwrite=False,
)
blob.upload_blob(data)
return self.BASE_URL + blob_name
def upload_file(self, path: Union[str, os.PathLike], blob_name: str) -> str:
with open(path, "rb") as f:
return self.upload_bytes(f, blob_name)