A collection of Rust and TypeScript packages for Solana stream data, operated by ValidatorsDAO. This repository is published as open-source software (OSS) and is freely available for anyone to use.
This project provides libraries and tools for streaming real-time data from the Solana blockchain. It supports both Geyser and Shreds approaches, making it easier for developers to access Solana data streams.
- Geyser gRPC (TypeScript/Rust): production-ready streaming with resilient reconnects
- Shreds gRPC (TypeScript/Rust): raw shreds over gRPC for high-throughput ingestion
- UDP Shreds (Rust): lowest-latency signal for real-time event detection
- Refreshed starter layout and docs
- Yellowstone Geyser gRPC connection upgraded to an NAPI-RS-powered client for better backpressure
- NAPI-powered Shreds client/decoder so TypeScript can tap Rust-grade throughput
- Improved backpressure handling and up to 4x streaming efficiency (400% improvement)
- Faster real-time Geyser streams for TypeScript clients with lower overhead
- Rust Geyser exports track
yellowstone-grpc-client13.x and expose deshred, token-account expansion, and cuckoo filter helpers - Ping/Pong handling to keep Yellowstone gRPC streams alive
- Exponential reconnect backoff plus gap recovery (
fromSlot/from_slot) - Bounded queues/channels with drop logging for backpressure safety
- Code-based subscription filters in TypeScript
- Optional runtime metrics logging (TypeScript)
- Default filters drop vote/failed transactions to reduce traffic
- Rust Geyser client ships the same safeguards and powers UDP Shreds for fastest signal
Tip: start with slots, then add filters as needed. When resuming from fromSlot,
duplicates are expected.
If you have ERPC Dedicated Shreds, you can forward raw Shreds over UDP to your own listener.
This is Solana’s fastest observation layer—before Geyser gRPC and far ahead of RPC/WebSocket.
The SDK includes a simple Rust sample; use the generic_logger binary to watch any program of your choice.
- Shreds arrive first: validator-to-validator Shreds land before Geyser gRPC or RPC/WebSocket, so latency-critical flows see events earliest.
- UDP keeps overhead tiny: no connection setup, retransmit, or ordering; matches the on-wire format between validators.
- Optional latency monitoring uses a DashMap-backed slot tracker to reduce lock contention.
- Trade-off: pre-finalization data can be missing/out-of-order/failed—handle that as part of the speed bargain.
Note: the shared Shreds gRPC endpoint runs over TCP, so it’s slower than UDP Shreds.
- Sample code (
shreds-udp-rs, Rust): set any program of your own as the watch target.
https://github.com/ValidatorsDAO/solana-stream/tree/main/temp-release/shreds-udp-rs - Quick start requires
settings.jsoncplus env (e.g.,SOLANA_RPC_ENDPOINT); see the sample README. - Dedicated Shreds users: point your Shreds sender to the sample’s
ip:portto see detections. - Not on UDP yet? Run it locally or on your own server to explore logs and customize handlers.
This example comes from the SDK sample; clone and run it to see hits with your own watch target.
- All code and README docs are in the Solana Stream SDK repo:
https://github.com/ValidatorsDAO/solana-stream
- client/geyser-rs/: Rust client using Geyser gRPC
- client/shreds-rs/: Rust client for Shredstream over gRPC
- client/shreds-udp-rs/: Minimal UDP shred listener with configurable program watch example
- client/geyser-ts/: TypeScript client using Geyser gRPC
- client/shreds-ts/: TypeScript client for Shredstream over gRPC
- crate/solana-stream-sdk/: Rust SDK for Solana stream functionality
- package/solana-stream-sdk/: TypeScript SDK for Solana stream functionality
- Node.js 22 or 24 (LTS) for TypeScript packages
- Rust (for Rust packages)
- pnpm (for package management)
For the entire workspace:
git clone https://github.com/ValidatorsDAO/solana-stream.git
cd solana-stream
pnpm installCreate a .env file at client/geyser-ts/.env with your environment variables:
# Optional: required only if your endpoint enforces auth
X_TOKEN=YOUR_X_TOKEN
GEYSER_ENDPOINT=https://grpc-ams.erpc.global
SOLANA_RPC_ENDPOINT="https://edge.erpc.global?api-key=YOUR_API_KEY"Next, build and run the client:
pnpm -F @validators-dao/solana-stream-sdk build
pnpm -F geyser-ts dev- A 1-day free trial for the Geyser gRPC endpoints is available by joining the Validators DAO Discord community. Please try it out:
Create a .env file (placed in the project root)
SHREDS_ENDPOINT=https://shreds-ams.erpc.global
SOLANA_RPC_ENDPOINT="https://edge.erpc.global?api-key=YOUR_API_KEY"Run the sample client
cargo run -p shreds-rsThe sample code can be found at:
https://github.com/ValidatorsDAO/solana-stream/blob/main/client/shreds-rs/src/main.rs
A 1-day free trial for the Shreds endpoints is available by joining the Validators DAO Discord community. Please try it out: https://discord.gg/C7ZQSrCkYR
You can also use the published crate in your own projects:
[dependencies]
solana-stream-sdk = "1.4.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
dotenvy = "0.15"
solana-entry = "3.0.12"
bincode = "1.3.3"use solana_stream_sdk::{CommitmentLevel, ShredstreamClient};
use std::env;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Load environment variables
dotenvy::dotenv().ok();
// Connect to shreds endpoint
let endpoint = env::var("SHREDS_ENDPOINT")
.unwrap_or_else(|_| "https://shreds-ams.erpc.global".to_string());
let mut client = ShredstreamClient::connect(&endpoint).await?;
// Create subscription for specific account
let request = ShredstreamClient::create_entries_request_for_account(
"6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P",
Some(CommitmentLevel::Processed),
);
// Subscribe to entries stream
let mut stream = client.subscribe_entries(request).await?;
// Process incoming entries
while let Some(entry) = stream.message().await? {
let entries = bincode::deserialize::<Vec<solana_entry::entry::Entry>>(&entry.entries)?;
println!("Slot: {}, Entries: {}", entry.slot, entries.len());
for entry in entries {
println!(" Entry has {} transactions", entry.transactions.len());
}
}
Ok(())
}For specific packages, navigate to the package directory and install dependencies.
client/shreds-udp-rs listens for Shredstream over UDP and highlights watched programs (configurable via settings.jsonc). Settings live in client/shreds-udp-rs/settings.jsonc and are embedded at build time; secrets like RPC can be overridden via environment variables.
Quick start:
export SOLANA_RPC_ENDPOINT=https://api.mainnet-beta.solana.com # pass secrets via env only
cargo run -p shreds-udp-rs # settings already in settings.jsoncLog legend:
- Prefix:
🎯program hit,🐣authority hit (🎯🐣means both) - Action:
🐣create,🟢buy,🔻sell,🪙other,❓unknown/missing amounts - Votes skipped by default (
skip_vote_txs=true) - SOL values shown are instruction limits; actual fills require event/meta data (e.g., Geyser/RPC).
- UDP shreds are processed directly; not dependent on RPC commitment. Failed transactions may still appear; missing fields show as
❓.
Components from crate/solana-stream-sdk (5 layers):
- Config loader (
ShredsUdpConfig): reads JSONC/env and buildsProgramWatchConfig. Usewatch_config_no_defaults()to define your own watch set (composite mint finder = SPL Token MintTo/Initialize). - Receiver (
UdpShredReceiver): minimal UDP socket reader with timestamps. - Pipeline (5 layers): ① receive/prefilter (
decode_udp_datagram) → ② FEC buffer (insert_shred+ShredsUdpState) → ③ deshred (deshred_shreds_to_entries) → ④ watcher/detail (collect_watch_events+ detailers) → ⑤ sink (logs/custom hooks). - One-call convenience:
handle_pumpfun_watcherwraps the same 5 layers (legacy default watcher; set your own watch IDs). - Customize sink/detailer: via
ProgramWatchConfig::with_detailers(...)or replace the sink with your own hook. - Vote filtering: by default
skip_vote_txs=true, so vote-only shreds/txs are dropped early. - Samples:
cargo run -p shreds-udp-rs --bin generic_logger(recommended; setGENERIC_WATCH_PROGRAM_IDS/GENERIC_WATCH_AUTHORITIESto watch your own programs) orcargo run -p shreds-udp-rs(one-call wrapper).
Troubleshooting:
- Use
solana-stream-sdk >= 1.4.0for Direct Shreds UDP. Agave 3.x serializes deshredded entries withwincode; SDK 1.2.0 triedbincodefirst in the UDP helper, and SDK 1.2.1 could still decode from the middle of a multi-FEC entry segment. - Errors such as
entry decode failed: invalid value: integer ..., expected a valid transaction message version,continue signal on byte-three,unexpected end of file, oralias encodingusually indicate a codec mismatch rather than firewall loss. - UDP packet sizes around 1203/1228 bytes are normal Merkle shred sizes and do not by themselves indicate truncation.
Design notes
- Layered pipeline (5 layers): ① UDP receive → ② FEC buffer/pre-deshred → ③ deshred → ④ watcher (mint extraction) → ⑤ detailer/sink (labeling + log output). Each stage can be swapped or reused.
- Pure UDP/FEC path: single-purpose deshredder tuned for Agave merkle sizing; leaves ledger/rpc out of the hot path.
- Config is JSONC/env: secrets (RPC) in env, behavior (watch ids, logging) in JSONC.
- Composable stages: receiver → deshred → watcher → detailer → sink; each stage can be swapped or reused.
- Signal-first logging: emoji at a glance, vote-filtered by default, and mint-level detail with adapters.
- Small, dependency-light SDK crate backing a CLI client; intended to embed into larger consumers as well.
Quick choices:
- Want a one-call loop? Use
handle_pumpfun_watcherin your own binary and set your own watch IDs/env as needed. - Need to act on detections (e.g., push to a queue, custom filtering, alternate watchers/detailers)? Use the modular pipeline (
decode_udp_datagram→insert_shred→deshred_shreds_to_entries→collect_watch_events) and hook your own sink right after detection (seeclient/shreds-udp-rscustom hook example).
Minimal usage example (Rust):
use solana_stream_sdk::shreds_udp::{ShredsUdpConfig, ShredsUdpState, DeshredPolicy, handle_pumpfun_watcher};
use solana_stream_sdk::UdpShredReceiver;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cfg = ShredsUdpConfig::from_env(); // reads SHREDS_UDP_CONFIG jsonc too
let mut receiver = UdpShredReceiver::bind(&cfg.bind_addr, None).await?;
let policy = DeshredPolicy { require_code_match: cfg.require_code_match };
let watch_cfg = Arc::new(cfg.watch_config());
let state = ShredsUdpState::new(&cfg);
loop {
handle_pumpfun_watcher(&mut receiver, &state, &cfg, policy, watch_cfg.clone()).await?;
}
}Modular pipeline example (custom watch set):
use solana_stream_sdk::shreds_udp::{
collect_watch_events, decode_udp_datagram, deshred_shreds_to_entries, insert_shred,
DeshredPolicy, ShredInsertOutcome, ShredSource, ShredsUdpConfig, ShredsUdpState,
};
use solana_stream_sdk::txn::{ProgramWatchConfig, SplTokenMintFinder};
use solana_stream_sdk::UdpShredReceiver;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cfg = ShredsUdpConfig::from_env();
let mut receiver = UdpShredReceiver::bind(&cfg.bind_addr, None).await?;
let policy = DeshredPolicy { require_code_match: cfg.require_code_match };
let state = ShredsUdpState::new(&cfg);
let watch_cfg = Arc::new(
ProgramWatchConfig::new(vec![], vec![]) // define your own watch set
.with_mint_finder(Arc::new(SplTokenMintFinder))
.with_detailers(Vec::new()),
);
loop {
let datagram = receiver.recv_raw().await?;
if let Some(decoded) = decode_udp_datagram(&datagram, &state, &cfg).await {
if let ShredInsertOutcome::Ready(ready) =
insert_shred(decoded, &datagram, &state, &cfg, &policy).await
{
let entries = deshred_shreds_to_entries(&ready.shreds)?;
let txs: Vec<_> = entries.iter().flat_map(|e| e.transactions.iter()).collect();
let _hits = collect_watch_events(ready.key.slot, &txs, watch_cfg.as_ref(), 0);
state.remove_batch(&ready.key).await;
if matches!(ready.source, ShredSource::Data) {
state.mark_completed(ready.key).await;
}
}
}
}
}Filtering remains experimental on the Shreds gRPC path (shreds-rs, shreds-ts): requests should send empty filter maps because shreds-side filters are not usable yet. Geyser gRPC filters are fine. For workloads that need filtering, prefer the high-speed, customizable UDP shreds pipeline described above. Occasionally, data may not be fully available, and filters may not be applied correctly on the shreds gRPC path.
If you encounter such cases, please report them by opening an issue at: https://github.com/ValidatorsDAO/solana-stream/issues
Your feedback greatly assists our debugging efforts and overall improvement of this feature.
Other reports and suggestions are also highly appreciated.
You can also join discussions or share feedback on Validators DAO's Discord community: https://discord.gg/C7ZQSrCkYR
This project uses a monorepo structure with both Rust and TypeScript components:
- Rust packages: Managed with Cargo
- TypeScript packages: Managed with pnpm workspaces
- Unified configuration: Shared TypeScript and Prettier configurations
# Build all TypeScript packages
pnpm build
# Build Rust packages
cargo buildEach package contains its own documentation and usage examples. Please refer to the individual package READMEs for specific implementation details.
We welcome contributions from the community! This project is continuously updated and improved.
- Fork the repository
- Create a feature branch
- Make your changes
- Submit a pull request
This project is operated and maintained by ValidatorsDAO, focused on providing robust tools and infrastructure for the Solana ecosystem.
This repository is actively maintained and will receive continuous updates to improve functionality and add new features.
The package is available as open source under the terms of the Apache-2.0 License.
Everyone interacting in the Validators DAO project’s codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.
