import argparse
import asyncio
import collections
import datetime
import io
import json
import logging
import os
import re
import subprocess
from io import BytesIO
import pandas as pd
from lxml import etree
from PIL import Image
from pptx import Presentation
from pptx.chart.data import CategoryChartData
from pptx.dml.color import RGBColor
from pptx.util import Pt
from pydantic import ValidationError
from rich.logging import RichHandler
from rich_argparse import RichHelpFormatter
import requests
from looker_powerpoint import gemini as gemini_module
from looker_powerpoint.looker import LookerClient
from looker_powerpoint.models import LookerShape, GeminiShape
from looker_powerpoint.tools.find_alt_text import (
get_presentation_objects_with_descriptions,
)
from looker_powerpoint.tools.pptx_text_handler import (
process_text_field,
update_text_frame_preserving_formatting,
)
NS = {"p": "http://schemas.openxmlformats.org/presentationml/2006/main"}
[docs]
class Cli:
# color with rich
HEADER = """
Looker PowerPoint CLI :
A command line interface for Looker PowerPoint integration.
"""
[docs]
def __init__(self):
self.client = None
self.relevant_shapes = []
self.looker_shapes = []
self.gemini_shapes = []
self.data = {}
# Initialize the argument parser
self.parser = self._init_argparser()
# load tools
self.get_alt_text = get_presentation_objects_with_descriptions
def _init_looker(self):
"""Initialize the Looker client"""
if not self.args.debug_queries:
logging.getLogger("looker_sdk").setLevel(logging.ERROR)
self.client = LookerClient()
def _init_argparser(self):
"""Create and configure the argument parser"""
parser = argparse.ArgumentParser(
description=self.HEADER,
formatter_class=RichHelpFormatter,
)
# todo
parser.add_argument(
"--file-path",
"-f",
help="Path to the PowerPoint file to process",
default=None,
type=str,
)
# todo
parser.add_argument(
"--output-dir",
"-o",
help="""Path to a directory that will contain the generated pptx files. \n
.env: OUTPUT_DIR""",
default="output",
type=str,
)
# todo
parser.add_argument(
"--add-links",
help="Add links to looker in the slides. \n .env: ADD_LINKS",
action="store_true",
default=False,
)
# todo
parser.add_argument(
"--hide-errors",
help="""
Stop showing red outlines around shapes with errors. \n
.env: HIDE_ERRORS
""",
action="store_true",
default=False,
)
# todo
parser.add_argument(
"--parse-date-syntax-in-filename",
"-p",
help="""Parse date syntax in the filename. \n
.env: PARSE_DATE_SYNTAX_IN_FILENAME
""",
action="store_true",
default=True,
)
parser.add_argument(
"--self",
"-s",
help="""Replace the powerpoint file directly instead of creating a new file. \n
.env: SELF""",
action="store_true",
default=False,
)
parser.add_argument(
"--quiet",
"-q",
help="""Do not open the PowerPoint file after processing. \n
.env: QUIET""",
action="store_true",
default=False,
)
parser.add_argument(
"--filter",
help="""use the string to filter shapes if they have a set filter dimension""",
action="store",
default=None,
type=str,
)
parser.add_argument(
"--debug-queries",
help="""Enable debugging for Looker queries. \n
.env: DEBUG_QUERIES""",
action="store_true",
default=False,
)
parser.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="Increase verbosity (e.g., -v, -vv, -vvv)",
)
return parser
def _setup_logging(self):
if self.args.verbose == 0:
level = logging.WARNING
elif self.args.verbose == 1:
level = logging.INFO
else:
level = logging.DEBUG
logging.basicConfig(
level=level,
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler()],
)
def _pick_file(self):
"""
Picks the PowerPoint file to process.
If no file path is provided, it looks for the first .pptx file in the current directory.
Returns:
str: The path to the PowerPoint file.
"""
self.file_path = self.args.file_path
if self.file_path:
try:
self.presentation = Presentation(self.file_path)
except Exception as e:
logging.error(f"Error opening {self.file_path}: {e}")
else:
# If no file path is provided look for a file in the current directory
files = [
f
for f in os.listdir(".")
if f.endswith(".pptx") and not f.startswith("~$")
]
if files:
self.file_path = files[0]
logging.warning(
f"No file path provided, using first found file: {self.file_path}. To specify a file, use the -f flag like 'lpt -f <file_path>'."
)
try:
self.presentation = Presentation(self.file_path)
except Exception as e:
logging.error(f"Error opening {self.file_path}: {e}")
else:
logging.error(
"""
No PowerPoint file found in the current directory, closing.
Run from a directory with a .pptx file, or
specify file using -f flag like 'lpt -f <file_path>'.
"""
)
exit(1)
def _fill_table(self, table, df, headers=True):
"""
Fills a PowerPoint table with data from a DataFrame.
Args:
table: A Table object from pptx.
df: A pandas DataFrame containing the data to fill the table.
"""
# Get table dimensions
table_rows = len(table.rows)
table_cols = len(table.columns)
# Get DataFrame dimensions
df_rows = df.shape[0] + 1 # +1 for header
df_cols = df.shape[1]
# Determine how much we can fill
rows_to_fill = min(table_rows, df_rows)
cols_to_fill = min(table_cols, df_cols)
# Fill header row
if headers:
for col_idx in range(cols_to_fill):
table.cell(0, col_idx).text = str(df.columns[col_idx])
# Fill DataFrame values
for row_idx in range(1, rows_to_fill): # skip header row
for col_idx in range(cols_to_fill):
value = df.iloc[row_idx - 1, col_idx]
table.cell(row_idx, col_idx).text = str(value)
# Optional: Clear unused cells
for row_idx in range(rows_to_fill, table_rows):
for col_idx in range(table_cols):
table.cell(row_idx, col_idx).text = ""
for col_idx in range(cols_to_fill, table_cols):
for row_idx in range(table_rows):
table.cell(row_idx, col_idx).text = ""
def _set_alt_text(self, shape, data):
"""
Sets the alternative text description for a shape's XML.
Args:
shape: A Shape object from pptx.
data: A Python object (dict, list, etc.) to serialize and set as YAML in the descr attribute.
"""
xml_str = shape.element.xml
xml_elem = etree.fromstring(xml_str)
import yaml
# convert pydantic model to dict
if isinstance(data, dict) is False:
data = data.model_dump()
data = {k: v for k, v in data.items() if v is not None}
data = yaml.dump(data)
# remove None values from data, and convert to string with newlines for YAML compatibility
for path in [
".//p:nvSpPr/p:cNvPr",
".//p:nvPicPr/p:cNvPr",
".//p:nvGraphicFramePr/p:cNvPr",
]:
cNvPr_elements = xml_elem.xpath(path, namespaces=NS)
if cNvPr_elements:
cNvPr = cNvPr_elements[0]
yaml_text = str(data)
cNvPr.set("descr", yaml_text)
# Overwrite the element in the actual pptx shape with updated XML
shape_element = shape.element
new_element = etree.fromstring(etree.tostring(xml_elem))
shape_element.clear()
for child in new_element:
shape_element.append(child)
return
raise ValueError("No compatible cNvPr element found to set descr.")
def _mark_failure(self, slide, shape):
line_color_rgb = (255, 0, 0) # RGB color for
line_width_pt = 2 # Width of the circle outline in points
# Calculate circle position - centered on the shape
# Add an oval shape (circle)
circle = slide.shapes.add_shape(
autoshape_type_id=1, # MSO_SHAPE_OVAL (value 1)
left=shape.left,
top=shape.top,
width=shape.width,
height=shape.height,
)
# Set no fill for the circle (transparent inside)
circle.fill.background() # or circle.fill.solid() + set transparency
# Set outline color and width
circle.line.color.rgb = RGBColor(*line_color_rgb)
circle.line.width = Pt(line_width_pt)
self._set_alt_text(
circle,
{"parent_shape_id": shape.shape_id, "meta": True},
)
def _select_slice_from_df(self, df, integration):
"""
Selects a specific slice from the DataFrame based on the integration settings.
Args:
df: A pandas DataFrame containing the data.
integration: A LookerReference object containing the integration settings.
Returns:
The selected data slice (str or other type).
"""
if integration.row is not None:
row_slice = integration.row
else:
row_slice = 0
row = df.iloc[row_slice]
if integration.label is not None and integration.column is not None:
logging.warning(
f"Both label and column are set for integration {integration.id}. Defaulting to label and ignoring column."
)
r = row[integration.label]
elif integration.label is not None:
r = row[integration.label]
elif integration.column is not None:
r = row.iloc[integration.column]
else:
r = df
return r
def _replace_image_with_object(
self, slide_index, shape_number, image_stream, integration
):
slide = self.presentation.slides[slide_index]
old_shape = next((s for s in slide.shapes if s.shape_id == shape_number), None)
if old_shape is None:
raise ValueError(f"Shape {shape_number} not found on slide {slide_index}.")
if old_shape.shape_type != 13: # picture
raise ValueError("Selected shape is not an image.")
left, top, width, height = (
old_shape.left,
old_shape.top,
old_shape.width,
old_shape.height,
)
slide.shapes._spTree.remove(old_shape._element)
# --- calculate scaled size preserving aspect ratio ---
img_bytes = image_stream.getvalue()
image_stream.seek(0)
with Image.open(BytesIO(img_bytes)) as im:
img_w, img_h = im.size
img_ratio = img_w / img_h
shape_ratio = width / height
if img_ratio > shape_ratio:
new_width = width
new_height = int(width / img_ratio)
else:
new_height = height
new_width = int(height * img_ratio)
# center within original box
new_left = left + (width - new_width) / 2
new_top = top + (height - new_height) / 2
picture = slide.shapes.add_picture(
BytesIO(img_bytes), new_left, new_top, width=new_width, height=new_height
)
self._set_alt_text(picture, integration)
def _remove_shape(self, slide_index, shape_number):
"""
Removes a shape from a PowerPoint slide.
Args:
prs: The Presentation object.
slide_index: The index of the slide containing the shape.
shape_index: The index of the shape to remove.
"""
slide = self.presentation.slides[slide_index]
shape_to_remove = None
for shape in slide.shapes:
if shape.shape_id == shape_number:
shape_to_remove = shape
if shape_to_remove is None:
raise ValueError(
f"Shape with number {shape_number} not found on slide {slide_index}."
)
# Remove the shape
slide.shapes._spTree.remove(shape_to_remove._element)
def _format_context_data(self, df) -> str:
"""
Format a pandas DataFrame as a human-readable plain-text table for use
as Gemini context.
Args:
df: A pandas DataFrame.
Returns:
str: A plain-text representation of the DataFrame.
"""
return df.to_string(index=False)
def _extract_slide_text_context(
self, slide_number: int, exclude_shape_id: int
) -> str:
"""
Return a plain-text extract of all text on a slide, excluding the target shape.
Only shapes that have a text_frame are included. Each shape is rendered as:
``[Shape name]: text content``
Args:
slide_number: The index of the slide.
exclude_shape_id: The shape_id of the shape to exclude (the Gemini shape itself).
Returns:
str: Multi-line text representing the slide content.
"""
slide = self.presentation.slides[slide_number]
lines: list[str] = []
for shape in slide.shapes:
if shape.shape_id == exclude_shape_id:
continue
if hasattr(shape, "text_frame"):
text = shape.text_frame.text.strip()
if text:
name = getattr(shape, "name", f"shape_{shape.shape_id}")
lines.append(f"[{name}]: {text}")
return "\n".join(lines)
def _resolve_context_item(
self,
ctx: str,
shape_number: int,
slide_number: int,
gemini_results: dict,
current_text: str,
) -> tuple | None:
"""
Resolve a single ``contexts`` entry to a ``(label, content)`` pair.
Resolution rules (checked in order):
1. ``"self"`` — the shape's own current text before synthesis.
2. ``"slide_self"`` — text of all other shapes on the slide after Looker
data has been rendered, with this shape excluded.
3. Strings starting with ``"gemini_"`` — the output of the Gemini box
whose ``gemini_id`` matches. Returns ``None`` (with a warning) if
that box has not been processed yet or has failed.
4. Anything else — treated as a Looker meta-look ``meta_name``; resolved
from ``self.data``. Returns ``None`` (with a warning) if missing.
Args:
ctx: The context string from ``GeminiConfig.contexts``.
shape_number: The shape_id of the current Gemini shape (for exclusion).
slide_number: The slide index of the current Gemini shape.
gemini_results: Dict of ``{gemini_id: synthesized_text}`` for already
processed Gemini boxes.
current_text: The shape's current text content.
Returns:
``(label, content)`` on success, or ``None`` if the reference cannot
be resolved (a warning is already logged).
"""
if ctx == "self":
return ("Current shape text", current_text)
if ctx == "slide_self":
content = self._extract_slide_text_context(slide_number, shape_number)
return ("Current slide context (excluding this shape)", content)
if ctx.startswith("gemini_"):
result = gemini_results.get(ctx)
if result is None:
logging.warning(
f"No result available for Gemini context '{ctx}'. "
"The referenced Gemini shape may not have run yet or may have failed."
)
return None
return (f"LLM report [{ctx}]", result)
# Meta-look fallback
raw = self.data.get(ctx)
if raw is None:
logging.warning(
f"No data found for Gemini context '{ctx}'. "
"Make sure a meta-look shape with that meta_name exists in the presentation."
)
return None
try:
df = self._make_df(raw)
return (f"Data [{ctx}]", self._format_context_data(df))
except Exception as e:
logging.warning(f"Could not format context data for '{ctx}': {e}")
return None
def _sort_gemini_shapes_by_dependency(self) -> list:
"""
Return ``self.gemini_shapes`` sorted in topological order.
Any ``contexts`` entry that starts with ``"gemini_"`` and matches the
``gemini_id`` of another shape in the list is treated as a dependency —
that shape must be processed first.
Raises
------
ValueError
If a circular dependency is detected.
"""
id_to_shape: dict[str, object] = {
gs.integration.gemini_id: gs
for gs in self.gemini_shapes
if gs.integration.gemini_id
}
in_degree: dict[int, int] = {id(gs): 0 for gs in self.gemini_shapes}
dependents: dict[int, list] = {id(gs): [] for gs in self.gemini_shapes}
for gs in self.gemini_shapes:
for ctx in gs.integration.contexts:
if ctx.startswith("gemini_") and ctx in id_to_shape:
dep = id_to_shape[ctx]
dependents[id(dep)].append(gs)
in_degree[id(gs)] += 1
queue: collections.deque = collections.deque(
gs for gs in self.gemini_shapes if in_degree[id(gs)] == 0
)
ordered: list = []
while queue:
node = queue.popleft()
ordered.append(node)
for dependent in dependents[id(node)]:
in_degree[id(dependent)] -= 1
if in_degree[id(dependent)] == 0:
queue.append(dependent)
if len(ordered) != len(self.gemini_shapes):
raise ValueError(
"Circular dependency detected in Gemini shape contexts. "
"Ensure there are no cycles among gemini_id references."
)
return ordered
def _process_gemini_shapes(self):
"""
Process all shapes configured for Gemini LLM synthesis.
Shapes are processed in dependency order (any shape whose ``gemini_id``
appears in another shape's ``contexts`` is processed first).
For each GeminiShape the ``contexts`` list is walked in order and each
entry resolved via :meth:`_resolve_context_item` — which handles
``"self"``, ``"slide_self"``, Gemini box references (``gemini_``-prefix),
and Looker meta-look names. The assembled context string is then sent to
the Gemini API together with the shape's current text and prompt.
On error: the error message is written into the text box and a red outline
is drawn around the shape (suppressed by ``--hide-errors``).
"""
if not self.gemini_shapes:
return
if not gemini_module.is_available():
logging.warning(
"google-genai is not installed; Gemini synthesis shapes will be skipped. "
"Install it with 'pip install looker_powerpoint[llm]' to enable LLM features."
)
return
try:
ordered_shapes = self._sort_gemini_shapes_by_dependency()
except ValueError as e:
logging.error(f"Cannot process Gemini shapes: {e}")
return
# Stores synthesized text keyed by gemini_id for chaining
gemini_results: dict[str, str] = {}
for gemini_shape in ordered_shapes:
slide = self.presentation.slides[gemini_shape.slide_number]
current_shape = None
for shape in slide.shapes:
if shape.shape_id == gemini_shape.shape_number:
current_shape = shape
break
if current_shape is None:
logging.error(
f"Could not find shape {gemini_shape.shape_number} on slide "
f"{gemini_shape.slide_number} for Gemini synthesis."
)
continue
try:
# Capture current text first (needed by "self" resolver and as
# replacement-target context for synthesize())
current_text = ""
if hasattr(current_shape, "text_frame"):
current_text = current_shape.text_frame.text
else:
logging.warning(
f"Shape {gemini_shape.shape_number} on slide "
f"{gemini_shape.slide_number} has no text_frame; "
"skipping Gemini synthesis."
)
continue
# Resolve each context entry in order
context_parts: list[str] = []
for ctx in gemini_shape.integration.contexts:
resolved = self._resolve_context_item(
ctx,
gemini_shape.shape_number,
gemini_shape.slide_number,
gemini_results,
current_text,
)
if resolved is not None:
label, content = resolved
if content:
context_parts.append(f"{label}:\n{content}")
context_data_str = "\n\n".join(context_parts)
synthesized = gemini_module.synthesize(
prompt=gemini_shape.integration.prompt,
context_data_str=context_data_str,
current_text=current_text,
model_name=gemini_shape.integration.model,
)
update_text_frame_preserving_formatting(
current_shape.text_frame, synthesized
)
logging.debug(
f"Gemini synthesis applied to shape {gemini_shape.shape_number} "
f"on slide {gemini_shape.slide_number}."
)
# Store result for downstream shapes
if gemini_shape.integration.gemini_id:
gemini_results[gemini_shape.integration.gemini_id] = synthesized
except Exception as e:
error_msg = str(e)
logging.error(
f"Gemini synthesis failed for shape {gemini_shape.shape_number} "
f"on slide {gemini_shape.slide_number}: {error_msg}"
)
# Populate error message into text box
try:
if hasattr(current_shape, "text_frame"):
update_text_frame_preserving_formatting(
current_shape.text_frame, error_msg
)
except Exception:
pass
# Draw red outline around the failed shape
if not self.args.hide_errors:
self._mark_failure(slide, current_shape)
def _make_df(self, result):
"""
Create a pandas DataFrame from Looker data based on the integration settings.
Categorizes and sorts columns into Dimensions -> Pivots -> Table Calcs.
"""
data = json.loads(result)
fields = data.get("metadata", {}).get("fields", {})
# 1. Pull the injected sorts and pivots rules from the Look
look_sorts = data.get("custom_sorts", [])
look_pivots = data.get("custom_pivots", [])
# Determine if the primary pivot is sorted descending
pivot_descending = False
if look_pivots:
main_pivot = look_pivots[0]
# Looker sorts look like: "view_name.date_dim desc 0"
# Parse sort strings by tokens so we only match the exact field name,
# not arbitrary substrings.
for sort_str in look_sorts:
parts = str(sort_str).split()
if len(parts) < 2:
continue
field_token = parts[0]
# Find the first explicit direction token after the field
direction_token = None
for token in parts[1:]:
token_lower = token.lower()
if token_lower in ("asc", "desc"):
direction_token = token_lower
break
if field_token == main_pivot and direction_token == "desc":
pivot_descending = True
break
# Create DataFrame first to expose all dynamic column names
df = pd.json_normalize(data.get("rows", [])).fillna("")
actual_cols = list(df.columns)
# 2. Extract base names from the metadata
dim_bases = [f["name"] for f in fields.get("dimensions", [])]
calc_bases = [f["name"] for f in fields.get("table_calculations", [])]
measure_bases = [f["name"] for f in fields.get("measures", [])]
dims = []
calcs = []
pivots_and_measures = []
leftovers = []
for col in actual_cols:
if any(col == f"{d}.value" or col == d for d in dim_bases):
dims.append(col)
elif any(col == f"{c}.value" or col == c for c in calc_bases):
calcs.append(col)
elif "|FIELD|" in col or any(
col == f"{m}.value" or col == m for m in measure_bases
):
pivots_and_measures.append(col)
else:
leftovers.append(col)
# 3. Apply Dimensions and Calcs sorting (Native query order)
dims.sort(
key=lambda x: next(
(i for i, d in enumerate(dim_bases) if x == f"{d}.value" or x == d), 999
)
)
calcs.sort(
key=lambda x: next(
(i for i, c in enumerate(calc_bases) if x == f"{c}.value" or x == c),
999,
)
)
# 4. Apply Looker's strict Pivot Sorting Rules
def parse_pivot_col(col):
# Break down "measure_name|FIELD|2025-03-03.value"
base = col.replace(".value", "")
if "|FIELD|" in base:
measure, pivot_val = base.split("|FIELD|", 1)
return pivot_val, measure
return "", base
# Get unique pivot values preserving their order of first appearance in the data
# (which reflects Looker's native ordering). Avoid lexicographic sorting so that
# numeric-like values ("2", "10") and non-ISO dates aren't misordered.
seen_pivots: set = set()
unique_pivots: list = []
for c in pivots_and_measures:
pv = parse_pivot_col(c)[0]
if pv not in seen_pivots:
seen_pivots.add(pv)
unique_pivots.append(pv)
if pivot_descending:
unique_pivots.reverse()
pivot_order_map = {val: i for i, val in enumerate(unique_pivots)}
measure_order_map = {m: i for i, m in enumerate(measure_bases)}
# Sort first by the properly sequenced pivot value, then by the measure's native query order
pivots_and_measures.sort(
key=lambda x: (
pivot_order_map.get(parse_pivot_col(x)[0], 999),
measure_order_map.get(parse_pivot_col(x)[1], 999),
)
)
# 5. Re-index and Rename
ordered_cols = dims + pivots_and_measures + calcs + leftovers
df = df[ordered_cols]
all_fields = (
fields.get("dimensions", [])
+ fields.get("measures", [])
+ fields.get("table_calculations", [])
)
mappy = {
f"{item['name']}.value": item.get("field_group_variant", item["name"])
.strip()
.lower()
.replace(" ", "_")
for item in all_fields
}
df.rename(columns=mappy, inplace=True)
return df
def _build_metadata_object(self):
"""
Build metadata object for the presentation.
"""
metadata_rows = []
looks = set()
for looker_shape in self.looker_shapes:
if looker_shape.integration.id not in looks:
looks.add(looker_shape.integration.id)
metadata_rows.append(
{
"looks": {
"value": f"{os.environ.get('LOOKERSDK_BASE_URL')}looks/{looker_shape.integration.id}"
}
}
)
metadata_object = {
"metadata": {"fields": {"dimensions": [{"name": "looks"}]}},
"rows": metadata_rows,
}
self.data["metadata_shapes"] = json.dumps(metadata_object)
[docs]
async def get_queries(self):
"""
asynchronously fetch a list of look references
"""
logging.info(
f"Running Looker queries... {len(self.looker_shapes)} queries to run."
)
tasks = [
self.client._async_write_queries(
shape.shape_id, self.args.filter, **dict(shape.integration)
)
for shape in self.looker_shapes
]
# Run all tasks concurrently and gather the results
results = await asyncio.gather(*tasks)
for r in results:
self.data.update(r)
def _test_str_to_int(self, s):
try:
int(s)
return True
except ValueError:
return False
[docs]
def run(self, **kwargs):
"""
Main method to run the CLI application.
"""
self.args = self.parser.parse_args()
self._setup_logging()
self._pick_file()
self._init_looker()
references = self.get_alt_text(self.file_path)
if not references:
logging.error(
"No shapes with id found in the presentation. Add a 'id' : '<look_id>' to the alternative text of a shape to load data into the shape."
)
return
for ref in references:
integration = ref.get("integration", {})
# Try to parse as a Gemini shape first (type: gemini discriminator)
if isinstance(integration, dict) and integration.get("type") == "gemini":
try:
gemini_shape = GeminiShape.model_validate(ref)
if gemini_shape.shape_type not in (
"TEXT_BOX",
"TITLE",
"AUTO_SHAPE",
):
logging.warning(
f"Gemini synthesis config found on shape "
f"{gemini_shape.shape_id} (type: {gemini_shape.shape_type}). "
"Gemini synthesis only works for text boxes (TEXT_BOX, TITLE, "
"AUTO_SHAPE). This shape will be skipped."
)
continue
self.gemini_shapes.append(gemini_shape)
except ValidationError as e:
logging.debug(
f"Could not parse Gemini config in shape {ref.get('shape_id', '?')}: {e}"
)
continue
# Otherwise try to parse as a regular Looker shape
try:
self.relevant_shapes.append(LookerShape.model_validate(ref))
except ValidationError as e:
logging.debug(
f"Could not parse the alternate text in slide {ref['shape_id'].split(',')[0]}, shape {ref['shape_id'].split(',')[1]}: {e}"
)
continue
self.looker_shapes = [
s
for s in self.relevant_shapes
if s.integration.id_type == "look"
and self._test_str_to_int(s.integration.id)
]
self._build_metadata_object()
asyncio.run(self.get_queries())
for looker_shape in self.relevant_shapes:
if looker_shape.integration.meta:
if not self.args.self:
self._remove_shape(
looker_shape.slide_number,
looker_shape.shape_number,
)
else:
result = self.data.get(looker_shape.shape_id)
if result is None:
result = self.data.get(looker_shape.integration.id)
try:
if looker_shape.shape_type == "PICTURE":
if looker_shape.integration.result_format in ("jpg", "png"):
image_stream = BytesIO(result)
else:
df = self._make_df(result)
url = self._select_slice_from_df(
df, looker_shape.integration
)
response = requests.get(url)
response.raise_for_status()
image_stream = io.BytesIO(response.content)
logging.debug(
f"Replacing image for shape {looker_shape.shape_number} on slide {looker_shape.slide_number}..."
)
self._replace_image_with_object(
looker_shape.slide_number,
looker_shape.shape_number,
image_stream,
looker_shape.original_integration,
)
elif looker_shape.shape_type in [
"CHART",
"TABLE",
"TEXT_BOX",
"TITLE",
"AUTO_SHAPE",
]:
slide = self.presentation.slides[looker_shape.slide_number]
for shape in slide.shapes:
if shape.shape_id == looker_shape.shape_number:
current_shape = shape
df = self._make_df(result)
if looker_shape.shape_type == "TABLE":
logging.debug(
f"Updating table for shape {looker_shape.shape_number} on slide {looker_shape.slide_number}..."
)
self._fill_table(
current_shape.table,
df,
looker_shape.integration.headers,
)
elif looker_shape.shape_type in [
"TEXT_BOX",
"TITLE",
"AUTO_SHAPE",
]:
logging.debug(
f"Updating text for shape {looker_shape.shape_number} on slide {looker_shape.slide_number}..."
)
try:
text_to_insert = self._select_slice_from_df(
df, looker_shape.integration
)
except Exception as e:
text_to_insert = df.to_string(index=False, header=False)
logging.debug(
f"inserting whole text for shape {looker_shape.shape_number} on slide {looker_shape.slide_number}: {e}"
)
current_shape = process_text_field(
current_shape,
text_to_insert,
df,
)
# add_text_with_numbered_links(current_shape.text_frame, str(text_to_insert))
elif looker_shape.shape_type == "CHART":
chart_data = CategoryChartData()
chart_data.categories = df.iloc[
:, 0
].tolist() # Assuming the first column contains categories
chart = current_shape.chart
existing_chart_data = chart.plots[0].series
logging.debug(
f"Existing chart series: {[s.name for s in existing_chart_data]}"
)
if looker_shape.integration.headers:
for series_name in df.columns[1:]:
try:
match = (
re.search(
r"^[^\.]*\.[^\.]*\.(.*)\.value$",
series_name,
)
.group(1)
.replace(".", " - ")
.strip()
.replace("|FIELD|", " ")
)
except AttributeError as e:
logging.debug(
f"Could not parse series name {series_name}, setting name to {series_name}"
)
match = series_name
chart_data.add_series(match, df[series_name])
else:
if len(df.columns[1:]) != len(existing_chart_data):
logging.warning(
f"{looker_shape.shape_id}. Missing headers! Number of series ({len(df.columns[1:])}) does not match number of existing chart series ({len(existing_chart_data)}). Perhaps you need to enable headers in the integration settings?"
)
for series_name, series in zip(
df.columns[1:], existing_chart_data
):
chart_data.add_series(series.name, df[series_name])
chart.replace_data(chart_data)
if looker_shape.integration.show_latest_chart_label:
for plot in chart.plots:
s = 0
for series in plot.series:
series_has_label = False
index = 0
for i, v in zip(
series.points, df.iloc[:, s + 1]
):
if i.data_label._dLbl is not None:
series_has_label = True
logging.debug(
f"Series {series.name} has data labels."
)
if v is not None and v != "":
logging.debug(
f"Value for point {index} in series {series.name}: {v}"
)
index += 1
if series_has_label is True:
new_index = 0
for point in series.points:
new_index += 1
if new_index == index:
logging.debug(
f"Showing data label for point {new_index} in series {series.name}."
)
point.data_label.text_frame.text = (
""
)
point.data_label.has_text_frame = (
False
)
else:
point.data_label.text_frame.text = (
""
)
point.data_label.has_text_frame = (
True
)
s += 1
else:
logging.warning(
f"unknown shape type {looker_shape.shape_type} for shape {looker_shape.shape_number} on slide {looker_shape.slide_number}."
)
continue
except Exception as e:
logging.error(f"Error processing reference {looker_shape}: {e}")
# import traceback
# traceback.print_exc() # Prints the full traceback
if not self.args.hide_errors:
slide = self.presentation.slides[looker_shape.slide_number]
for shape in slide.shapes:
if shape.shape_id == looker_shape.shape_number:
self._mark_failure(slide, shape)
# Process Gemini synthesis shapes
self._process_gemini_shapes()
if self.args.self:
self.destination = self.file_path
else:
if not self.args.output_dir.endswith("/"):
self.args.output_dir += "/"
self.destination = (
self.args.output_dir
+ os.path.basename(self.file_path).removesuffix(".pptx")
+ datetime.datetime.now().strftime("_%Y%m%d_%H%M%S.pptx")
)
if not os.path.exists(self.args.output_dir) and not self.args.self:
os.makedirs(self.args.output_dir)
self.presentation.save(self.destination)
if not self.args.quiet:
try:
os.startfile(self.destination)
logging.info(f"Opened {self.destination} in PowerPoint.")
except Exception as e:
try:
subprocess.Popen(["open", self.destination]) # For macOS
logging.info(f"Opened {self.destination} in PowerPoint.")
except Exception as e:
logging.error(f"Failed to open the PowerPoint file: {e}")
logging.info(f"You can find the file at {self.destination}.")
[docs]
def main():
cli = Cli()
cli.run()
if __name__ == "__main__":
main()