Skip to content

Python Manual

A single-page tour of everything the HyperMesh Python SDK can do. It follows the same shape as a database driver manual: connect → query → manipulate results → explore the execution summary → advanced usage → reference.

Import the SDK once:

import hypermesh as hm

The supported, stable surface is exactly hypermesh.__all__. Anything with a leading underscore is private.


The compiled engine ships inside the wheel — no C toolchain required.

Terminal window
pip install "hypermesh[engine]" # embedded engine
pip install hypermesh # remote client only (pure-Python)
pip install "hypermesh[analytics]" # + numpy/scipy
pip install "hypermesh[all]" # everything
ExtraAdds
engineembedded local database
analyticsnumpy + scipy (analytics engine)
pandas / arrowDataFrame / Parquet bulk paths
serverhmdb serve (fastapi + uvicorn)
connectorssource connectors (e.g. boto3 for S3)
yamlYAML ingestion mapping specs

hm.connect() is a single factory that picks the backend from the target: a filesystem path opens the embedded engine, an http(s):// URL returns a remote client. Both satisfy the hypermesh.types.Database protocol.

# Embedded engine (builds the index from a CSV on first open)
db = hm.connect("/var/lib/hypermesh/data", hyperedges_csv="hyperedges.csv")
# Remote server
db = hm.connect("https://hypermesh.internal:8000", api_key="")
# Always release resources (or use a context manager)
with hm.connect("/var/lib/hypermesh/data") as db:
...
db = hm.connect(
"https://hypermesh.internal:8000",
api_key="", # or env HYPERMESH_API_KEY
timeout=30.0,
verify=True, # TLS verification (or a CA-bundle path)
retries=2, # exponential backoff + jitter on transient errors
backoff_factor=0.25,
headers={"X-Tenant": "acme"},
)
db.ping() # -> bool, never raises
db.health() # -> dict of server statistics
cfg = hm.Config.from_env() # reads HYPERMESH_URL, HYPERMESH_API_KEY, …
db = hm.connect(cfg.url, api_key=cfg.api_key, timeout=cfg.timeout)
import asyncio
from hypermesh import AsyncClient
async def main():
async with AsyncClient("https://hypermesh.internal:8000") as db:
res = await db.execute("MATCH HYPEREDGE (he:CoProximity) RETURN *")
print(res.num_tuples)
asyncio.run(main())

execute() runs any Cypher-compatible statement and returns a QueryResult. Pass named parameters with $name placeholders.

result = db.execute(
"MATCH HYPEREDGE (he:CoProximity) "
"WHERE he.event_ts >= $start AND he.event_ts <= $end "
"RETURN *",
parameters={"start": 0, "end": 300},
)
print(result.num_tuples, "hyperedges found")

The same execute() handles DDL, DML, indexes, COPY FROM, and procedure calls:

db.execute("CREATE HYPEREDGE TABLE Sensor (Drone, Drone) BUCKET_SECONDS 30")
db.execute("CALL show_hyperedge_tables() RETURN *")
db.execute("COPY CoProximity FROM 'edges.parquet'")

A QueryResult is iterable (yielding Row objects), supports cursor-style fetching, and converts to dicts/DataFrame/JSON.

result = db.execute("MATCH HYPEREDGE (he:CoProximity) RETURN * LIMIT 3")
# Iterate
for row in result:
print(row["event_ts"], row.members, row.weight)
# Cursor style
first = result.fetchone() # next Row or None
rest = result.fetchall() # remaining rows
result.reset() # rewind the cursor
# Bulk shapes
result.columns # ['event_ts', 'members', 'weight', ...]
result.num_tuples # row count
result.rows # list[Row] (non-consuming)
result.to_dicts() # list[dict]
df = result.to_df() # pandas DataFrame (needs [pandas])
bool(result) # True if any rows

A Row is accessible by name, attribute, or index, and behaves like a read-only mapping.

row = result.fetchone()
row["event_ts"] # by column name
row.members # by attribute
row[0] # by position
"weight" in row # membership test
row.get("missing", 0)
row.keys(), row.values(), row.items()
row.to_dict() # {'event_ts': 100, 'members': [1, 2, 3], ...}
import json
from hypermesh import HyperMeshEncoder
json.dumps(list(result), cls=HyperMeshEncoder) # full result set
json.dumps(result.to_dicts()) # or the built-in helper

Every data query carries a query_plan — the engine’s measured execution summary. Use it to confirm index usage and locate performance bottlenecks (the HyperMesh analogue of an EXPLAIN/PROFILE summary). DDL statements return None.

result = db.execute(
"MATCH HYPEREDGE (he:CoProximity) "
"WHERE he.event_ts >= $s AND he.event_ts <= $e RETURN *",
parameters={"s": 100, "e": 130},
)
plan = result.query_plan
print(plan)
# TPI_BUCKET_PUSHDOWN: read 4/6 buckets (1.5x speedup, 6µs, 4→4 rows)

query_plan is a QueryPlan dataclass with these attributes:

FieldMeaning
strategy"TPI_BUCKET_PUSHDOWN" (range query used the time index) or "FULL_SCAN" (all records scanned).
buckets_scannedTPI time buckets actually read from disk.
total_bucketsTotal buckets in the index.
speedup_factortotal_buckets / buckets_scanned — e.g. 25.0 means only 1/25th of the data was touched.
elapsed_usWall-clock microseconds measured by the C engine.
rows_scannedRaw rows the engine returned before Python-layer filtering.
rows_returnedRows after predicate filtering, projection, and LIMIT.
predicates_appliedNumber of property predicates evaluated in Python.
plan = result.query_plan
if plan and plan.strategy == "FULL_SCAN":
print("No time index used — consider a tighter event_ts range.")
print(f"{plan.speedup_factor:.1f}x speedup in {plan.elapsed_us}µs")
print(f"{plan.rows_scanned} -> {plan.rows_returned} rows "
f"after {plan.predicates_applied} predicate(s)")

The embedded engine supports atomic, multi-statement transactions (remote/Client is single-statement; see the parity note below).

# Context-manager form: commits on success, rolls back on exception
with db.transaction():
db.insert(event_ts=500, members=[1, 2, 3], weight=0.9, formation="WEDGE")
db.insert(event_ts=501, members=[2, 3, 4], weight=0.8, formation="LINE")
# Explicit form
db.begin()
try:
db.insert(event_ts=600, members=[1, 2], weight=0.5)
db.commit()
except Exception:
db.rollback()
raise

db.insert(event_ts=9999, members=[1, 2, 3], weight=0.95,
mean_dist_m=12.5, formation="WEDGE")
db.delete(event_ts=9999, members=[1, 2, 3]) # soft-delete (tombstone)
db.compact() # fold WAL into TPI+FMI
db.compact(ttl_seconds=86_400) # drop records older than TTL
db.set_autocompact(threshold=10_000) # auto-compact at N pending WAL
db.wal_pending # uncompacted WAL entries
db.total_records # total hyperedges
db.node_count # distinct nodes
db.bucket_count # TPI buckets
db.bucket_seconds # bucket granularity

Embedded connections load from many formats. Each returns a small result with rows_loaded.

db.copy_from_csv("CoProximity", "edges.csv")
db.copy_from_df(df, "CoProximity") # pandas/polars ([pandas])
db.copy_from_parquet("CoProximity", "edges.parquet") # ([arrow])
db.copy_from_json("CoProximity", "edges.json")
db.copy_from_numpy(arr, "CoProximity",
columns=["event_ts", "members", "weight"])

Remote clients use the equivalent COPY … FROM statement via execute().


hypermesh.analytics exposes the hypergraph analytics engine (requires [analytics]).

a = db.analytics("CoProximity") # -> hypermesh.analytics.Analytics
a.summary()
a.pagerank()
a.katz(alpha=0.1)
a.degree_centrality()
a.spectral_gap() # Zhou Laplacian spectral measures
a.s_components(s=2) # s-connectivity
a.burstiness() # temporal dynamics
a.changepoints()
a.activity_windows()
# Or build the hypergraph object directly
hg = db.to_hypergraph("CoProximity")
from hypermesh.analytics import build_hypergraph
hg = build_hypergraph(db, "CoProximity")

hypermesh.ingest is the universal converter: any source → a HyperCore bundle that loads through the unmodified engine.

bundle = hm.ingest.ingest_csv("events.csv", staging="./staging")
bundle = hm.ingest.ingest_dataframe(df, source_id="warehouse", staging="./staging")
bundle = hm.ingest.ingest_from_uri("rdbms://postgresql://host/db",
spec="mapping.yaml", staging="./staging")
# Declarative mapping spec (identity keys, PII policy, time bucketing)
from hypermesh.ingest import MappingSpec
spec = MappingSpec.from_yaml("mapping.yaml")
# Domain strategies (security/clinical/patents/geo)
from hypermesh.ingest import strategies, get_strategy
[s["name"] for s in strategies()]
mde = get_strategy("mde_baseline")

Pass dry_run=True to print the plan without writing a bundle.


hypermesh.connectors pulls from external systems (MDE, S3, webhooks) into a hyperedge table.

from hypermesh.connectors import build_connector, ConnectorConfig
cfg = ConnectorConfig(id="mde-prod", type="mde", target_table="MDE_ALERTS",
credentials=)
conn = build_connector(cfg, db)
conn.test_connection()
result = conn.sync(limit=10_000)

Subclass ConnectorBase to add your own source — see Connector Authoring.


bundle = hm.reports.build_stix_bundle(detection_result, entity_map, report_id="r-1")
html = hm.reports.render_html_report(detection_result, entity_map, report_id="r-1")

All SDK errors derive from hm.exceptions.HyperMeshError. The same classes are raised by the embedded engine and the remote client.

try:
db.execute("MATCH ...")
except hm.exceptions.AuthError: # 401/403 from a remote server
...
except hm.exceptions.TimeoutError: # request exceeded its timeout
...
except hm.exceptions.QueryError: # malformed / failed query
...
except hm.exceptions.EngineNotInstalledError:
... # embedded engine binary missing
except hm.exceptions.HyperMeshError: # catch-all
...
ExceptionRaised when
HyperMeshErrorbase class for everything below
ConnectionErrorlocal DB cannot open / remote server unreachable
QueryErrormalformed or failed query
SchemaErrorDDL / schema-catalog violation
IngestErroran ingestion run failed
AuthErrorauthentication failed (missing/invalid API key)
TimeoutErrora remote request timed out
EngineNotInstalledErrorembedded engine requested but binary absent

The library is silent by default (a NullHandler is attached); it never logs secrets.

hm.enable_debug_logging() # route the 'hypermesh' logger to stderr
cfg = hm.Config.from_env() # url, api_key, timeout, verify_tls, …

Environment variables: HYPERMESH_URL, HYPERMESH_API_KEY, HYPERMESH_TIMEOUT, HYPERMESH_VERIFY_TLS, HYPERMESH_DEFAULT_TABLE, HYPERMESH_STAGING_DIR, HYPERMESH_ENGINE_LIB.


Terminal window
hypermesh --version
hmdb serve /var/lib/hypermesh/data --port 8000
hmdb query /var/lib/hypermesh/data "MATCH HYPEREDGE (he:CoProximity) RETURN *"
hypermesh-ingest events.csv --staging ./staging

Everything in the supported public API at a glance.

AreaSymbols
Connectconnect, Connection, Client, AsyncClient, Database
Query resultsQueryResult, Row, QueryPlan, HyperMeshEncoder
Schema / bulk loadColumnDef, CopyOptions, Loader, copy_from_csv/df/parquet/json/numpy
Transactions (local)transaction(), begin(), commit(), rollback()
AnalyticsAnalytics, HypergraphPy, build_hypergraph, db.analytics(), db.to_hypergraph()
Ingestioningest_csv, ingest_dataframe, ingest_from_uri, MappingSpec, ingest.strategies()
Connectorsbuild_connector, ConnectorConfig, ConnectorBase
Reportsbuild_stix_bundle, render_html_report
ErrorsHyperMeshError + Connection/Query/Schema/Ingest/Auth/Timeout/EngineNotInstalled errors
Config / loggingConfig, enable_debug_logging
Metadata__version__
CapabilityEmbedded ConnectionRemote Client / AsyncClient
execute, insert, delete, compact, analytics
copy_from_*❌ (use COPY … FROM via execute)
Multi-statement transactions
to_hypergraph()
Auth / TLS / retries / ping/healthn/a

For the auto-generated, signature-level docs of every symbol, see the API Reference.