
Client

from pytonapi.streaming import TonapiStreaming
from pytonapi.types import Network

streaming = TonapiStreaming("YOUR_API_KEY", Network.MAINNET)
| Parameter | Default | Description |
| --- | --- | --- |
| api_key | Required | API key from tonconsole.com. |
| network | Required | Mainnet or Testnet. |
| base_url | Optional | Override the default URL. |
| headers | Optional | Extra HTTP headers sent with every request. |
| reconnect_policy | Optional | See Reconnection. |

Session

async with TonapiStreaming("YOUR_API_KEY", Network.MAINNET) as streaming:
    async for event in streaming.sse.subscribe_transactions():
        print(event.tx_hash)
Transports (streaming.sse and streaming.ws) are lazy-initialized on first access and share the session with the parent client. Accessing a transport before create_session() raises TONAPISessionNotCreatedError.
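The lazy-transport behaviour described above can be illustrated with a small self-contained model. This is only a sketch of the pattern, not pytonapi's actual implementation: the `Client`, `Transport`, and `SessionNotCreatedError` names (and the `close_session` method) are hypothetical stand-ins.

```python
import asyncio
from typing import Optional


class SessionNotCreatedError(RuntimeError):
    """Hypothetical stand-in for TONAPISessionNotCreatedError."""


class Transport:
    def __init__(self, name: str, session: object) -> None:
        self.name = name
        self.session = session  # the same object the parent client holds


class Client:
    """Toy client: transports are built on first access and reuse one session."""

    def __init__(self) -> None:
        self._session: Optional[object] = None
        self._transports: dict = {}

    async def create_session(self) -> None:
        self._session = object()  # placeholder for a real HTTP session

    async def close_session(self) -> None:
        self._session = None
        self._transports.clear()

    async def __aenter__(self) -> "Client":
        await self.create_session()
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await self.close_session()

    def _transport(self, name: str) -> Transport:
        if self._session is None:
            raise SessionNotCreatedError("call create_session() first")
        if name not in self._transports:  # lazy initialization
            self._transports[name] = Transport(name, self._session)
        return self._transports[name]

    @property
    def sse(self) -> Transport:
        return self._transport("sse")

    @property
    def ws(self) -> Transport:
        return self._transport("ws")


async def demo() -> bool:
    async with Client() as client:
        # Both transports exist only after first access and share the session.
        return client.sse.session is client.ws.session
```

In this model, touching `Client().sse` outside the `async with` block raises immediately, which mirrors the error the real client is documented to raise.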

Subscriptions

All four methods are available on both transports with identical signatures. Use streaming.sse or streaming.ws interchangeably:
async for event in streaming.sse.subscribe_transactions(accounts=["EQ..."]):
    ...
subscribe_transactions — returns AsyncIterator[TransactionEvent].
| Parameter | Default | Description |
| --- | --- | --- |
| accounts | Optional | Account addresses to monitor. Omit to subscribe to all accounts. |
| operations | Optional | Opcode filters: hex strings like 0x0f8a7ea5 or Opcode enum values. |
| stop | Optional | asyncio.Event to stop the subscription gracefully. |
subscribe_blocks — returns AsyncIterator[BlockEvent].
| Parameter | Default | Description |
| --- | --- | --- |
| workchain | Optional | Filter by workchain. Omit to subscribe to all blocks. |
| stop | Optional | asyncio.Event to stop the subscription gracefully. |
subscribe_traces — returns AsyncIterator[TraceEvent].
| Parameter | Default | Description |
| --- | --- | --- |
| accounts | Optional | Account addresses to monitor. Omit to subscribe to all accounts. |
| stop | Optional | asyncio.Event to stop the subscription gracefully. |
subscribe_mempool — returns AsyncIterator[MempoolEvent].
| Parameter | Default | Description |
| --- | --- | --- |
| accounts | Optional | Account addresses to filter. Omit to subscribe to all messages. |
| stop | Optional | asyncio.Event to stop the subscription gracefully. |

Opcode Filtering

Use the Opcode enum or raw hex strings to filter transactions by operation code:
from pytonapi.types import Opcode

async for event in streaming.sse.subscribe_transactions(
    accounts=["EQ..."],
    operations=[Opcode.JETTON_TRANSFER, Opcode.JETTON_NOTIFY],
):
    print(f"Jetton activity: {event.tx_hash}")
| Enum | Hex |
| --- | --- |
| TEXT_COMMENT | 0x00000000 |
| ENCRYPTED_TEXT_COMMENT | 0x2167da4b |
| EXCESS | 0xd53276db |
| BOUNCE | 0xffffffff |
| TOP_UP | 0xd372158c |
| JETTON_TRANSFER | 0x0f8a7ea5 |
| JETTON_INTERNAL_TRANSFER | 0x178d4519 |
| JETTON_NOTIFY | 0x7362d09c |
| JETTON_BURN | 0x595f07bc |
| JETTON_MINT | 0x642b7d07 |
| JETTON_CHANGE_ADMIN | 0x6501f354 |
| JETTON_CHANGE_METADATA | 0xcb862902 |
| JETTON_CLAIM_ADMIN | 0xfb88e119 |
| JETTON_BURN_NOTIFICATION | 0x7bdd97de |
| JETTON_CALL_TO | 0x235caf52 |
| JETTON_SET_STATUS | 0xeed236d3 |
| JETTON_UPGRADE | 0x2508d66a |
| NFT_TRANSFER | 0x5fcc3d14 |
| NFT_OWNERSHIP_ASSIGNED | 0x05138d91 |
| GET_ROYALTY_PARAMS | 0x693d3950 |
| GET_STATIC_DATA | 0x2fcb26a2 |
| REPORT_ROYALTY_PARAMS | 0xa8cb00ad |
| REPORT_STATIC_DATA | 0x8b771735 |
| OUTBID_NOTIFICATION | 0x557cea20 |
| PROVE_OWNERSHIP | 0x04ded148 |
| OWNERSHIP_PROOF | 0x0524c7ae |
| SBT_DESTROY | 0x1f04537a |
| SBT_REVOKE | 0x6f89f5e3 |
| SBT_REQUEST_OWNER | 0xd0c3bfea |
| SBT_OWNER_INFO | 0x0dd607e3 |
| CHANGE_DNS_RECORD | 0x4eb1f0f9 |
| DELETE_DNS_RECORD | 0x4eb1f0f9 |
| DNS_BALANCE_RELEASE | 0x4ed14b65 |
| TELEITEM_BID_INFO | 0x38127de1 |
| TELEITEM_CANCEL_AUCTION | 0x371638ae |
| TELEITEM_DEPLOY | 0x299a3e15 |
| TELEITEM_OK | 0xa37a0983 |
| TELEITEM_RETURN_BID | 0xa43227e1 |
| TELEITEM_START_AUCTION | 0x487a8e81 |
| TELEMINT_DEPLOY | 0x4637289a |
| TELEMINT_DEPLOY_V2 | 0x4637289b |
| MULTISIG_NEW_ORDER | 0xf718510f |
| MULTISIG_APPROVE | 0xa762230f |
| MULTISIG_EXECUTE_INTERNAL | 0xa32c59bf |
You can also pass raw hex strings directly: operations=["0x0f8a7ea5"].
Full opcode reference: tonkeeper/tongo/abi/messages.md.
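Since filters may arrive as enum members or raw hex strings, it can help to normalize a mixed list before passing it on. The helper below is a minimal sketch: `Op` is a hypothetical stand-in holding three values copied from the table above (the real `Opcode` enum may store its values differently), and `normalize_opcodes` is not part of pytonapi.

```python
from enum import Enum
from typing import Iterable, List, Union


class Op(str, Enum):
    """Hypothetical stand-in for a few members of the Opcode enum."""

    JETTON_TRANSFER = "0x0f8a7ea5"
    JETTON_NOTIFY = "0x7362d09c"
    NFT_TRANSFER = "0x5fcc3d14"


def normalize_opcodes(operations: Iterable[Union[Op, str, int]]) -> List[str]:
    """Return unique, lowercase, zero-padded 32-bit hex strings in input order."""
    result: List[str] = []
    for op in operations:
        if isinstance(op, Op):
            value = int(op.value, 16)
        elif isinstance(op, int):
            value = op
        else:
            value = int(op, 16)  # accepts "0x0f8a7ea5" and "0f8a7ea5"
        if not 0 <= value <= 0xFFFFFFFF:
            raise ValueError(f"opcode out of 32-bit range: {op!r}")
        text = f"0x{value:08x}"
        if text not in result:  # drop duplicates, keep first occurrence
            result.append(text)
    return result
```

For example, `normalize_opcodes([Op.JETTON_TRANSFER, "0x0F8A7EA5", 0x7362D09C])` collapses the first two entries into one and yields two hex strings.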

Graceful Shutdown

Pass an asyncio.Event to any subscribe method. When set, the iterator exits cleanly after the current iteration — no exception is raised.
import asyncio
import signal

from pytonapi.streaming import TonapiStreaming
from pytonapi.types import Network

stop = asyncio.Event()


async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, stop.set)
    loop.add_signal_handler(signal.SIGTERM, stop.set)

    async with TonapiStreaming("YOUR_API_KEY", Network.MAINNET) as streaming:
        async for event in streaming.sse.subscribe_transactions(
            accounts=["EQ..."],
            stop=stop,
        ):
            print(event.tx_hash)


if __name__ == "__main__":
    asyncio.run(main())
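To stop after a fixed interval instead, set the event from a background task:
import asyncio

from pytonapi.streaming import TonapiStreaming
from pytonapi.types import Network

stop = asyncio.Event()


async def stop_after(seconds: float) -> None:
    await asyncio.sleep(seconds)
    stop.set()


async def main() -> None:
    async with TonapiStreaming("YOUR_API_KEY", Network.MAINNET) as streaming:
        # Keep a reference so the task is not garbage-collected before it runs.
        timer = asyncio.create_task(stop_after(30))

        async for event in streaming.sse.subscribe_transactions(
            accounts=["EQ..."],
            stop=stop,
        ):
            print(event)


if __name__ == "__main__":
    asyncio.run(main())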
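To see why setting the event ends the loop without an exception, here is a self-contained model of a stop-aware async iterator. It is only a sketch of the pattern: `fake_events` is a hypothetical event source, not the library's subscription code.

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def fake_events(stop: Optional[asyncio.Event] = None) -> AsyncIterator[int]:
    """Yield 0, 1, 2, ... and return normally once `stop` is set."""
    n = 0
    while stop is None or not stop.is_set():
        await asyncio.sleep(0)  # stands in for waiting on the network
        yield n
        n += 1


async def collect_until(limit: int) -> List[int]:
    """Consume events and request a stop after `limit` of them."""
    stop = asyncio.Event()
    seen: List[int] = []
    async for event in fake_events(stop):
        seen.append(event)
        if len(seen) == limit:
            stop.set()  # the current iteration finishes, then the loop exits
    return seen


if __name__ == "__main__":
    print(asyncio.run(collect_until(3)))
```

The generator checks the event only between items, so the consumer always finishes handling the event it already received.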

Reconnection

Transient errors (5xx, 429, heartbeat timeout) are retried automatically with exponential backoff. Client errors (400, 401, 403, 404) raise immediately.
from pytonapi.types import ReconnectPolicy

async with TonapiStreaming(
    "YOUR_API_KEY",
    Network.MAINNET,
    reconnect_policy=ReconnectPolicy(
        max_reconnects=10,   # -1 for unlimited
        delay=0.5,           # initial delay (seconds)
        max_delay=10.0,      # upper bound after backoff
        backoff_factor=2.0,  # multiplier per attempt
    ),
) as streaming:
    ...
Default progression: 0.5 s → 1.0 s → 2.0 s → 4.0 s → 8.0 s → 10.0 s (capped). After all attempts are exhausted, TONAPIConnectionLostError is raised. SSE heartbeats arrive every 5 seconds — if no data arrives within 30 seconds, a reconnect is triggered.
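The progression above is consistent with a capped exponential formula, `min(delay * backoff_factor ** attempt, max_delay)`. The sketch below reproduces it and adds the retry classification stated earlier; both functions are hypothetical illustrations, and the library's exact formula is an assumption inferred from the documented numbers.

```python
from typing import List


def backoff_delays(
    max_reconnects: int,
    delay: float = 0.5,
    max_delay: float = 10.0,
    backoff_factor: float = 2.0,
) -> List[float]:
    """Delays (seconds) before each reconnect attempt, capped at max_delay.

    Only finite attempt counts are handled here; the -1 "unlimited"
    setting is outside the scope of this sketch.
    """
    if max_reconnects < 0:
        raise ValueError("this sketch only handles a finite number of attempts")
    return [
        min(delay * backoff_factor ** attempt, max_delay)
        for attempt in range(max_reconnects)
    ]


def is_retryable(status: int) -> bool:
    """429 and 5xx count as transient; other 4xx codes fail immediately."""
    return status == 429 or 500 <= status <= 599


if __name__ == "__main__":
    print(backoff_delays(6))
```

With the parameters shown, every attempt from the sixth onward waits the full `max_delay`.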
Always close the session properly. After an ungraceful disconnect the server may hold the old session open, counting against your connection limit.