CIP-124: Recon (tip synchronization protocol)

The value of (prevTimestamp, eventHeight) is that it keeps the novelty together.

I expect this is very important to keep new events adjacent in the key space as it means we can use split keys around recent events, thus enabling fast sync on new events which is going to be the most common action by far.

So I concur we want to include both prevTimestamp and eventHeight in the event id.

However, I still have some questions about the burden this places on nodes to generate event ids.

Do we need to ensure eventHeight is unique? Meaning, if we have concurrent writes to a stream (on the same node or different nodes) and nodes choose the same eventHeight because they haven’t seen the other concurrent event, what happens? The event CID will make sure the two events have unique event ids. Is there a process that eventually mutates one of the events to come after the other? Or is the same eventHeight a feature, in that it clearly shows a branch in the stream? We don’t need to solve the conflict resolution problem here, but we do need to ensure that event ids are amenable to conflict resolution strategies.

As an aside, I was thinking about another small cost of including (prevTimestamp, eventHeight): the event id gets longer. My assumption is that so long as an event id fits in a CPU cache line or two, the size doesn’t really matter that much.

Reading the spec it seems we have about 70 bytes. If we can get that down to 64 bytes then we can fit an entire event id in a single CPU cache line. If we use @jthor 's suggestion to use only 4 bytes from the init-event-cid then that takes us down to 66 bytes. We could also drop the cbor structure or use varints more aggressively to get this down to under 64 bytes quite easily. Or we could just use two cache lines and not try to get down to one. In either case my expectation is that we will likely want to pad these event ids to an entire cache line to take advantage of SIMD for comparisons etc. All of this is premature optimization :wink: so we should run some experiments before adding complexity to the event id structure.

eventIdBytes = EncodeDagCbor([ // ordered list not dict
  concatBytes(
    varint(networkId), // 1-2 bytes
    last16Bytes(separator-value), // 16 bytes 
    last16Bytes(hash-sha256(stream-controller-did)), // 16 bytes
    last8Bytes(init-event-cid) // 8 bytes 
  ),
  prevTimestamp, // 8 bytes (u64)
  eventHeight, // 8 bytes (u64), or maybe less if we use a varint
  eventCID // 8 bytes
]) // 4 bytes cbor overhead (haven't tested, just a guess)

Are you saying that because we include prevTimestamp we can split keys around timestamps to only sync events after certain TimeEvents? That’s actually quite a nice property.

I like the idea of trying to squeeze down to 64 bytes. Let’s see what we could feasibly get down to, not counting cbor structure:

  • streamid - 2 bytes (our stream multicodec, varint(0xce))
  • streamtype - 1 byte (varint)
  • networkId - 1 byte (for mainnet and testnets, more overhead on local networks might be fine?)
  • sep-value - 16 bytes
  • ctrl-did - 16 bytes
  • init-event-cid - 4 bytes
  • prevTimestamp - 8 bytes (?)
  • eventHeight - 1-2 bytes (if varint)
  • eventCID - 34 bytes

That’s 83 - 84 bytes total.
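As a quick sanity check of that total, here is a trivial Python sketch summing the field sizes listed above (eventHeight taken as 1 byte, or 2 if the varint needs a second byte):

# Field sizes from the list above, not counting cbor structure.
sizes = {
    "streamid": 2,        # varint(0xce)
    "streamtype": 1,
    "networkId": 1,
    "sep-value": 16,
    "ctrl-did": 16,
    "init-event-cid": 4,
    "prevTimestamp": 8,
    "eventHeight": 1,     # 2 if the varint needs a second byte
    "eventCID": 34,
}
print(sum(sizes.values()))  # 83, or 84 with a 2-byte eventHeight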

Now your assumption about the eventCID being 8 bytes raises the question of whether we actually need the full CID to be included. Could we just take the last 4 bytes here as well?
@AaronDGoldman thoughts?

We do want the full event CID. Once a ceramic node has the EventID it will go to IPFS to retrieve the event itself. That event will have a prev with the previous event CID and the id with the Init Event CID.
If we have the EventID we can look up the init-event-cid, sep-value, and controller DID so we can safely abbreviate them.

The prevTimestamp and eventHeight are cbor unsigned integers, so eventHeight will usually be 1 byte and the prevTimestamp will be 5 bytes:

<= 23 : 1 byte
<= 255 : 2 bytes
<= 65,535 : 3 bytes
<= 4,294,967,295 : 5 bytes
varint(0xce) + // streamid, 2 bytes (0xCE 0x01)
varint(0x71) + // dag-cbor, 1 byte
EncodeDagCbor(
  [ // start list(4), 1 byte
    concatBytes( // start bytes len=41, 2 bytes
      varint(networkId), // 1 byte (5 for local network)
      last16Bytes(separator_value), // 16 bytes 
      last16Bytes(sha256(stream_controller_DID)), // 16 bytes
      last8Bytes(init_event_CID) // 8 bytes 
    ),
    prevTimestamp, // 5 bytes
    eventHeight, // 1-2 bytes
    eventCID // 38 bytes with header (4 static, do we need them?)
      // start bytes len=36, 2 bytes
      // 0x01 cidv1, 1 byte
      // 0x71 dag-cbor, 1 byte
      // 0x12 sha2-256, 1 byte
      // 0x20 varint(hash length), 1 byte
      // hash bytes, 32 bytes
  ] // end list 0 bytes
)
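To sanity-check these numbers, here is a minimal Python sketch, assuming the cbor2 package and treating the CID field as a plain byte string exactly as laid out above (real DAG-CBOR would normally wrap CIDs in tag 42, which adds a few more bytes); all field values are placeholders:

import cbor2  # assumed dependency; DAG-CBOR encodes these particular types identically

# Placeholder field values, sized per the layout above.
prefix_bytes = (
    bytes([0x00])   # varint(networkId), 1 byte on mainnet
    + bytes(16)     # last16Bytes(separator_value)
    + bytes(16)     # last16Bytes(sha256(stream_controller_DID))
    + bytes(8)      # last8Bytes(init_event_CID)
)                   # 41 bytes total

prev_timestamp = 1_700_000_000  # any recent unix timestamp -> 5-byte CBOR uint
event_height = 3                # small heights -> 1-byte CBOR uint
event_cid = bytes([0x01, 0x71, 0x12, 0x20]) + bytes(32)  # CIDv1 dag-cbor sha2-256, 36 bytes

# CBOR unsigned int sizes, matching the table above.
assert [len(cbor2.dumps(n)) for n in (23, 255, 65_535, 4_294_967_295)] == [1, 2, 3, 5]

body = cbor2.dumps([prefix_bytes, prev_timestamp, event_height, event_cid])
event_id = bytes([0xCE, 0x01, 0x71]) + body  # varint(0xce) is 2 bytes, varint(0x71) is 1

print(len(body), len(event_id))  # 88 and 91 with these placeholder values

So with the full 36-byte CID and 16-byte digests this lands around 90 bytes; getting under a 64-byte cache line would require the truncations discussed below.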

If we want to get from 84 down to 64, maybe separator_value and stream_controller_DID_sha256 could be 8 bytes each and the 4 static bytes in the CID baked into the 0xCE71 in the prefix.

What is the security risk of going from 16 to 8 bytes on the separator-value and controller-did?

the 4 static bytes in the CID baked into the 0xCE71 in the prefix

No, I don’t think we can, because it can be either dag-cbor or dag-jose and we need to be able to distinguish between the two.

Once a ceramic node has the EventID it will go IPFS to retrieve the event itself.

In theory we could have a ceramic based protocol for syncing event data based on eventIds without using bitswap. However, this seems more like a future optimization.

At 16 bytes (128 bits) we will likely never see a separator-value or controller-did collision. At 8 bytes (64 bits) we will start seeing them around 4 billion values. It would not be hard to generate on the order of 4 billion DIDs or Models. That said, it would be a collision with a random DID or Model. If an attacker wanted to collide with a specific model, then they would need on the order of 2^64 models, which would be too expensive.
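For intuition, the 4 billion figure is just the birthday bound on a 64-bit digest; a trivial sketch of the two scales involved:

# Accidental collisions on an 8-byte (64-bit) digest become likely around sqrt(2^64) = 2^32 values.
print(2 ** 32)  # 4_294_967_296 -> "order 4 billion" random DIDs or Models
# A targeted collision against one specific model still needs on the order of 2^64 attempts.
print(2 ** 64)  # 18_446_744_073_709_551_616 -> far too expensive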

Ok, if we grant that an attacker could deliberately get a collision with a random model, what could they do? They could publish content of a different model that would be interspersed with the colliding model. Any node following that separator-value would retrieve both models, and only after retrieving the blocks for the events would they realize that it was the wrong model. This tricks nodes into replicating their data. That seems bad, but if they wanted the other nodes to replicate their data, they could just publish model instance documents for the model they collided with anyway.

The main value of the separator is that it serves as a Partition Key for sharding the data across many ceramic nodes. When two or more separators map to the same 64 bits, they would end up on the same shard(s) as each other. I may be being uncreative, but I just don’t see how this lets the attacker do anything they couldn’t do by just spamming the model under attack.

So here the assumption is that we will still be relying on syncing directly from IPFS using BitSwap? @jthor @AaronDGoldman

My understanding is that in current implementation of Ceramic, the worst case scenario will be to attempt a sync from IPFS, even if the IPFS nodes are likely a bundled node running with Ceramic deployments. So without IPFS as a fallback, will data stored on Ceramic be more “centralised”?

The context is that we are building a data explorer + hosted node + node-as-a-service to make it easier for devs to build applications without running their own nodes. And the motivation is about treating Ceramic as a “logically centralised and unified” data network where developers could deploy models “to the network” instead of “to certain nodes”.

I think here the key distinction is more about developer experience. Are developers expected to always know the URLs of nodes where their models are indexed, or could they just subscribe to the model ID? In other words, are streams content-addressed or location-addressed? And how does it work with and without depending on IPFS?

Could you elaborate on how interest discovery works? How does a node find out, via the DHT, about other nodes that have overlapping interests?

eg.

type InitHeader struct {
  controllers [String]
  sort-key = 'model'
  sort-value = kjzl6hvfrbw6c82mkud4qs38zl4hd03ifoyg2ksvfjkhuxebfzh3ef89vwvtvrr
}

NodeA interest [ModelA, ModelB, ModelC]
NodeB interest [ModelA, ModelB]
NodeC interest [ModelB, ModelC]

How would nodes A, B, and C announce their interests using sort keys & values?

DHT for interest discovery

In order to discover other ceramic nodes that have overlapping interests, a node could announce itself in the DHT as interested in a model. The DHT key would be derived from the sortKey and sortValue:

If sortKey and sortValue are set, use `partition = hash(sortKey | sortValue)` as the DHT key.
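To make that concrete for the example above, here is a minimal sketch, assuming sha256 as the hash and simple concatenation (both illustrative choices, not normative; the ModelB and ModelC ids are hypothetical placeholders):

import hashlib

def interest_dht_key(sort_key: str, sort_value: str) -> bytes:
    """Illustrative partition = hash(sortKey | sortValue) used as the DHT announcement key."""
    return hashlib.sha256(sort_key.encode() + sort_value.encode()).digest()

models = {
    "ModelA": "kjzl6hvfrbw6c82mkud4qs38zl4hd03ifoyg2ksvfjkhuxebfzh3ef89vwvtvrr",
    "ModelB": "kjzl6-hypothetical-model-b",  # placeholder id
    "ModelC": "kjzl6-hypothetical-model-c",  # placeholder id
}

interests = {
    "NodeA": ["ModelA", "ModelB", "ModelC"],
    "NodeB": ["ModelA", "ModelB"],
    "NodeC": ["ModelB", "ModelC"],
}

# Each node publishes a provider record under every key it is interested in; nodes that
# share a model announce under the same DHT key and can therefore find each other.
for node, wanted in interests.items():
    for m in wanted:
        print(node, m, interest_dht_key("model", models[m]).hex()[:16])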

For now, we plan to keep using IPFS with BitSwap. It is possible that once we have the interest data about other ceramic nodes, we can add an optimization of first asking the ceramic nodes that we know are interested in that event whether they have it, but we still need to have IPFS around in case the event has been pinned by some pinning service outside of Ceramic.

As for whether this makes data stored on Ceramic more “centralized”, I don’t think so. The current common pattern is that many apps have only their own node, plus a few nodes trying to index the whole network that hold the events. Once the ceramic nodes find each other, they are more likely to find content on ceramic nodes than on any random IPFS node.

Developers are not expected to always know the URLs of nodes where their models are indexed. They are expected to know the URL of a node that is willing to do the work to answer their queries. If I contract with a node-as-a-service provider and I need to subscribe to a particular model, I would tell the node and it would add the model to its interest ranges. As it syncs with other ceramic nodes it will pull in all the streams in the interest range and then be able to answer queries about those streams with no further WAN traffic.

There are two plans for interest discovery. The first is to have each Ceramic node advertise its interests as (multiaddr, start_key, stop_key, TTL_timestamp, sig): each node can say that it is interested in some range until some time. Then the nodes can gossip about where each node is and what its interest ranges are. The second plan would be to put provider records into the IPFS DHT to indicate that a particular node has an interest in that (separatorKey, separatorValue).
At first, there will just not be that many Ceramic nodes, so keeping a full interest range and location for all other ceramic nodes is just not that expensive. A node will know who to call to sync if a new interest range is added to its interests. Once the network grows beyond the scale where this full routing table is cheap, we can move to the DHT for interest advertisement and discovery at the cost of DHT lookup latencies.
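A sketch of what one advertised interest record and the overlap check might look like; the field names follow the tuple above, while the types, the half-open range convention, and the example addresses are assumptions:

from dataclasses import dataclass

@dataclass
class InterestAd:
    """One advertised interest range: (multiaddr, start_key, stop_key, TTL_timestamp, sig)."""
    multiaddr: str       # how to reach the advertising node
    start_key: bytes     # first Recon key (inclusive) of the interest range
    stop_key: bytes      # last Recon key (exclusive) of the interest range
    ttl_timestamp: int   # unix seconds after which the advertisement expires
    sig: bytes           # signature by the advertising node over the fields above

def overlaps(a: InterestAd, b: InterestAd) -> bool:
    """Half-open [start, stop) overlap: if true, the two nodes should sync periodically."""
    return a.start_key < b.stop_key and b.start_key < a.stop_key

# Example: two ads over 8-byte keys that share the middle of the keyspace.
mine = InterestAd("/ip4/192.0.2.1/tcp/4101", b"\x00" * 8, b"\x80" * 8, 1_700_000_000, b"")
theirs = InterestAd("/ip4/198.51.100.2/tcp/4101", b"\x40" * 8, b"\xff" * 8, 1_700_000_000, b"")
print(overlaps(mine, theirs))  # True -> schedule periodic Recon syncs with this peer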

Mostly it is about syncing gossip. Gossip about the nodes a node knows exist. Gossip about the ranges that the nodes have an interest in. Gossip about the events (EventIDs) that are in those ranges with other interested nodes. Use the CIDs of the events to fetch the event data itself.
If I learn a node exists, I can ping it to prove it is reachable.
If I learn a node has an overlapping interest range, I can exchange interests with it to prove it.
If I learn a node has a divergent set of events in an overlapping interest range, I can sync the EventIDs.
If I learn an EventID from a node, I can ask it for the IPFS block.
If the node that told me about an EventID can't give me the block, I can fall back to IPFS.
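The last two steps boil down to a simple fallback chain; here is a sketch, where get_block_from_peer and get_block_from_ipfs are hypothetical stand-ins for a direct request to that ceramic node and a Bitswap want:

from typing import Callable, Optional

def fetch_event_block(
    cid: str,
    source_peer: str,
    get_block_from_peer: Callable[[str, str], Optional[bytes]],
    get_block_from_ipfs: Callable[[str], Optional[bytes]],
) -> Optional[bytes]:
    """Ask the node that told us about the EventID first; fall back to IPFS if it can't serve it."""
    block = get_block_from_peer(source_peer, cid)
    if block is not None:
        return block
    return get_block_from_ipfs(cid)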

A helpful walkthrough of the Aljoscha Meyer paper :grin:

We did cite “Range-Based Set Reconciliation and Authenticated Set Representations” [arXiv:2212.13567] in the CIP, but I learned about the paper from this video.

start = eventId(
    network_id = 0x00, // mainnet
    sort_value = last8Bytes(sha256(kjzl6hvfrbw6c82mkud4qs38zl4hd03ifoyg2ksvfjkhuxebfzh3ef89vwvtvrr)),
    controller = last8Bytes(repeat8(0x00)), // stream controller DID
    init_event = last4Bytes(repeat4(0x00)) // streamid
)

stop = eventId(
    network_id = 0x00, // mainnet
    sort_value = last8Bytes(kjzl6hvfrbw6c82mkud4qs38zl4hd03ifoyg2ksvfjkhuxebfzh3ef89vwvtvrr),
    controller = last8Bytes(repeat8(0xff)), // stream controller DID
    init_event = last4Bytes(repeat4(0xff)) // streamid
)

The stop eventId sort value is missing a sha256 operation. Is this unintentional?

Another naive question: understanding that the sort-value is encoded in the eventID, how do we infer the sort key from the event ID? How do we decide if it’s referring to a StreamID (include all events for stream X) or a “ModelID” (all events for all streams of model X)?

Yes, that seems like a typo in the spec!

This is an interesting consideration. @AaronDGoldman now that we hash the sort-value, maybe we should just do hash(sort-key | sort-value) instead?

Some random questions from reading the CIP:

  • Is it possible to specify a sort range for all the streams of two different StreamTypes but that both use the same Model? For example, if in the future we introduce a new StreamType that isn’t ModelInstanceDocument but that still uses Models to define their schema. Could I sync all the streams belonging to this new streamtype and using a given model, but without seeing the MIDs using the same model?
  • It looks like subscribing to a single stream requires knowing the stream controller? How do you find that if you only have the streamid? Especially if individual events are no longer published to IPFS so you can’t just look up the init event by CID from IPFS.
  • “Random Peer Sync Order” section: how many random peers do we send new messages to?
  • “Interest Gossip” section: Can/should we just use gossipsub for this?
  • Will it be possible to include any sort of “progress bar” information back to users while an ongoing Recon sync is happening? If a node starts indexing a large model for the first time, it may take hours or more to sync in some cases. Having some way to track the progress and a rough ETA on completion would make for a much more positive user experience. Even if it’s as simple as X out of Y total events have been synced so far, but that would require a way for a node to know how many total events there are up front. Maybe there could be additional metadata attached to the recon messages where nodes indicate how many total events they are aware of in the interest range?
  1. So far Recon and rust-ceramic are completely agnostic about which streamtype is being used. They don’t even retain this information.

  2. This is a good point. I think we should decide if this is something we actually want/need to support.

  3. Idk, @AaronDGoldman ?

  4. Can you say more? Why would that make sense?

  5. Interesting, so on a 1 MByte/s connection syncing a 5 GB dataset would take about an hour and a half, so this seems like a relevant use case for sure. I think this lands in the bucket of future CIPs though, as all of our data sets are probably going to be much smaller and fast to sync at first.

  • Q: Is it possible to specify a sort range for all the streams of two different StreamTypes but that both use the same Model? For example, if in the future we introduce a new StreamType that isn’t ModelInstanceDocument but that still uses Models to define their schema. Could I sync all the streams belonging to this new streamtype and using a given model, but without seeing the MIDs using the same model?

    • A: The StreamSet is just the events that have that header so the StreamType does not matter. If it has a Model field it will sync.
  • Q: It looks like subscribing to a single stream requires knowing the stream controller? How do you find that if you only have the streamid? Especially if individual events are no longer published to IPFS so you can’t just look up the init event by CID from IPFS.

    • A: If we move from using the IPFS DHT+Bitswap to using Recon to transfer the events, it will still be using the CID to retrieve Events. If you know only the StreamID, you would need to send the CID want to every ceramic node that you sync with. If you knew the Model, then you could send the CID want only to ceramic nodes that are interested in that Model. If you knew the Model and controller, then you can be even more selective about the nodes you send the want to. Knowing more about the Event only helps to reduce overhead; without this knowledge it falls back to Bitswap-like overhead.
  • Q: “Random Peer Sync Order” section: how many random peers do we send new messages to?

    • A: For now, I would suggest that a ceramic node should try to sync with all other ceramic nodes it knows about. At some point that will become a scaling problem, but for now there just are not that many nodes. By syncing more often with nodes that have low latency, we will likely hear about most events from nearby nodes but will still sync with all nodes eventually.
  • Q: “Interest Gossip” section: Can/should we just use gossipsub for this?

    • A: I think Recon’s set-reconciliation based sync will work for synchronizing the Interest Ranges as well as the events. If we used gossipsub for this, we would have the same problem: missed messages would just stay missed. By using sync, Interest Ranges will eventually be synced to all nodes in the network.
  • Q: Will it be possible to include any sort of “progress bar” information back to users while an ongoing Recon sync is happening? If a node starts indexing a large model for the first time, it may take hours or more to sync in some cases. Having some way to track the progress and a rough ETA on completion would make for a much more positive user experience. Even if it’s as simple as X out of Y total events have been synced so far, but that would require a way for a node to know how many total events there are up front. Maybe there could be additional metadata attached to the recon messages where nodes indicate how many total events they are aware of in the interest range?

    • A: We have an unusual kind of progress. Each time we process a message, we learn some slices of the interest range that are in sync. Take step 3: <- (aahed, C8EEC7, capture, 7C3B51, cheth, 549D95, coact, A24221, zymic): the ranges aahed - capture and coact - zymic are in sync but capture - coact is not. We have two ways we could report this. We could look at the namespace and ask what ratio of the namespace is in sync. Or we could look at the events we know about and ask what ratio of them is in sync. Both are misleading, as there is no guarantee that the namespace is uniformly populated and we don’t know what is on the other side.

My preference would be to render a line representing the namespace, with the boundary keys marked and green or red sections showing the slices that are or are not in sync. Once a sync starts you would rapidly see the sections that have not changed go green as you learn that those ranges are in sync, and then slowly see the remaining red get cleaned up as each section comes into sync. Like watching an old-school BitTorrent progress bar or Windows defrag.
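A toy sketch of that kind of display, assuming we track which boundary-to-boundary slices have been confirmed in sync (the keys, statuses, and equal-width segments are made up; a real keyspace is not uniformly populated):

# '#' = slice confirmed in sync, '.' = slice still diverging (will be subdivided further).
slices = [
    ("aahed", "capture", True),
    ("capture", "coact", False),
    ("coact", "zymic", True),
]

segment_width = 13  # naive equal-width segments purely for display
bar = "".join(("#" if in_sync else ".") * segment_width for _start, _stop, in_sync in slices)
synced = sum(1 for _, _, in_sync in slices if in_sync)
print(f"[{bar}] {synced}/{len(slices)} slices in sync")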

But you can specify a range of StreamIDs right? Can you specify the range in such a way that it includes all the StreamIDs of one streamtype but not those of another?

EDIT: I guess not, for the same reason we’re discussing in issue #2 - you can’t actually specify a range of streamids, since the controller takes precedence in defining the range.

I think this is a bit of a problem. There could be a lot of value in having Models remain a canonical way of specifying schemas even if composedb evolves beyond ModelInstanceDocuments or other database implementations introduce their own streamtypes for the actual underlying data. And in a world where different database implementations re-use the same Models for their schema definitions but have different underlying streamtypes representing the data, being able to sync one database impl’s data but not another’s would be valuable.

How do you even specify this as a user of the API? The API as written lets you subscribe to a range and I don’t think there’s even a way to specify a range that only includes a single StreamID if you don’t know the model and the controller. Today in Ceramic StreamIDs are the primary sync key - you can always sync a stream just by knowing its streamid. If we’re going to break that functionality we should be very careful and clear on what we are doing.

That seems fine to start with. I could also imagine putting some arbitrary upper bound on the number of nodes you contact, perhaps 20, since I believe that is the default in Kademlia for their similar concept.

Hmm, I don’t quite follow how this would work. In what way are you “synchronizing” interest ranges? Each node is going to have its own set of interest ranges that are set by the client; there’s nothing to synchronize with anyone else there.

I didn’t follow why peers can’t just share the number of overall events they know about within the interest range?

EDIT: oh wait, I think maybe I understand. If two nodes have a range they are trying to sync, they both have different hashes for that range, and both have 1,000 events in that range, you don’t know if they have 999 events in common and each have 1 event the other doesn’t know about (so the total shared set size is 1,001 events), or if they each have a totally different set of 1,000 events (so the total shared set size is 2,000 events).

Ooph, that’s a bummer. Of course, if you are a brand new node with no data, then just having the peer you are syncing from tell you how many events it has in total would be sufficient to have a decent sense of your progress. So maybe we can use a different way of reporting progress for a brand new initial sync vs a “catchup” sync after say a node was offline for a little while.

  1. The stream is in the recon key as the last 4 bytes of the CID of the Init Event. This will let you shard a controller across nodes, but will not let you shard by streamtype, as it is not in the recon key. We wanted to be able to generate the recon keys just from the events themselves. The events don’t know their streamtype; that info is only in the StreamIDs, not in the Events.

  2. If you have the StreamID, then you have the CID of the Init Event, and from that you can discover the sort_value. In theory you could create a subscription from the StreamID and a sort_key, then use the Init Event to discover the sort_value. You would still need a way to indicate whether you wanted to subscribe to the whole (sort_value,*,*), the (sort_value, controller, *), or just the (sort_value, controller, stream_cid). Not knowing the Recon key in advance, you would be reduced to sending the want to any ceramic nodes that were interested in the sort_key, with Bitswap-like performance.
    e.g. if I have just the StreamID and want to subscribe to the model it belongs to, the node would need to send the want to any node it was syncing with that was interested in any model.

  3. While I agree that there should be a limit (e.g. 20) on the number of simultaneous node connections, I am not sure that we win much by limiting the number of nodes we sync with. If there are rare models that only two nodes are interested in, then they need to have each other as sync partners; for models that are very popular you are better off syncing with closer nodes or nodes with more bandwidth. As long as we prioritize in some way how often we sync with each peer node, we may not gain much by never syncing with many of them.

  4. Each node has some set of ranges that it wants to sync. This is basically the union of the configured interests and the interests from connected clients.

[{peerid, sort_key, start_value, stop_value, expiration}]

The set of these interest ranges is not just something that you can get by direct peering; it probably makes sense to gossip the interest ranges themselves across the network. When your ceramic node and a foreign ceramic node have an overlapping interest, you should be calling them to sync periodically.
This interest gossip can disseminate the existence of nodes with overlapping interests to your node, and it would also enable potential retrieval of streams that your node is not interested in, since it could reach out to the nodes that it knows are syncing those streams.

  1. Yes, exactly: 1,000 ∪ 1,000 for two non-identical sets can be anywhere from 1,001 to 2,000. If we did send the size of the range along with the hash of that range, we could take the sum of the two and report (number of local events) / (maximum unsynchronized events + number of local events) to get a rough ratio that at least monotonically approaches 100% as the sync continues.
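A quick sketch of that ratio, assuming each peer also sends its event count for the range alongside the hash; confirmed_shared (the number of events already known to be on both sides) is an assumed bookkeeping value:

def progress_ratio(local_events: int, remote_events: int, confirmed_shared: int) -> float:
    """(number of local events) / (maximum unsynchronized events + number of local events)."""
    max_unsynchronized = max(remote_events - confirmed_shared, 0)
    total = local_events + max_unsynchronized
    return 1.0 if total == 0 else local_events / total

# Two non-identical 1,000-event sets: the union is anywhere from 1,001 to 2,000 events.
print(progress_ratio(1000, 1000, 0))    # 0.5    at the start of the sync
print(progress_ratio(1000, 1000, 999))  # ~0.999 once nearly everything is confirmed shared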