Cart - Compressed Transport Format

Overview

Carts are compressed transport containers that bundle capsules with their datastore membership proofs. Each capsule includes its datastore ID and merkle proof, allowing recipients to verify data integrity against the on-chain merkle root without trusting the transport layer.

Design Principles

  • Verifiable Transport: Each capsule includes merkle proof for blockchain verification
  • Maximum Compression: Efficient encoding of proofs and capsule data
  • Zero Trust: Recipients verify against on-chain merkle roots
  • Streaming Support: Extract and verify capsules individually
  • Batch Efficiency: Transport thousands of capsules with minimal overhead

Binary Format

Cart Structure

[Cart Header]             // 64 bytes fixed
├─ Magic                  // 4 bytes: "CART" (0x43415254)
├─ Version                // 2 bytes: Format version
├─ Compression            // 1 byte: Algorithm used
├─ ItemCount              // 4 bytes: Number of capsules
├─ UncompressedSize       // 8 bytes: Total uncompressed size
├─ CompressedSize         // 8 bytes: Total compressed size
├─ IndexOffset            // 8 bytes: Offset to index section
├─ IndexSize              // 4 bytes: Size of index
├─ Timestamp              // 8 bytes: Creation time
├─ Checksum               // 8 bytes: XXH64 of compressed data
└─ Reserved               // 9 bytes: Future use (pads header to 64 bytes)

[Compressed Data]         // Variable size
├─ Block 0                // First compression block
├─ Block 1                // Second compression block
├─ ...                    // Additional blocks
└─ Block N                // Final compression block

[Index Section]           // Uncompressed for fast access
├─ IndexHeader            // 16 bytes
└─ IndexEntries[]         // 48 bytes each
   ├─ CapsuleId           // 32 bytes: SHA-256 of capsule
   ├─ DatastoreId         // 8 bytes: Which datastore
   ├─ BlockNumber         // 2 bytes: Which block contains item
   ├─ BlockOffset         // 4 bytes: Offset within block
   └─ Flags               // 2 bytes: Compression flags
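
The fixed header maps onto a single little-endian struct. The format string below is a sketch derived from the field sizes listed above, not a published constant:

import struct

# Little-endian, no padding: sizes match the header fields above (64 bytes)
CART_HEADER = struct.Struct('<4sHBIQQQIQQ9s')
assert CART_HEADER.size == 64

def parse_cart_header(data: bytes) -> dict:
    (magic, version, compression, item_count, uncompressed_size,
     compressed_size, index_offset, index_size, timestamp,
     checksum, _reserved) = CART_HEADER.unpack(data[:CART_HEADER.size])

    if magic != b'CART':
        raise ValueError("Not a cart file")

    return {
        "version": version,
        "compression": compression,
        "item_count": item_count,
        "uncompressed_size": uncompressed_size,
        "compressed_size": compressed_size,
        "index_offset": index_offset,
        "index_size": index_size,
        "timestamp": timestamp,
        "checksum": checksum,
    }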

Capsule Entry Format

Each capsule in the cart contains:

[Capsule Entry]
├─ Header                 // 18 bytes
│  ├─ EntrySize           // 4 bytes: Total entry size
│  ├─ CapsuleSize         // 4 bytes: Capsule data size
│  ├─ ProofSize           // 2 bytes: Merkle proof size
│  └─ DatastoreId         // 8 bytes: Datastore identifier
├─ CapsuleData            // Variable: The actual capsule
└─ MerkleProof            // Variable: Proof of inclusion
   ├─ LeafIndex           // 4 bytes: Position in tree
   ├─ TreeDepth           // 1 byte: Merkle tree depth
   ├─ PathBits            // Variable: Left/right path (1 bit per level)
   └─ Siblings[]          // Variable: 32 bytes each
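
A sketch of decoding one entry under this layout (the function and its return shape are illustrative):

import struct

# EntrySize, CapsuleSize, ProofSize, DatastoreId (18 bytes, little-endian)
ENTRY_HEADER = struct.Struct('<IIHQ')

def parse_capsule_entry(buf: bytes, offset: int = 0):
    entry_size, capsule_size, proof_size, datastore_id = ENTRY_HEADER.unpack_from(buf, offset)
    body = offset + ENTRY_HEADER.size

    capsule_data = buf[body:body + capsule_size]
    proof = buf[body + capsule_size:body + capsule_size + proof_size]

    # Proof layout: LeafIndex (4) + TreeDepth (1) + PathBits + Siblings
    leaf_index, depth = struct.unpack_from('<IB', proof, 0)
    path_len = (depth + 7) // 8
    siblings_start = 5 + path_len
    siblings = [proof[siblings_start + 32 * i : siblings_start + 32 * (i + 1)]
                for i in range(depth)]

    return capsule_data, datastore_id, (leaf_index, proof[5:siblings_start], siblings)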

Implementation

Cart Creation

from typing import List

import zstandard as zstd

class Cart:
    def __init__(self, compression_type=CompressionType.ZSTD):
        self.compression = compression_type
        self.entries = []
        self.index = CartIndex()

    def add_capsule(self, capsule: Capsule, datastore_id: str, merkle_proof: MerkleProof):
        """Add a capsule with its datastore proof"""
        entry = CapsuleEntry(
            capsule_id=capsule.hash,
            capsule_data=capsule.data,
            datastore_id=datastore_id,
            merkle_proof=merkle_proof
        )

        self.entries.append(entry)
        self.index.add(capsule.hash, len(self.entries) - 1)

    def compress(self) -> bytes:
        """Compress cart with proof-aware optimization"""
        # Group by datastore for better compression
        grouped = self._group_by_datastore()

        # Compress merkle proofs separately (they compress well)
        proof_data = self._extract_proofs(grouped)
        compressed_proofs = self._compress_proofs(proof_data)

        # Compress capsule data with dictionary training
        capsule_data = self._extract_capsules(grouped)
        compressed_capsules = self._compress_capsules(capsule_data)

        # Build final cart
        return self._build_cart(compressed_proofs, compressed_capsules)

    def _compress_proofs(self, proofs: List[MerkleProof]) -> bytes:
        """Compress merkle proofs efficiently"""
        # Proofs have high redundancy - use maximum zstd level
        cctx = zstd.ZstdCompressor(level=22)

        # Pack proofs into a compact byte layout before compressing
        packed = self._pack_merkle_proofs(proofs)
        return cctx.compress(packed)

Cart Extraction and Verification

import hashlib

class CartReader:
    def __init__(self, cart_path: str):
        self.file = open(cart_path, 'rb')
        self.header = self._read_header()
        self.index = self._load_index()

    async def extract_and_verify(self, capsule_id: str, blockchain: BlockchainInterface) -> VerifiedCapsule:
        """Extract a capsule and verify against on-chain merkle root"""
        # Find capsule in index
        entry = self.index.get(capsule_id)
        if not entry:
            raise KeyError(f"Capsule {capsule_id} not found")

        # Extract capsule and proof
        capsule_data, datastore_id, merkle_proof = self._extract_entry(entry)

        # Get on-chain merkle root for this datastore
        merkle_root = await blockchain.get_datastore_root(datastore_id)
        if not merkle_root:
            raise ValueError(f"Datastore {datastore_id} not found on chain")

        # Verify merkle proof (capsule IDs are hex-encoded SHA-256 hashes)
        leaf_hash = bytes.fromhex(capsule_id)
        if not self._verify_merkle_proof(leaf_hash, merkle_proof, merkle_root):
            raise ValueError(f"Merkle proof verification failed for {capsule_id}")

        return VerifiedCapsule(
            data=capsule_data,
            datastore_id=datastore_id,
            verified=True,
            merkle_root=merkle_root
        )

    def _verify_merkle_proof(self, leaf_hash: bytes, proof: MerkleProof, root: bytes) -> bool:
        """Verify a capsule belongs to the merkle tree"""
        current = leaf_hash

        # Recompute the root by hashing up the tree, ordering each pair
        # according to the recorded left/right path bits
        for i, sibling in enumerate(proof.siblings):
            if proof.is_left_path(i):
                current = hashlib.sha256(current + sibling).digest()
            else:
                current = hashlib.sha256(sibling + current).digest()

        return current == root

Batch Verification

import asyncio

async def verify_cart_contents(cart_path: str, blockchain: BlockchainInterface):
    """Verify all capsules in a cart against blockchain"""
    reader = CartReader(cart_path)

    # Group by datastore to minimize blockchain queries
    datastore_groups = reader.group_by_datastore()

    # Fetch all merkle roots in parallel
    roots = {}

    async def fetch_root(datastore_id):
        roots[datastore_id] = await blockchain.get_datastore_root(datastore_id)

    async with asyncio.TaskGroup() as tg:
        for datastore_id in datastore_groups:
            tg.create_task(fetch_root(datastore_id))

    # Verify all capsules
    results = []
    for datastore_id, capsule_ids in datastore_groups.items():
        merkle_root = roots.get(datastore_id)
        if not merkle_root:
            results.append({"datastore": datastore_id, "error": "Not found on chain"})
            continue

        for capsule_id in capsule_ids:
            entry = reader.extract_entry(capsule_id)
            is_valid = verify_merkle_proof(
                bytes.fromhex(capsule_id),
                entry.merkle_proof,
                merkle_root
            )
            results.append({
                "capsule": capsule_id,
                "datastore": datastore_id,
                "valid": is_valid
            })

    return results

Compression Strategy

Proof-Aware Compression

Merkle proofs are highly redundant: proofs from the same datastore share a common tree depth, and neighboring leaves share most of their sibling hashes. Packing them into a compact layout before compression exploits this:

import struct
from typing import List

def pack_merkle_proofs(proofs: List[MerkleProof]) -> bytes:
    """Pack merkle proofs efficiently"""
    # All proofs from the same datastore share a common tree depth
    common_depth = proofs[0].depth if proofs else 0

    packed = struct.pack('<H', len(proofs))    # Proof count
    packed += struct.pack('<B', common_depth)  # Common depth

    for proof in proofs:
        # Pack leaf index (often sequential)
        packed += struct.pack('<I', proof.leaf_index)

        # Pack path bits efficiently (1 bit per level)
        path_bytes = (proof.path_bits + 7) // 8
        packed += proof.path_data[:path_bytes]

        # Pack siblings (high redundancy between proofs)
        for sibling in proof.siblings:
            packed += sibling

    return packed

Size Optimization

Component       Uncompressed   Compressed   Size Reduction
Capsule Data    100 MB         40 MB        60%
Merkle Proofs   10 MB          0.5 MB       95%
Index           1 MB           0.8 MB       20%
Total           111 MB         41.3 MB      63%
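
These totals are internally consistent; a quick check of the example figures:

# Sanity check of the example figures above (values in MB)
uncompressed = {"capsules": 100, "proofs": 10, "index": 1}
compressed = {"capsules": 40, "proofs": 0.5, "index": 0.8}

total_in = sum(uncompressed.values())   # 111 MB
total_out = sum(compressed.values())    # 41.3 MB
print(f"{1 - total_out / total_in:.0%} size reduction")  # 63%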

Use Cases

Content Distribution

# Bundle capsules from a datastore for distribution
cart = Cart()
for capsule_id in datastore.capsule_ids:
    capsule = storage.get_capsule(capsule_id)
    proof = datastore.get_merkle_proof(capsule_id)
    cart.add_capsule(capsule, datastore.id, proof)

# Compress and send
compressed = cart.compress()  # e.g., 100MB → 40MB
await network.broadcast(compressed)

Verified Sync

# Sync capsules with verification
async def sync_datastore(datastore_id: str, peer: NetworkPeer):
    # Request cart with all capsules
    cart_data = await peer.request_datastore_cart(datastore_id)

    # Save temporarily
    cart_path = save_temp_file(cart_data)
    reader = CartReader(cart_path)

    # Extract and verify each capsule; extract_and_verify fetches the
    # on-chain merkle root for each capsule's datastore itself
    for capsule_id in reader.list_capsules():
        try:
            verified = await reader.extract_and_verify(
                capsule_id,
                blockchain
            )
            await storage.store_verified_capsule(verified)
        except ValueError as e:
            log.warning(f"Invalid capsule {capsule_id}: {e}")

Selective Download

# Download specific capsules with proof
async def download_capsules(capsule_ids: List[str], datastore_id: str):
    # Request a cart containing just these capsules
    cart = await peer.request_capsules(capsule_ids, datastore_id)

    # Verify against blockchain
    verified_capsules = []
    for capsule_id in capsule_ids:
        verified = await cart.extract_and_verify(capsule_id, blockchain)
        verified_capsules.append(verified)

    return verified_capsules

Security Model

Trust Model

  • Zero Trust Transport: Don't trust cart creator
  • Blockchain Verification: Merkle roots are source of truth
  • Proof Validation: Every capsule must prove membership
  • Independent Verification: Any node can verify

Attack Resistance

  • Fake Capsules: Detected by merkle proof failure
  • Wrong Datastore: Proof won't match the on-chain root
  • Modified Data: Any change invalidates the merkle proof (see the toy example below)
  • Proof Tampering: Forging a valid proof requires a SHA-256 collision, which is computationally infeasible
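
The last two points can be made concrete with a self-contained toy: a two-leaf tree where verification passes for the genuine capsule and fails as soon as one byte changes (the helpers here are illustrative, not the production tree code):

import hashlib

def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()

# Toy two-leaf tree: root = H(leaf0 || leaf1)
capsule = b"capsule payload"
leaf0, leaf1 = h(capsule), h(b"some other capsule")
root = h(leaf0 + leaf1)

def verify(leaf: bytes, sibling: bytes, root: bytes) -> bool:
    # leaf0 is the left child, so it hashes as H(leaf || sibling)
    return h(leaf + sibling) == root

print(verify(h(capsule), leaf1, root))             # True: genuine data
print(verify(h(b"Capsule payload"), leaf1, root))  # False: one byte changed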

Performance Characteristics

Verification Speed

  • Single capsule: ~1ms (proof verification)
  • 1000 capsules: ~1 second (parallel verification)
  • Bottleneck: Blockchain merkle root queries

Memory Usage

  • Streaming extraction: O(1) - constant memory (see the sketch below)
  • Batch verification: O(n) - proportional to capsule count
  • Index overhead: 48 bytes per capsule (one index entry)
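
A minimal sketch of how stream_entries (used in the Full Workflow below) keeps memory constant; _decompress_block and _parse_entry are hypothetical helpers, not part of a published API:

# Sketch of CartReader.stream_entries(): at most one decompressed block
# is resident at a time, so memory stays O(1) regardless of cart size
def stream_entries(self):
    current_num, current_block = None, None

    for entry in self.index.entries:
        # Decompress a block only when iteration first crosses into it
        if entry.block_number != current_num:
            current_block = self._decompress_block(entry.block_number)
            current_num = entry.block_number

        # Slice this capsule's entry out of the resident block
        yield self._parse_entry(current_block, entry.block_offset)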

Network Efficiency

  • Bulk transfer: 60-90% size reduction
  • Proof overhead: ~500 bytes per capsule (back-of-envelope check below)
  • Amortized cost: Per-capsule overhead decreases with cart size
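
The ~500-byte figure follows from the capsule entry layout above; a back-of-envelope check (the tree depth is an assumed example):

def proof_overhead_bytes(tree_depth: int) -> int:
    """Per-capsule transport overhead implied by the entry format above."""
    entry_header = 18                    # EntrySize + CapsuleSize + ProofSize + DatastoreId
    proof_header = 4 + 1                 # LeafIndex + TreeDepth
    path_bits = (tree_depth + 7) // 8    # 1 bit per level, rounded up
    siblings = 32 * tree_depth           # one 32-byte hash per level
    return entry_header + proof_header + path_bits + siblings

# A datastore with ~16K capsules has a depth-14 tree:
print(proof_overhead_bytes(14))  # 473 bytes, consistent with ~500 per capsule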

Integration Example

Full Workflow

# 1. Creator builds cart
cart = Cart()
for capsule in datastore.get_all_capsules():
    proof = datastore.get_merkle_proof(capsule.id)
    cart.add_capsule(capsule, datastore.id, proof)

cart_data = cart.compress()

# 2. Transport (untrusted)
network.send(cart_data, recipient)

# 3. Recipient verifies before storing anything
cart_path = save_temp_file(cart_data)
reader = CartReader(cart_path)
merkle_root = await blockchain.get_datastore_root(datastore_id)

for entry in reader.stream_entries():
    if verify_merkle_proof(entry.capsule_id, entry.proof, merkle_root):
        storage.store_capsule(entry.capsule_data)
    else:
        log.error(f"Invalid capsule: {entry.capsule_id}")