Proposal for CAS Scaling

Co-authored with @StephH.

Motivation

The Ceramic Anchoring Service (CAS) is currently operated and funded by 3Box Labs. It is a centralized service that is both a single point of failure and a bottleneck for scaling the network.

There are already efforts underway to progressively decentralize anchoring in the Ceramic network to remove the community’s dependence on 3Box Labs infrastructure. This, however, is a task that will require several months of discussion, planning, coordination, implementation, and testing.

In order to address our immediate network scaling needs for upcoming partner engagements, this proposal covers a few possible options for scaling the CAS as it works today. This will allow us to scale the network enough to tide us over till full anchoring decentralization is achieved.

Background

The CAS API is a stateless service that ingests anchor requests from all Ceramic nodes on the network, stores them in an AWS RDS database, and posts them to an SQS queue.

The CAS Scheduler reads from the SQS queue, validates and batches the anchor requests, then posts these batches to a separate SQS queue.

A CAS Anchor Worker reads an anchor batch from SQS, creates a corresponding Merkle tree, and anchors it to the blockchain. Thereafter, the Worker creates a Merkle CAR file that contains the entire Merkle tree, along with the associated proof and the anchor commits corresponding to the anchor requests included in the tree.

Multiple Anchor Workers can run at the same time without conflicting with each other. The only resource that simultaneously running Workers will contend over is the ETH transaction mutex.

The CAS API is also responsible for servicing anchor status requests from Ceramic nodes across the network. It does this by loading the Merkle CAR files corresponding to anchored requests and returning a Witness CAR file that includes the anchor proof, the Merkle root, and the intermediate Merkle tree nodes corresponding to the anchored commit.
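For context on what servicing a status request involves, here is a rough sketch of extracting a Witness CAR's contents from a Merkle CAR. The types and names below are illustrative assumptions, not the actual CAS code.

```typescript
// Illustrative sketch only; the types and names here are assumptions, not the CAS codebase.
type CID = string;

interface MerkleCar {
  proof: CID;                      // anchor proof (points at the Merkle root)
  root: CID;                       // Merkle tree root
  nodes: Map<CID, [CID, CID]>;     // intermediate node -> [left child, right child]
  leafIndex: Map<CID, number>;     // anchored commit CID -> leaf position in the tree
  depth: number;                   // tree depth, e.g. 10 for a 2^10 tree
}

// A Witness CAR keeps only what one Ceramic node needs: the proof, the root,
// and the intermediate nodes on the path down to the anchored commit.
function witnessBlocks(car: MerkleCar, anchoredCommit: CID): CID[] {
  const blocks: CID[] = [car.proof, car.root];
  const index = car.leafIndex.get(anchoredCommit)!;
  let node = car.root;
  for (let level = car.depth - 1; level >= 0; level--) {
    const [left, right] = car.nodes.get(node)!;
    node = (index >> level) & 1 ? right : left; // follow the path encoded by the leaf index
    blocks.push(node);
  }
  return blocks; // proof + root + one block per level, ending at the anchored commit
}
```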

Scaling the API

The CAS API can service many more requests than it does today simply by running more instances of the API. Since it is a stateless service, scaling it horizontally increases the number of requests it can handle at any time.

An important consideration here is that an increase in the number of anchor requests that the API handles will likely correspond to an increase in the number of anchor status requests as well. This, in turn, means that the API will need to load Merkle CAR files a lot more frequently.

Another related consideration is that increasing the depth of the Merkle tree (discussed below) will result in significantly larger Merkle CAR files.

Even though we do currently have a mechanism for caching recent Merkle CAR files, additional API instances and/or larger Merkle CAR files will likely have an impact on latency and S3 bandwidth utilization.

The last consideration here is scaling the RDS database in which the CAS API stores anchor requests. This database will need to be scaled to handle the increased number of anchor requests, but this is as simple as a configuration change.

CAS Auth

CAS Auth via the AWS API Gateway, while useful at a small scale for providing our community self-service access to the CAS, has not been able to keep up with the demands of our larger partners. As a result, we have had to repeatedly bypass CAS Auth and provide direct access to the CAS API.

This mechanism can remain in place for the community CAS, but we should consider dedicated CAS clusters for larger partners that do not use CAS Auth. This would also reduce the risk of a single partner’s traffic affecting the CAS API’s ability to service other partners, especially now that we have partners with significant traffic.

Scaling Request Processing

Number of Anchor Workers

The number of Anchor Workers can be increased to process more anchor requests in parallel. This will allow the CAS to process more anchor requests in a given time period.

The main bottleneck here is the ETH transaction mutex. This is a global mutex that prevents multiple Anchor Workers from sending transactions to the Ethereum blockchain at the same time. This is necessary to prevent nonce reuse, which could invalidate some transactions.
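To make the contention point concrete, here is a minimal sketch of the kind of serialization the mutex provides. This is not the actual worker code; the in-memory nonce counter just stands in for reading the pending transaction count from the chain.

```typescript
// Minimal sketch (not the actual CAS worker code) of the global ETH transaction mutex:
// nonce assignment and broadcast must happen as one critical section, otherwise two
// workers could reuse the same nonce.
class Mutex {
  private tail: Promise<void> = Promise.resolve();
  run<T>(fn: () => Promise<T>): Promise<T> {
    const result = this.tail.then(fn);
    this.tail = result.then(() => undefined, () => undefined); // keep the chain alive on errors
    return result;
  }
}

const ethTxMutex = new Mutex();
let pendingNonce = 0; // stand-in for querying the chain for the pending transaction count

// Each Anchor Worker calls this with its Merkle root; the mutex serializes
// nonce read + broadcast across concurrently running workers.
async function anchorRoot(merkleRoot: string): Promise<{ nonce: number; root: string }> {
  return ethTxMutex.run(async () => {
    const nonce = pendingNonce++; // read-and-increment inside the lock
    // ... broadcast the anchor transaction with this nonce ...
    return { nonce, root: merkleRoot };
  });
}
```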

The other bottleneck is IPFS. Anchor Workers store intermediate Merkle tree nodes, anchor proofs, and anchor commits to IPFS. They also publish anchor commits to PubSub.

Increase Depth of Merkle Tree

Increasing the depth of the Merkle tree will allow the CAS to include more anchor requests in each batch. This means that with the same number of Anchor Workers, and without increasing contention for the ETH transaction mutex, the CAS can process more anchor requests in a given time period.

An important consideration here is the size of the Merkle CAR files. Larger Merkle CAR files will require more bandwidth to load and store in S3. Individual Witness CAR files will also be larger, including additional intermediate Merkle tree nodes to represent the deeper Merkle tree. This will require more bandwidth to transfer to the requesting Ceramic nodes.
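Some rough arithmetic to illustrate the tradeoff (the per-node byte size below is a placeholder assumption, not a measured number): the Merkle CAR grows with the total number of tree nodes, i.e. exponentially in depth, while an individual Witness CAR only gains one node per extra level.

```typescript
// Rough arithmetic only; NODE_BYTES is a placeholder assumption, not a measurement.
const NODE_BYTES = 100; // assumed average size of one encoded tree node

const merkleCarNodes = (depth: number) => 2 ** (depth + 1) - 1; // full binary tree
const witnessNodes = (depth: number) => depth + 2;              // proof + root + one node per level

for (const depth of [10, 16, 20]) {
  const carMb = (merkleCarNodes(depth) * NODE_BYTES) / 1e6;
  const witnessBytes = witnessNodes(depth) * NODE_BYTES;
  console.log(`depth 2^${depth}: Merkle CAR ~${carMb.toFixed(1)} MB, Witness CAR ~${witnessBytes} bytes`);
}
// Under these assumptions: depth 2^10 gives a ~0.2 MB Merkle CAR vs. a ~1200-byte witness;
// depth 2^20 gives a ~210 MB Merkle CAR vs. a ~2200-byte witness.
```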

Using deeper Merkle trees (e.g. 2^20) would also mean storing and publishing an exponentially larger number of anchor commits to IPFS. This is untenable and would likely require us to eliminate the use of IPFS for storing/publishing anchor commits.

Additionally, deeper Merkle trees will result in longer anchor commit validation times on Ceramic nodes. This is something we would need to characterize through testing.

Scaling IPFS

IPFS has been an unreliable component of the CAS. It has been a source of significant operational issues, and has been a bottleneck for scaling the CAS.

The CAS currently stores intermediate Merkle tree nodes, anchor proofs, and anchor commits in IPFS. It also publishes anchor commits to IPFS PubSub.

It would be beneficial to eliminate IPFS from the anchoring process entirely, instead relying on alternative mechanisms for providing Ceramic nodes with anchor-related information.

This is a significant change that would require a lot of coordination and testing. It also assumes that Ceramic’s anchor polling mechanism is rock-solid, and can become the sole provider of anchor information to Ceramic nodes via the CAS API.

Scaling the Scheduler

At a high level, the CAS Scheduler is already set up to process a very large number of requests per second.

The only consideration here is how its batching mechanism works with respect to SQS. The batching process holds incoming anchor requests “in-flight” until either a batch is full or the batch linger timeout expires. This means that there can be up to a full batch’s worth of requests “in-flight” at any time.

SQS has a limit (120K) on the maximum number of messages that can be “in-flight” at any point in time. This means that for a deeper Merkle tree we must either:

  • Update the algorithm to use a different mechanism for batching.
  • Limit the size of the batch (and thus of the Merkle tree) to a number less than 120K.
  • Add an extra layer of batching, i.e. for a tree depth of 2^20, the first layer batches a maximum of 2^16 requests at a time, then a second layer groups 16 such batches into a larger 2^20 sized batch (see the sketch after this list).
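Here is a rough sketch of that third option, the two-layer batching. The types and IDs are stand-ins, not the real Scheduler or SQS client code.

```typescript
// Sketch of the second batching layer; types and IDs are stand-ins, not the real Scheduler code.
import { randomUUID } from "node:crypto";

const SMALL_BATCH_SIZE = 2 ** 16; // first layer stays well under SQS's 120K in-flight limit
const BATCHES_PER_SUPER = 16;     // 16 x 2^16 = 2^20 requests per super-batch

interface SmallBatch { id: string; requestIds: string[] } // at most 2^16 request IDs

// Second layer: group 16 first-layer batches into one 2^20-sized batch by ID only,
// so no more than one first-layer batch's worth of requests is ever "in-flight" at once.
function groupIntoSuperBatch(smallBatches: SmallBatch[]): { id: string; batchIds: string[] } {
  if (smallBatches.length !== BATCHES_PER_SUPER) {
    throw new Error(`expected ${BATCHES_PER_SUPER} first-layer batches`);
  }
  for (const batch of smallBatches) {
    if (batch.requestIds.length > SMALL_BATCH_SIZE) {
      throw new Error(`first-layer batch ${batch.id} exceeds ${SMALL_BATCH_SIZE} requests`);
    }
  }
  return { id: randomUUID(), batchIds: smallBatches.map((b) => b.id) };
}
```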

Recommendation

The following list is the order of operations we would recommend (assuming the desired Merkle tree depth is 2^20).

Some of these steps can happen in parallel, and some are optional or can be done later.

  1. Set up a new CAS cluster with 9 API instances.
  2. Remove IPFS.
  3. (Optional) Increase number of workers to 4.
  4. Increase Merkle tree depth to 2^16.
  5. Update CAS Scheduler to support batches up to 2^20.
  6. Increase Merkle tree depth to 2^20.

Thanks for the writeup @mohsin and @StephH!

This is a significant change that would require a lot of coordination and testing. It also assumes that Ceramic’s anchor polling mechanism is rock-solid, and can become the sole provider of anchor information to Ceramic nodes via the CAS API.

Wouldn’t we need to test this soon regardless in the move to rust-ceramic, since js-ceramic nodes relying on our rust code wouldn’t talk to the CAS IPFS node anyway? I’m assuming that we would completely rely on anchor polling from that point?

Increase Merkle tree depth to 2^16.

What is it now?

As for the other recommendations, how far do we expect to get if we start with (1) and (2)?

You’re right! We discussed that we shouldn’t include IPFS in any new clusters, but not that we can’t with Recon (because it won’t work).

It’s worth noting explicitly that there aren’t any remaining functional dependencies on IPFS in CAS. CAS IPFS is now mostly a backup source for anchoring information.

Pre-Recon, it’s the backup source of anchor commits to non-requester nodes (i.e. those that are loading streams they haven’t authored and thus aren’t polling for), even if the requesting node is inaccessible.

Post-Recon, given that models will likely constantly be synchronized, it is far more likely for anchor information to be available elsewhere even if the original requester goes offline.

The tree depth is currently 2^10 (1024). Because this is fairly small, we’ve recently had to anchor as fast as possible (every 5-8 minutes). We’ve kept up with the increased load so far but won’t be able to for long if it keeps increasing, not to mention that it will keep getting more expensive.

I think we’ll get quite far with handling load starting with (1) and (2), maybe even far enough that we can stop there till anchoring is fully decentralized. The more significant concern when we start reaching the new system’s limits will then be cost of anchoring.

@AaronDGoldman did some back of the napkin math and came up with ~$2.5 per anchor. Anchoring once per hour for a year comes out to ~$20,000, which seems sustainable. At 2^16 tree depth, this comes out to ~1.6M unique anchored streams a day. This is already an order of magnitude higher than we are today, and might be sufficient to tide us over till decentralization.
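For reference, the arithmetic behind those numbers (using the estimates quoted above, not measured costs):

```typescript
// Back-of-the-napkin math from above; $2.50/anchor is the quoted estimate, not a measured cost.
const COST_PER_ANCHOR_USD = 2.5;
const ANCHORS_PER_DAY = 24; // anchoring once per hour
const TREE_DEPTH = 16;      // 2^16 requests per anchor

const yearlyCostUsd = COST_PER_ANCHOR_USD * ANCHORS_PER_DAY * 365; // ~21,900
const streamsPerDay = 2 ** TREE_DEPTH * ANCHORS_PER_DAY;           // 65,536 * 24 = 1,572,864

console.log(
  `~$${Math.round(yearlyCostUsd)} per year, ~${(streamsPerDay / 1e6).toFixed(1)}M anchored streams per day`
);
```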

Also, we don’t have to use powers of 2 for tree depth. We could stay on SQS’s good side with trees of sizes < 120K, which is ~300M writes per day and three orders of magnitude over where we are today. If we set up dedicated clusters for larger partners, each cluster will work on its own set of SQS queues, so we could do ~300M per cluster. Multiple clusters does, however, mean separate wallets and cost, so we’ll have to balance that with anchor frequency and tree depth across clusters.

We have a dashboard for historical cost of CAS here:
https://dune.com/jthor/ceramic-anchor-service-mainnet

Ah nice!

If we do not distribute CAS, then CAS will still be responsible for creating one Time Event for every Data Event on the network. This means total network throughput will be limited to CAS throughput, since CAS will have to ingest, anchor, and provide Time Events for every Data Event.

The following proposal attempts to break the symmetry between CAS and the rest of the network by offloading some work from CAS to Ceramic nodes.

Semi-Distributed Anchoring

This proposal takes advantage of the fact that CAS anchors trees.

If each node collects all of its events for a period of time, it can build its own Merkle tree for all of the Data Events that it wishes to anchor. It can then send an anchor request to the CAS for only the root of this tree.

The CAS, similarly, collects one tree root per node per anchor period and includes them in its own Merkle tree. It then anchors the root of this parent Merkle tree to the blockchain.

Once the tree is anchored, the CAS responds to each node with the proof and the path of the node’s subtree within the parent tree.

Since each node already maintains its own list of Data Events pending anchoring, it can use the proof and the node’s path through the parent tree to create and distribute the corresponding Time Events.

The path in a Time Event will be the concatenation of the path of the node’s subtree in the parent tree and the path of the Data Event within the node’s subtree.
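A tiny sketch of that concatenation (field names are illustrative, not a proposed spec):

```typescript
// Illustrative only; field names are not a spec for the actual Time Event format.
interface AnchorResponse {
  proof: string;       // CID of the anchor proof for the parent tree
  subtreePath: string; // path of this node's subtree root within the parent tree, e.g. "0/1"
}

// The node knows each Data Event's path inside its own subtree, so the full
// Time Event path is just the two paths joined.
function timeEventPath(resp: AnchorResponse, pathInSubtree: string): string {
  return `${resp.subtreePath}/${pathInSubtree}`;
}

// e.g. timeEventPath({ proof: "bafy...", subtreePath: "0/1" }, "1/0/1") === "0/1/1/0/1"
```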

This approach reduces the work on the CAS from the order of the number of Data Events on the network to the order of the number of Ceramic nodes on the network.

Each node need only submit one request per anchor period, and receive one response per anchor period. Moreover, each node is now responsible for the creation and dissemination of Time Events for Data Events it originated.

Intermediate Steps

Anchor Proof Polling

Today, Ceramic nodes poll the CAS for every stream that has a pending anchor request. But if no new anchors have occurred, then none of these streams have been anchored, and they will all receive the same answer.

If, instead, Ceramic nodes polled for the most recent anchor proof, they could limit themselves to one request per polling period instead of one request per stream in a loop over the anchor request store, as we do today.

Ceramic nodes can poll for the Time Events of individual streams only after a new anchor has occurred. In the meantime, they can poll at a much lower frequency for the latest anchor proof.

Note that at this stage, CAS is still responding to Ceramic nodes with Time Events versus just Merkle tree nodes.
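As a sketch of what polling for the latest proof could look like on the Ceramic side (the endpoint and helpers here are hypothetical, not the existing CAS API):

```typescript
// Illustration only: the endpoint and helper below are hypothetical, not the existing CAS API.
let lastSeenProofCid: string | undefined;

async function pollForAnchors(casUrl: string): Promise<void> {
  // One cheap request per polling period instead of one per pending stream.
  const res = await fetch(`${casUrl}/api/v0/latest-proof`); // hypothetical endpoint
  const { proofCid } = (await res.json()) as { proofCid: string };

  if (proofCid === lastSeenProofCid) {
    return; // no new anchor has happened; every pending stream would get the same answer
  }
  lastSeenProofCid = proofCid;

  // Only now fall back to per-stream status requests for the pending anchor requests.
  // await pollPendingStreams();
}
```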

Give Out Sub-Trees

Our CAS makes no promises about the order of events in the tree. If it sorts all of the Data Events by the node making the requests, then when that node polls, the CAS can respond with the proof, the path from the proof to the node’s part of the tree, and the node’s subtree.

At this stage, the CAS’s response does not include the actual Time Events, just the Merkle tree leaves and paths that are relevant to the requesting node. The node then walks the tree, creates and distributes the Time Events, and removes the Data Events from the anchor request store.

After the tree is processed, whatever is remaining in the node’s anchor request store can be polled or resubmitted to the CAS.
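Roughly, the node-side handling might look like this (the response shape is invented just to illustrate the flow):

```typescript
// Hypothetical response shape and processing, just to illustrate the flow.
interface SubtreeResponse {
  proofCid: string;      // anchor proof for this round
  pathToSubtree: string; // path from the root to this node's subtree
  leaves: { dataEventCid: string; pathInSubtree: string }[];
}

function processSubtree(resp: SubtreeResponse, pendingAnchors: Set<string>): void {
  for (const leaf of resp.leaves) {
    const path = `${resp.pathToSubtree}/${leaf.pathInSubtree}`;
    // ... create the Time Event { proof: resp.proofCid, path } and publish it ...
    pendingAnchors.delete(leaf.dataEventCid); // done; drop it from the anchor request store
  }
  // Anything still in pendingAnchors was not in this tree: poll again or resubmit to the CAS.
}
```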

We could use a different message bus/queueing mechanism, like Kinesis.

Yes, true, though I’m hoping we won’t need to. I think 300M anchors per day per cluster should cover our needs for a while, at least until we can fully decentralize.

And if we start implementing some of the enhancements from @AaronDGoldman’s post, we’ll have all the room we need.

With step 1 and 2, how many events per hour do you think we can push CAS to?

Capturing notes on this from Discord.

We’ve been able to achieve ~15K per hour without any tweaks to the current config, and we might be able to do more than that.

I think so. We should benchmark this on Dev CAS.

There are a few considerations here:

  • More than for performance, a dedicated cluster protects a partner from “noisy neighbors”: one big partner can’t take down CAS and, with it, all of another partner’s anchors. I’d say we should think carefully about which partners are critical and what their SLAs are, and then consider whether we want them on dedicated clusters. It will be a bit more operational overhead, but we have most things in code, so it shouldn’t be terribly difficult.
  • It is remarkably difficult to benchmark CAS because Ceramic code is a terrible load generation tool. Steph, Ben and I collaborated on adding CAS load generation to CPK (Ceramic Pocket Knife) Rust code, which was promising, so we’ll need to put some work in to move that code to Keramik as a new scenario.
  • I think 1M anchors / hour is achievable, but I would be very worried about the other impacts of distributing 1M anchors per hour (see Aaron’s post above). The network effects might be manageable if we stop writing to IPFS/PubSub, but we’ll need to quantify the bandwidth/S3 usage, and the load on the API instances from generating that many witness CAR files. I think this is a significant risk unless we tweak how Ceramic learns about anchors in the Recon world and how they’re disseminated in the network.

All that to say that my answer to your question is a qualified yes, and we should spend some time setting up the right benchmarking infra and collecting results to really quantify things.

One additional option we can consider is to apply the concept of subtrees to the existing CAS implementation without actually implementing it fully as proposed above.

We could have Ceramic continue to send requests for individual stream CIDs but only poll the CAS every few minutes for all its pending requests, whether by sending CAS the Ceramic node’s last known anchor proof, or sending a batch query with all the Ceramic node’s pending CIDs. Once CAS anchors these requests, it pulls out witness CAR information for only that node’s requests (filtered by node DID or using CIDs from the batch request) and sends them over.

This CAR file could include just the proof and path information so that the Ceramic nodes can create, apply, and publish the anchor commits themselves.

This approach will simplify the anchor polling mechanism and reduce the load on the CAS for responding to status requests and building/returning witness CAR files for individual CIDs.
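A rough shape for that batched exchange (all of these names are hypothetical, not an agreed-upon API):

```typescript
// Hypothetical request/response shapes for a batched status poll; not a spec.
interface BatchStatusRequest {
  nodeDid: string;       // used by CAS to filter the tree to this node's requests
  pendingCids: string[]; // everything still in the node's anchor request store
}

interface BatchStatusResponse {
  proofCid: string;                          // one proof shared by all entries below
  anchored: { cid: string; path: string }[]; // proof + path only; no Time Events
  notAnchoredYet: string[];                  // still pending; keep polling
}
```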

Yikes, not sure how I got ~300M writes per day… Using ~120K batch sizes gets us to ~3M unique writes per day if we anchor once per hour.

An update on the current approach.

There are two main considerations here:

  • SQS message sizes are limited to 256 KB. So, even if we tried, we wouldn’t be able to send more than a few thousand CIDs in the message payload, which is how the Scheduler currently sends batches to Anchor Workers.
  • A single SQS queue can only have a maximum of 120,000 messages in-flight. Since we hold anchor requests in-flight as they’re being accumulated into a batch, we need to work around this limit for batch sizes larger than 120,000.

In order to solve these issues in the current prototype (a code sketch follows this list):

  • The Scheduler creates multiple queues for holding validated anchor requests instead of a single queue. Incoming requests are distributed to this “multi-queue” in a round-robin fashion. With 10 queues and a batch size of 2^20, each queue will only have at most ~105000 messages in-flight at any time, which avoids running into the SQS maximum in-flight message limit.
  • The Scheduler’s batching service accumulates requests from across the entire multi-queue, then creates an S3 file using the batch ID and containing all the batch’s anchor request IDs. It then sends just the batch ID in its SQS message, which avoids running into the SQS message size limit.
    • Workers load the S3 file using the batch ID from the SQS message and then treat the parsed JSON payload as the batch they need to process, just as they would have for an SQS message containing both the batch ID and the request IDs.
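Here is a condensed sketch of that flow. The AWS SDK v3 calls are standard, but the bucket name, queue URL, and payload shape are placeholders rather than the real prototype's values.

```typescript
// Condensed sketch of the prototype flow. AWS SDK v3 calls are real, but the
// bucket/queue names and payload shape are placeholders.
import { S3Client, PutObjectCommand, GetObjectCommand } from "@aws-sdk/client-s3";
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { randomUUID } from "node:crypto";

const s3 = new S3Client({});
const sqs = new SQSClient({});
const BATCH_BUCKET = "cas-anchor-batches";                                            // placeholder
const BATCH_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/cas-batches"; // placeholder

// Scheduler side: write the full batch to S3, send only the batch ID over SQS
// (keeps the message well under the 256 KB limit).
async function publishBatch(requestIds: string[]): Promise<string> {
  const batchId = randomUUID();
  await s3.send(new PutObjectCommand({
    Bucket: BATCH_BUCKET,
    Key: `batches/${batchId}.json`,
    Body: JSON.stringify({ batchId, requestIds }),
  }));
  await sqs.send(new SendMessageCommand({
    QueueUrl: BATCH_QUEUE_URL,
    MessageBody: JSON.stringify({ batchId }), // just the ID, not 2^20 request IDs
  }));
  return batchId;
}

// Worker side: resolve the batch ID from the SQS message back into the full request list.
async function loadBatch(batchId: string): Promise<string[]> {
  const obj = await s3.send(new GetObjectCommand({
    Bucket: BATCH_BUCKET,
    Key: `batches/${batchId}.json`,
  }));
  const { requestIds } = JSON.parse(await obj.Body!.transformToString());
  return requestIds;
}
```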