Source code for energyunits.quantity

"""Quantity class for energy system modeling with unified .to() conversion."""

import html
import warnings
from typing import List, Optional, Union

import numpy as np

from .registry import registry


[docs] class Quantity: """Physical quantity with value, unit, and optional metadata. Supports: - Unit conversions (e.g., MWh to GJ) - Substance-based conversions (e.g., coal mass to CO2 emissions) - Heating value basis conversions (HHV/LHV) - Inflation adjustments for cost quantities - Arithmetic operations with dimensional analysis """ def __init__( self, value: Union[float, int, List[float], np.ndarray], unit: str, substance: Optional[str] = None, basis: Optional[str] = None, reference_year: Optional[int] = None, ) -> None: self.value: np.ndarray = np.asarray(value) self.unit: str = unit self.substance: Optional[str] = substance self.basis: Optional[str] = basis self.reference_year: Optional[int] = reference_year self.dimension: str = registry.get_dimension(unit)
[docs] @classmethod def list_units(cls, dimension=None): """List all available units, optionally filtered by dimension. Args: dimension: Filter by dimension (e.g., "ENERGY", "POWER", "MASS"). Returns: Sorted list of unit strings. Examples: Quantity.list_units() # All units Quantity.list_units("ENERGY") # Energy units only """ return registry.list_units(dimension)
[docs] @classmethod def list_dimensions(cls): """List all available dimensions. Returns: Sorted list of dimension strings. """ return registry.list_dimensions()
[docs] @classmethod def list_substances(cls, has_property=None): """List all available substances. Args: has_property: Filter to substances with this property (e.g., "hhv"). Returns: Sorted list of substance names. Examples: Quantity.list_substances() # All substances Quantity.list_substances("density") # Substances with density """ from .substance import substance_registry return substance_registry.list_substances(has_property)
[docs] @classmethod def list_currencies(cls): """List all available currencies. Returns: List of currency codes. """ from .exchange_rate import exchange_rate_registry return exchange_rate_registry.get_supported_currencies()
[docs] def to( self, target_unit: Optional[str] = None, basis: Optional[str] = None, substance: Optional[str] = None, reference_year: Optional[int] = None, ) -> "Quantity": """Convert to another unit, basis, substance, and/or reference year. This is the unified conversion method. Conversions are applied in sequence: substance → basis → unit → inflation. **Special case**: When combining currency conversion with year change: - Order becomes: substance → basis → inflation → unit - Uses "inflate first, then convert" convention - Issues warning about economic assumptions Examples: energy = Quantity(100, "MWh") energy.to("GJ") # → 360 GJ coal = Quantity(1000, "kg", "coal") coal.to("MWh") # Coal mass → energy content coal.to("kg", substance="CO2") # Coal mass → CO2 emissions capex_2020 = Quantity(1000, "USD/kW", reference_year=2020) capex_2025 = capex_2020.to(reference_year=2025) cost_eur_2015 = Quantity(50, "EUR/MWh", reference_year=2015) cost_usd_2024 = cost_eur_2015.to("USD/MWh", reference_year=2024) # Inflates EUR 2015→2024, then converts using 2024 exchange rate """ result = self # Detect currency conversion + year change combination from .exchange_rate import exchange_rate_registry is_currency_conversion = False if target_unit is not None: source_currency = exchange_rate_registry.detect_currency_from_unit(self.unit) target_currency = exchange_rate_registry.detect_currency_from_unit(target_unit) is_currency_conversion = ( source_currency is not None and target_currency is not None and source_currency != target_currency ) is_year_change = ( reference_year is not None and self.reference_year is not None and reference_year != self.reference_year ) needs_reordering = is_currency_conversion and is_year_change # Standard conversions if substance is not None and self.substance != substance: result = result._convert_substance(substance) if basis is not None and result.basis != basis: result = result._convert_basis(basis) # Special case: inflate before converting currency if needs_reordering: # Warn user about economic assumptions exchange_rate_registry.warn_currency_inflation_combination() # Do inflation first if reference_year is not None: result = result._convert_reference_year(reference_year) # Then do currency conversion using target year's exchange rate if target_unit is not None: result = result._convert_unit(target_unit, year_for_exchange_rate=reference_year) else: # Normal order: unit conversion, then inflation if target_unit is not None: result = result._convert_unit(target_unit) if reference_year is not None: result = result._convert_reference_year(reference_year) if all(x is None for x in [target_unit, basis, substance, reference_year]): return Quantity( self.value, self.unit, self.substance, self.basis, self.reference_year ) return result
def __str__(self) -> str: if np.isscalar(self.value) or self.value.size == 1: scalar_val = ( self.value.item() if hasattr(self.value, "item") else self.value ) value_str = f"{scalar_val:.2f}" else: if self.value.size <= 5: value_str = f"[{', '.join(f'{v:.2f}' for v in self.value.flat)}]" else: first = self.value.flat[0] last = self.value.flat[-1] size = self.value.size value_str = f"[{first:.2f}, ..., {last:.2f}] ({size} values)" if self.substance: return f"{value_str} {self.unit} of {self.substance}" return f"{value_str} {self.unit}" def __repr__(self) -> str: substance_str = f", '{self.substance}'" if self.substance else "" basis_str = f", basis='{self.basis}'" if self.basis else "" ref_year_str = ( f", reference_year={self.reference_year}" if self.reference_year else "" ) return f"Quantity({self.value}, '{self.unit}'{substance_str}{basis_str}{ref_year_str})" def _repr_html_(self) -> str: """Rich HTML representation for Jupyter notebooks.""" if np.isscalar(self.value) or self.value.size == 1: scalar_val = ( self.value.item() if hasattr(self.value, "item") else self.value ) value_str = f"{scalar_val:,.4g}" else: if self.value.size <= 5: value_str = ( "[" + ", ".join(f"{v:,.4g}" for v in self.value.flat) + "]" ) else: first = self.value.flat[0] last = self.value.flat[-1] size = self.value.size value_str = f"[{first:,.4g}, ..., {last:,.4g}] ({size} values)" unit_escaped = html.escape(str(self.unit)) parts = [ f'<span style="font-weight:bold;font-size:1.1em">{value_str}</span>', f'<span style="color:#2563eb;font-weight:bold"> {unit_escaped}</span>', ] if self.substance: substance_escaped = html.escape(str(self.substance)) parts.append( f'<span style="background:#e0f2fe;color:#0369a1;' f'border-radius:4px;padding:1px 6px;margin-left:4px;' f'font-size:0.9em">{substance_escaped}</span>' ) if self.basis: basis_escaped = html.escape(str(self.basis)) parts.append( f'<span style="background:#fef3c7;color:#92400e;' f'border-radius:4px;padding:1px 6px;margin-left:4px;' f'font-size:0.9em">{basis_escaped}</span>' ) if self.reference_year: parts.append( f'<span style="background:#f3e8ff;color:#6b21a8;' f'border-radius:4px;padding:1px 6px;margin-left:4px;' f'font-size:0.9em">{self.reference_year}</span>' ) return "".join(parts) def __add__(self, other: "Quantity") -> "Quantity": if not isinstance(other, Quantity): raise TypeError(f"Cannot add Quantity and {type(other)}") other_converted = other.to(self.unit) result_value = self.value + other_converted.value if self.substance == other.substance: substance = self.substance elif self.substance is None: substance = other.substance elif other.substance is None: substance = self.substance else: substance = None warnings.warn( f"Adding quantities with different substances " f"('{self.substance}' and '{other.substance}'): " f"substance metadata will be dropped from the result.", UserWarning, stacklevel=2, ) basis = self.basis if self.basis == other.basis else None return Quantity(result_value, self.unit, substance, basis, self.reference_year) def __sub__(self, other: "Quantity") -> "Quantity": if not isinstance(other, Quantity): raise TypeError(f"Cannot subtract {type(other)} from Quantity") other_converted = other.to(self.unit) result_value = self.value - other_converted.value if self.substance == other.substance: substance = self.substance elif self.substance is None: substance = other.substance elif other.substance is None: substance = self.substance else: substance = None warnings.warn( f"Subtracting quantities with different substances " f"('{self.substance}' and '{other.substance}'): " f"substance metadata will be dropped from the result.", UserWarning, stacklevel=2, ) basis = self.basis if self.basis == other.basis else None return Quantity(result_value, self.unit, substance, basis, self.reference_year) def __mul__(self, other: Union[int, float, "Quantity"]) -> "Quantity": if isinstance(other, (int, float)): return Quantity( self.value * other, self.unit, self.substance, self.basis, self.reference_year, ) elif isinstance(other, Quantity): # Smart compound unit cancellation: convert units to match denominators self_converted = self other_converted = other # Check if self has compound unit (e.g., USD/kW) and other matches denominator dimension if "/" in self.unit: denominator = self.unit.split("/", 1)[1] denominator_dim = registry.get_dimension(denominator) if other.dimension == denominator_dim and other.unit != denominator: other_converted = other.to(denominator) # Check if other has compound unit and self matches denominator dimension elif "/" in other.unit: denominator = other.unit.split("/", 1)[1] denominator_dim = registry.get_dimension(denominator) if self.dimension == denominator_dim and self.unit != denominator: self_converted = self.to(denominator) result_value = self_converted.value * other_converted.value result_unit = self._multiply_units( self_converted.unit, other_converted.unit, self_converted.dimension, other_converted.dimension, ) if self.substance == other.substance: result_substance = self.substance elif self.substance is None: result_substance = other.substance elif other.substance is None: result_substance = self.substance else: result_substance = None result_basis = self.basis if self.basis == other.basis else None result_ref_year = ( self.reference_year if self.reference_year == other.reference_year else None ) return Quantity( result_value, result_unit, result_substance, result_basis, result_ref_year, ) else: raise TypeError(f"Cannot multiply Quantity and {type(other)}") def _multiply_units(self, unit1: str, unit2: str, dim1: str, dim2: str) -> str: # Compound unit cancellation (e.g., GWh/min * min = GWh, USD/t * t = USD) if "/" in unit1 and unit2 in unit1: return unit1.split("/")[0] elif "/" in unit2 and unit1 in unit2: return unit2.split("/")[0] # Check dimensional multiplication rules from data result = registry.get_multiplication_result(dim1, dim2) if result: result_dimension, source_dimension = result source_unit = unit1 if dim1 == source_dimension else unit2 return registry.get_corresponding_unit(source_unit, result_dimension) return f"{unit1}·{unit2}" def __rmul__(self, other: Union[int, float]) -> "Quantity": return self.__mul__(other) def __truediv__(self, other: Union["Quantity", int, float]) -> "Quantity": if isinstance(other, (int, float)): return Quantity( self.value / other, self.unit, self.substance, self.basis, self.reference_year, ) elif isinstance(other, Quantity): result_value = self.value / other.value result_unit = self._divide_units( self.unit, other.unit, self.dimension, other.dimension ) if self.substance == other.substance: result_substance = self.substance elif self.substance is None: result_substance = other.substance elif other.substance is None: result_substance = self.substance else: result_substance = None result_basis = self.basis if self.basis == other.basis else None result_ref_year = ( self.reference_year if self.reference_year == other.reference_year else None ) return Quantity( result_value, result_unit, result_substance, result_basis, result_ref_year, ) else: raise TypeError(f"Cannot divide Quantity by {type(other)}") def _divide_units(self, unit1: str, unit2: str, dim1: str, dim2: str) -> str: # Check dimensional division rules from data result_dimension = registry.get_division_result(dim1, dim2) if result_dimension: return registry.get_corresponding_unit(unit1, result_dimension) # Same units = dimensionless if unit1 == unit2: return "" return f"{unit1}/{unit2}" def __lt__(self, other: "Quantity") -> bool: other_converted = other.to(self.unit) return np.all(self.value < other_converted.value) def __gt__(self, other: "Quantity") -> bool: other_converted = other.to(self.unit) return np.all(self.value > other_converted.value) def __eq__(self, other: "Quantity") -> bool: if not isinstance(other, Quantity): return False other_converted = other.to(self.unit) return np.all(self.value == other_converted.value) def __le__(self, other: "Quantity") -> bool: other_converted = other.to(self.unit) return np.all(self.value <= other_converted.value) def __ge__(self, other: "Quantity") -> bool: other_converted = other.to(self.unit) return np.all(self.value >= other_converted.value) def __ne__(self, other: "Quantity") -> bool: if not isinstance(other, Quantity): return True other_converted = other.to(self.unit) return np.any(self.value != other_converted.value) def _convert_unit(self, target_unit: str, year_for_exchange_rate: Optional[int] = None) -> "Quantity": from_dim = self.dimension to_dim = registry.get_dimension(target_unit) if from_dim == to_dim: # Check if this is a currency conversion from .exchange_rate import exchange_rate_registry source_currency = exchange_rate_registry.detect_currency_from_unit(self.unit) target_currency = exchange_rate_registry.detect_currency_from_unit(target_unit) if source_currency and target_currency and source_currency != target_currency: # This is a currency conversion - use year-dependent exchange rate year = year_for_exchange_rate or self.reference_year factor = exchange_rate_registry.get_conversion_factor( source_currency, target_currency, year ) else: # Standard unit conversion factor = registry.get_conversion_factor(self.unit, target_unit) new_value = self.value * factor elif registry.are_dimensions_compatible(from_dim, to_dim): kwargs = {} if self.basis is not None: kwargs["basis"] = self.basis new_value = registry.convert_between_dimensions( self.value, self.unit, target_unit, self.substance, **kwargs ) else: hint = "" if (from_dim in ("MASS", "VOLUME") and to_dim == "ENERGY") or ( from_dim == "ENERGY" and to_dim in ("MASS", "VOLUME") ): hint = " Hint: specify a substance (e.g., substance='coal') to enable this conversion." elif from_dim in ("MASS", "VOLUME") and to_dim in ("MASS", "VOLUME"): hint = " Hint: specify a substance with a known density to enable this conversion." raise ValueError( f"Cannot convert from {self.unit} ({from_dim}) to {target_unit} ({to_dim}).{hint}" ) return Quantity( new_value, target_unit, self.substance, self.basis, self.reference_year ) def _convert_basis(self, target_basis: str) -> "Quantity": if target_basis.upper() not in ["HHV", "LHV"]: raise ValueError("Basis must be 'HHV' or 'LHV'") if self.substance is None: raise ValueError("Substance must be specified for basis conversion") current_basis = self.basis or "LHV" target_basis = target_basis.upper() if current_basis.upper() == target_basis: return Quantity( self.value, self.unit, self.substance, target_basis, self.reference_year ) from .substance import substance_registry lhv_hhv_ratio = substance_registry.lhv_hhv_ratio(self.substance) if current_basis.upper() == "HHV" and target_basis == "LHV": new_value = self.value * lhv_hhv_ratio elif current_basis.upper() == "LHV" and target_basis == "HHV": new_value = self.value / lhv_hhv_ratio else: raise ValueError( f"Invalid basis conversion: {current_basis} to {target_basis}" ) return Quantity( new_value, self.unit, self.substance, target_basis, self.reference_year ) def _convert_substance(self, target_substance: str) -> "Quantity": if self.substance is None: raise ValueError( "Source substance must be specified for substance conversion. " "Example: Quantity(1000, 'kg', substance='coal').to('kg', substance='CO2')" ) valid_products = ["CO2", "H2O", "ash"] if target_substance not in valid_products: raise ValueError( f"Substance conversion only supported for combustion products: " f"{', '.join(valid_products)}. Got: '{target_substance}'" ) # Renewables have zero combustion products if self.substance in ["wind", "solar", "hydro", "nuclear"]: # Preserve source mass unit if applicable, otherwise default to "t" result_unit = self.unit if registry.get_dimension(self.unit) == "MASS" else "t" return Quantity(0.0, result_unit, target_substance) from .substance import substance_registry combustion_product = substance_registry.calculate_combustion_product( self, target_substance ) return Quantity( combustion_product.value, combustion_product.unit, target_substance, None, self.reference_year, ) def _convert_reference_year(self, target_year: int) -> "Quantity": if self.reference_year is None: raise ValueError("Reference year not specified for inflation adjustment") if target_year == self.reference_year: return Quantity( self.value, self.unit, self.substance, self.basis, target_year ) from .inflation import inflation_registry currency = inflation_registry.detect_currency_from_unit(self.unit) if currency is None: raise ValueError( f"Cannot detect currency from unit '{self.unit}'. " f"Supported currencies: {inflation_registry.get_supported_currencies()}" ) inflation_factor = inflation_registry.get_cumulative_inflation_factor( currency, self.reference_year, target_year ) adjusted_value = self.value * inflation_factor return Quantity( adjusted_value, self.unit, self.substance, self.basis, target_year )