Source code for looker_powerpoint.looker

import logging
from typing import Optional
import looker_sdk
from dotenv import load_dotenv, find_dotenv
from looker_sdk import models40 as models
from tenacity import retry, stop_after_attempt, wait_fixed, before_sleep_log
import json


[docs] class LookerClient:
[docs] def __init__(self): load_dotenv(find_dotenv(usecwd=True)) try: self.client = looker_sdk.init40() # or init40() for the v4.0 API except looker_sdk.error.SDKError as e: logging.error( f"Error initializing Looker SDK: {e} Consider adding a looker.ini file, or setting the LOOKERSDK_BASE_URL, LOOKERSDK_CLIENT_ID, and LOOKERSDK_CLIENT_SECRET environment variables." ) exit(1)
[docs] async def run_query(self, query_object): """ Runs a query against the Looker API. Args: query_object: The query object containing the necessary parameters. """ response = self.client.run_inline_query( result_format=query_object["result_format"], body=query_object["body"], apply_vis=query_object["apply_vis"], apply_formatting=query_object["apply_formatting"], server_table_calcs=query_object["server_table_calcs"], ) return response
[docs] async def make_query( self, shape_id: int, filter: Optional[str] = None, filter_value: Optional[str] = None, filter_overwrites: Optional[dict] = None, id: Optional[int] = None, **kwargs, ) -> models.WriteQuery: """ Constructs a WriteQuery object based on a Look's definition and provided parameters. Args: id: The ID of the Look. filter: The name of the filter to apply. filter_value: The value to set for the filter. filter_overwrites: A dictionary of filters to overwrite with new values. **kwargs: Additional query parameters to set. Returns: A WriteQuery object representing the modified query. """ try: # check if string can be converted to int look = self.client.look(id) except Exception as e: logging.error( f"Error fetching Look with ID {id}, is this a valid Look ID? If it is a meta reference, remember to set id_type: 'meta'" ) return {shape_id: None} q = look.query for parameter, value in kwargs.items(): if value is not None: if hasattr(q, parameter): # If the parameter is a list, append the value if isinstance(getattr(q, parameter), list): getattr(q, parameter).append(value) else: # Otherwise, set the value directly setattr(q, parameter, value) if filter_overwrites is not None: for f, v in filter_overwrites.items(): logging.info(f"Overwriting filter {f} with value {v}") if hasattr(q, "filters"): filterable = False for _, existing_filter in enumerate(q.filters): if existing_filter == f: filterable = True if filterable: q.filters[f] = v else: logging.warning( f"Overwrite filter {f} not found in query filters. Available filters: {q.filters}" ) if filter_value is not None and filter is not None: logging.info(f"Applying filter {filter} with value {filter_value}") # If filter_value is provided, set the filter if hasattr(q, "filters"): filterable = False for _, f in enumerate(q.filters): # print(f, filter) if f == filter: filterable = True if filterable: q.filters[filter] = filter_value else: logging.warning( f"Filter {filter} not found in query filters. Available filters: {q.filters}" ) body = models.WriteQuery( model=q.model, view=q.view, fields=q.fields, pivots=q.pivots, fill_fields=q.fill_fields, filters=q.filters, sorts=q.sorts, limit=q.limit, column_limit=q.column_limit, total=q.total, row_total=q.row_total, subtotals=q.subtotals, dynamic_fields=q.dynamic_fields, query_timezone=q.query_timezone, vis_config=q.vis_config, visible_ui_sections=q.visible_ui_sections, ) result_format = kwargs.get("result_format", "json_bi") apply_vis = kwargs.get("apply_vis", False) apply_formatting = kwargs.get("apply_formatting", False) server_table_calcs = kwargs.get("server_table_calcs", False) retries = kwargs.get("retries", 0) query_object = { "shape_id": shape_id, "query": { "result_format": result_format, "body": body, "apply_vis": apply_vis, "apply_formatting": apply_formatting, "server_table_calcs": server_table_calcs, }, } try: @retry( stop=stop_after_attempt(retries + 1), wait=wait_fixed(2), before_sleep=before_sleep_log(logging.getLogger(), logging.WARNING), reraise=True, ) async def run_query_with_retry(): return await self.run_query(query_object["query"]) result = await run_query_with_retry() if result and result_format in ["json", "json_bi"]: try: parsed = json.loads(result) if isinstance(parsed, dict): # Pack the sorts and pivots into the payload parsed["custom_sorts"] = list(q.sorts) if q.sorts else [] parsed["custom_pivots"] = list(q.pivots) if q.pivots else [] result = json.dumps(parsed) except (json.JSONDecodeError, TypeError, ValueError) as e: logging.warning( "Failed to inject custom_sorts/custom_pivots for shape_id %s, look_id %s: %s", shape_id, id, e, exc_info=True, ) except looker_sdk.error.SDKError as e: logging.error(f"Error retrieving Look with ID {id} : {e}") result = None except Exception as e: logging.error(f"Unexpected error retrieving Look with ID {id} : {e}") result = None return {shape_id: result}
async def _async_write_queries(self, shape_id, filter_value=None, **kwargs): """ Asynchronously write a Looker query by its ID. Args: table: A dictionary containing the look_id and other parameters. Returns: The fetched look data. """ return await self.make_query( shape_id, filter_value=filter_value, **dict(kwargs) )