Client

from pytonapi.streaming import TonapiSSE
from pytonapi.types import Network

sse = TonapiSSE("YOUR_API_KEY", Network.MAINNET)
| Parameter | Default | Description |
| --- | --- | --- |
| api_key | Required | API key. |
| network | Required | Mainnet or Testnet. |
| base_url | Optional | Override the default URL. |
| session | Optional | External HTTP session. |
| headers | Optional | Extra HTTP headers. |
| reconnect_policy | Optional | See Reconnection. |
| on_state_change | Optional | See Connection State. |
| heartbeat_timeout | 30 seconds | Seconds before considering the connection dead. |

Session

start — builds the subscription from registered handlers and dispatches notifications. Blocks until stop() is called or a fatal error occurs.
try:
    await sse.start(
        addresses=["EQDtFpEwcFAEcRe5mLVh2N6C0x-_hJEM7W61_JLnSF74p4q2"],
        include_address_book=True,
        include_metadata=True,
    )
finally:
    await sse.stop()
| Parameter | Default | Description |
| --- | --- | --- |
| addresses | Optional | Addresses to watch (in any form). |
| trace_external_hash_norms | Optional | Trace hashes (required when on_traces is used). |
| include_address_book | False | Include DNS-resolved names in notifications. |
| include_metadata | False | Include token metadata in notifications. |
| supported_action_types | Optional | Advertise supported action types. |
Only one start() per instance is allowed; a second call raises RuntimeError.
stop: signals the dispatch loop to exit, closes the session, and resets state.
await sse.stop()

Handlers

Register event handlers with decorators. Multiple handlers on the same event type run sequentially in registration order.
from pytonapi.streaming import (
    ActionsNotification,
    ActionType,
    Finality,
    TransactionsNotification,
)


@sse.on_transactions(min_finality=Finality.PENDING)
async def on_tx(n: TransactionsNotification) -> None:
    for tx in n.transactions:
        print(tx.get("hash"))


@sse.on_actions(
    min_finality=Finality.FINALIZED,
    action_types=[ActionType.JETTON_TRANSFER, ActionType.TON_TRANSFER],
)
async def on_action(n: ActionsNotification) -> None:
    for a in n.actions:
        print(a.get("type"))
Three decorator forms are supported:
@sse.on_transactions                                       # bare
@sse.on_transactions(min_finality=Finality.CONFIRMED)      # with parameters
sse.on_transactions(my_callback, min_finality=...)         # programmatic
| Decorator | Event type | Extra parameters |
| --- | --- | --- |
| on_transactions() | transactions | min_finality |
| on_actions() | actions | min_finality, action_types |
| on_traces() | trace | min_finality |
| on_account_states() | account_state_change | min_finality |
| on_jettons() | jettons_change | min_finality |
| on_trace_invalidated() | trace_invalidated | none |
on_trace_invalidated — fires when a previously delivered pending or confirmed trace becomes invalid. Discard cached data for that trace.
from pytonapi.streaming import TraceInvalidatedNotification


@sse.on_trace_invalidated
async def on_invalidated(n: TraceInvalidatedNotification) -> None:
    print(f"Trace invalidated: {n.trace_external_hash_norm}")
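Discarding cached data on invalidation can be sketched with a plain dictionary keyed by the normalized trace hash. This is a minimal illustration under assumptions: trace_cache, remember, and forget are hypothetical names that are not part of pytonapi, and the cached value is assumed to be a list of action dicts.

```python
from typing import Dict, List

# Hypothetical in-memory cache: normalized trace hash -> cached actions.
trace_cache: Dict[str, List[dict]] = {}


def remember(trace_hash: str, actions: List[dict]) -> None:
    """Store data derived from a pending or confirmed trace."""
    trace_cache[trace_hash] = actions


def forget(trace_hash: str) -> bool:
    """Drop cached data for a trace; return True if an entry was removed."""
    return trace_cache.pop(trace_hash, None) is not None


remember("abc123", [{"type": "TonTransfer"}])
removed_first = forget("abc123")   # entry existed and is now gone
removed_second = forget("abc123")  # a repeated invalidation is harmless
print(removed_first, removed_second, trace_cache)
```

Inside an on_trace_invalidated handler, the call would be forget(n.trace_external_hash_norm). Returning a flag instead of raising keeps the handler safe when an invalidation arrives for a trace that was never cached.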
An exception inside a handler halts dispatch for that notification (fail-fast).
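The ordering and fail-fast behavior described in this section can be modeled in a few lines. This is a sketch of the semantics only, not the library's dispatch loop: the dispatch function and the three handlers are hypothetical, and what the client does with the exception afterwards is not covered here.

```python
import asyncio
from typing import Awaitable, Callable, List

Handler = Callable[[dict], Awaitable[None]]


async def dispatch(handlers: List[Handler], notification: dict) -> None:
    """Run handlers one by one, in registration order.

    The first exception propagates, so later handlers never see
    this notification (fail-fast).
    """
    for handler in handlers:
        await handler(notification)


async def demo() -> List[str]:
    calls: List[str] = []

    async def first(n: dict) -> None:
        calls.append("first")

    async def second(n: dict) -> None:
        calls.append("second")
        raise ValueError("handler bug")

    async def third(n: dict) -> None:
        calls.append("third")  # never reached for this notification

    try:
        await dispatch([first, second, third], {"hash": "abc"})
    except ValueError:
        calls.append("dispatch halted")
    return calls


result = asyncio.run(demo())
print(result)
```

If one handler must not block the others, catching and logging exceptions inside that handler body is one way to keep the rest of the chain running.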

Filtering

The streaming API delivers all transactions for the subscribed addresses. To filter by specific criteria (opcode, amount, source, etc.), apply the logic inside the handler:
JETTON_TRANSFER = "0x0f8a7ea5"
JETTON_TRANSFER_NOTIFICATION = "0x7362d09c"


@sse.on_transactions(min_finality=Finality.FINALIZED)
async def on_jetton_activity(n: TransactionsNotification) -> None:
    for tx in n.transactions:
        in_msg = tx.get("in_msg") or {}  # tolerate a missing or null in_msg
        opcode = in_msg.get("opcode")
        if opcode in (JETTON_TRANSFER, JETTON_TRANSFER_NOTIFICATION):
            print(f"Jetton activity (opcode {opcode}): {tx.get('hash')}")
Opcodes are hex strings (e.g. "0x7362d09c"). See Action Types for the full ActionType enum.
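Pulling the filter into a pure function makes it easy to unit-test without a live stream. A minimal sketch, assuming transactions are plain dicts with the hash and in_msg.opcode keys used in the handler above; select_by_opcode and the sample payloads are hypothetical.

```python
from typing import FrozenSet, Iterable, List

JETTON_TRANSFER = "0x0f8a7ea5"
JETTON_TRANSFER_NOTIFICATION = "0x7362d09c"
WATCHED: FrozenSet[str] = frozenset({JETTON_TRANSFER, JETTON_TRANSFER_NOTIFICATION})


def select_by_opcode(
    transactions: Iterable[dict],
    opcodes: FrozenSet[str] = WATCHED,
) -> List[dict]:
    """Return the transactions whose inbound message opcode is in `opcodes`."""
    selected = []
    for tx in transactions:
        in_msg = tx.get("in_msg") or {}  # tolerate a missing or null in_msg
        if in_msg.get("opcode") in opcodes:
            selected.append(tx)
    return selected


# Hypothetical payloads shaped like the dicts used in the handler above.
sample = [
    {"hash": "a1", "in_msg": {"opcode": "0x7362d09c"}},
    {"hash": "b2", "in_msg": {"opcode": "0x00000000"}},
    {"hash": "c3", "in_msg": None},
    {"hash": "d4"},
]
matched = [tx["hash"] for tx in select_by_opcode(sample)]
print(matched)
```

The handler then shrinks to a loop over select_by_opcode(n.transactions), and the matching rules can be tested in isolation.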

Finality

Every trace-based notification carries a finality field. The lifecycle is monotonic per trace: pending → confirmed → finalized. At any point before finalized, a trace_invalidated event may signal rollback.
| Level | Latency | Guarantee |
| --- | --- | --- |
| Finality.PENDING | ~30-100 ms | Speculative / emulated. May be rolled back. |
| Finality.CONFIRMED | Seconds | In a signed shard block. Small rollback risk. |
| Finality.FINALIZED | Seconds | Committed in masterchain. Irreversible. |
min_finality sets the minimum level a handler accepts. Default: Finality.FINALIZED.
| min_finality | Pending | Confirmed | Finalized |
| --- | --- | --- | --- |
| Pending | Delivered | Delivered | Delivered |
| Confirmed | Skipped | Delivered | Delivered |
| Finalized | Skipped | Skipped | Delivered |
Convenience properties: .is_pending, .is_confirmed, .is_finalized.
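The delivery matrix above reduces to a single ordering comparison: a notification is delivered when its finality is at least the handler's min_finality. The sketch below assumes the three levels are totally ordered; Level and is_delivered are illustrative stand-ins, not pytonapi's Finality type.

```python
from enum import IntEnum


class Level(IntEnum):
    """Stand-in for the three finality levels, ordered by strength."""
    PENDING = 0
    CONFIRMED = 1
    FINALIZED = 2


def is_delivered(min_finality: Level, event: Level) -> bool:
    """True when a handler with `min_finality` receives an `event`-level notification."""
    return event >= min_finality


# Rebuild the matrix: one row per min_finality, one column per event level.
matrix = {
    minimum.name: [is_delivered(minimum, event) for event in Level]
    for minimum in Level
}
for name, row in matrix.items():
    print(name, row)
```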

Connection State

| State | Meaning |
| --- | --- |
| IDLE | Not connected. |
| CONNECTING | Opening the connection. |
| SUBSCRIBED | Connected, receiving notifications. |
| RECONNECTING | Re-establishing after a drop. |
Properties: state, is_subscribed, is_connecting, is_reconnecting.
await sse.wait_subscribed(timeout=10.0)
Monitor transitions via on_state_change callback (sync or async):
from pytonapi.streaming import ConnectionState

def on_state(state: ConnectionState) -> None:
    print(state.value)

sse = TonapiSSE("YOUR_API_KEY", Network.MAINNET, on_state_change=on_state)
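Because the callback may be sync or async, the caller has to cope with both. One common way to do that is to call the function and await the result only if it is awaitable. The sketch below illustrates that pattern under assumptions: notify is a hypothetical helper, the state strings are placeholders, and how pytonapi invokes the callback internally is not stated here.

```python
import asyncio
import inspect
from typing import Any, Callable, List

seen: List[str] = []


def sync_listener(state: str) -> None:
    seen.append(f"sync:{state}")


async def async_listener(state: str) -> None:
    await asyncio.sleep(0)  # stand-in for real async work (logging, metrics)
    seen.append(f"async:{state}")


async def notify(callback: Callable[[str], Any], state: str) -> None:
    """Invoke a state callback that may be a plain function or a coroutine function."""
    result = callback(state)
    if inspect.isawaitable(result):
        await result


async def main() -> None:
    await notify(sync_listener, "connecting")
    await notify(async_listener, "subscribed")


asyncio.run(main())
print(seen)
```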

Reconnection

The client reconnects automatically on transient failures (5xx, 429, streaming transport errors, heartbeat timeout). All other client errors (400, 401, 403, 404, 405, 409, 422) are fatal and stop the client immediately.
from pytonapi.types import ReconnectPolicy

policy = ReconnectPolicy(
    max_reconnects=10,
    delay=0.5,
    max_delay=10.0,
    backoff_factor=2.0,
)
sse = TonapiSSE("YOUR_API_KEY", Network.MAINNET, reconnect_policy=policy)
| Parameter | Default | Description |
| --- | --- | --- |
| max_reconnects | 10 | Maximum attempts (-1 for unlimited). |
| delay | 0.5 | Initial delay in seconds. |
| max_delay | 10.0 | Upper bound for backoff delay. |
| backoff_factor | 2.0 | Multiplier applied on each attempt. |
Always call stop() in a finally block. Without a clean disconnect the server may hold the session open, blocking new connections until it times out.
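To get a feel for the policy parameters, the delay schedule can be estimated with the usual capped exponential formula. This is an assumption about how the parameters combine, not a statement of the library's exact algorithm (it may add jitter or count attempts differently); backoff_delays is a hypothetical helper.

```python
from typing import List


def backoff_delays(
    attempts: int,
    delay: float = 0.5,
    max_delay: float = 10.0,
    backoff_factor: float = 2.0,
) -> List[float]:
    """Assumed schedule: delay * backoff_factor**i, capped at max_delay."""
    return [min(delay * backoff_factor**i, max_delay) for i in range(attempts)]


schedule = backoff_delays(6)
print(schedule)        # delays for the first six attempts
print(sum(schedule))   # total time spent waiting across them
```

Under this model the defaults reach the 10-second cap on the sixth attempt, so raising max_reconnects beyond that adds a fixed 10 seconds of waiting per extra attempt.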

Dynamic Subscription

WebSocket only. SSE does not support dynamic subscription changes.
After start() establishes the initial connection, TonapiWebSocket can modify subscriptions on the fly without reconnecting.
dynamic_subscribe: replaces the current subscription (snapshot semantics). All previously watched addresses and traces are replaced by the new set.
# ws is a TonapiWebSocket instance on which start() has already connected
await ws.dynamic_subscribe(
    addresses=["EQDtFpEwcFAEcRe5mLVh2N6C0x-_hJEM7W61_JLnSF74p4q2"],
    min_finality=Finality.CONFIRMED,
    include_metadata=True,
)
| Parameter | Default | Description |
| --- | --- | --- |
| addresses | Optional | Addresses to watch (in any form). |
| trace_external_hash_norms | Optional | Trace hashes to watch. |
| types | Optional | Event types to receive. |
| min_finality | Finalized | Minimum finality level. |
| include_address_book | False | Include DNS-resolved names. |
| include_metadata | False | Include token metadata. |
| action_types | Optional | Filter actions by type. |
| supported_action_types | Optional | Advertise client-supported action types. |
dynamic_unsubscribe — remove addresses or trace hashes from the current subscription.
await ws.dynamic_unsubscribe(addresses=["EQDtFpEwcFAEcRe5mLVh2N6C0x-_hJEM7W61_JLnSF74p4q2"])
| Parameter | Default | Description |
| --- | --- | --- |
| addresses | Optional | Addresses to stop watching. |
| trace_external_hash_norms | Optional | Trace hashes to stop watching. |
Both methods raise RuntimeError if called without an active WebSocket connection, and TONAPIStreamingError if the server rejects the request.
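The difference between the two calls is easiest to see as set operations: dynamic_subscribe replaces the watched set, while dynamic_unsubscribe subtracts from it. The model below is purely local and illustrative. SubscriptionModel is a hypothetical class that never talks to a server, and the short address strings are placeholders.

```python
from typing import Iterable, Set


class SubscriptionModel:
    """Local model of the watched-address set; not a pytonapi class."""

    def __init__(self, addresses: Iterable[str] = ()) -> None:
        self.addresses: Set[str] = set(addresses)

    def subscribe(self, addresses: Iterable[str]) -> None:
        # Snapshot semantics: the new set replaces the old one entirely.
        self.addresses = set(addresses)

    def unsubscribe(self, addresses: Iterable[str]) -> None:
        # Removal semantics: only the listed addresses are dropped.
        self.addresses -= set(addresses)


model = SubscriptionModel(["A", "B"])
model.subscribe(["C"])                  # A and B are no longer watched
after_replace = set(model.addresses)

model.subscribe(model.addresses | {"D"})  # to add D, resend the full set
after_add = set(model.addresses)

model.unsubscribe(["C"])
after_remove = set(model.addresses)
print(after_replace, after_add, after_remove)
```

The practical consequence is that adding one address requires sending the full desired set, so the caller has to track what is currently watched.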

Errors

See Errors for the full exception hierarchy. Streaming-specific exceptions:
| Exception | When |
| --- | --- |
| TONAPIStreamingError | Transport-level error during streaming. |
| TONAPIConnectionLostError | Reconnect limit exhausted. Exposes .attempts count. |
All client errors except 429 are fatal — no reconnect is attempted (400, 401, 403, 404, 405, 409, 422).
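The status-code rule above can be written as a small predicate, which is handy when mirroring the same policy in surrounding application code. This is a sketch of the rule as documented here, not the library's internal check; is_transient is a hypothetical helper and covers HTTP status codes only, not transport errors or heartbeat timeouts.

```python
def is_transient(status: int) -> bool:
    """True for statuses that trigger a reconnect: 429 and any 5xx."""
    return status == 429 or 500 <= status <= 599


fatal_codes = [400, 401, 403, 404, 405, 409, 422]
retryable_codes = [429, 500, 502, 503]

print([code for code in fatal_codes if is_transient(code)])      # no fatal code is retried
print([code for code in retryable_codes if is_transient(code)])  # all of these are retried
```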