Protocol Features to enable custom aggregators

Context

The Ceramic core protocol is used to distribute and validate events in event streams, but the protocol layer has no idea what the data within those events is, and imposes no rules on its structure. Currently almost all streams in Ceramic belong to either the Model or ModelInstanceDocument (MID) StreamType (Streamtypes | Ceramic Improvement Proposals), but the concept of a StreamType lives at a higher level than the core of the Ceramic protocol. We’ve increasingly been thinking of this as the “aggregation” layer, which exists above the protocol/event streaming layer but below the indexing and querying layer.

Both Models and MIDs use event payloads that contain jsonPatch objects describing transformations to JSON documents. MIDs also perform schema validation using the jsonSchema specified in their corresponding Model. We’d like to generalize these ideas and enable a more diverse ecosystem of aggregators beyond the two that we have today. For example, one possible future aggregator might look similar to MIDs in that it also operates on JSON data and also uses Models for schema validation, but unlike MIDs its event payloads might be CRDT operations rather than json-patch objects. This would enable more collaborative multi-writer documents. Another aggregator might use JSON documents but make the event payload the full current version of the document, rather than a description of a transformation over previous state. Yet another might not use JSON documents at all, but might instead use something like Protocol Buffers.

Problem Statement

The problem is that if an aggregation library existed that could be configured with some number of different aggregators, that library would need a way to dispatch the incoming events it receives from the ceramic-one event feed and route each one to the proper aggregator. In other words, it needs to know that this event is for a ModelInstanceDocument and should go to the ModelInstanceDocument aggregator, while this other event is for a CRDTDocument and needs to go to its aggregator instead.
There are also interesting intersections between the different formats that data in event payloads can take and the way that event streams are partitioned, discovered, and synchronized at the core protocol layer. Today this partitioning behavior is controlled via the separator key in event headers, which is always the “model” that the stream belongs to.
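As a rough illustration of what that routing problem looks like from the aggregation library’s perspective, here is a sketch of the event header fields involved. The shape is an assumption for illustration, not the exact wire format.

```typescript
// Illustrative sketch of the event header fields relevant to routing and
// partitioning (an assumed shape, not the exact wire format).
interface EventHeaderSketch {
  controllers: string[]  // DIDs allowed to author events in this stream
  sep: 'model'           // separator key: names the header field used to partition/sync streams
  model: Uint8Array      // StreamID bytes of the Model this stream belongs to
  unique?: Uint8Array    // optional entropy so identical init payloads yield distinct streams
}
```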

We don’t want the core protocol to have to know anything about the available aggregators, but we also want to make sure that it is possible to implement aggregators in a simple and straightforward way, and to avoid issues where two different aggregators cannot both co-exist in the same application without interfering with each other.

The goal of this post is to start a discussion to help us decide on the set of core primitives the core protocol must expose in order for aggregators to do their job effectively.

Initial Proposal

Add a new field to the header of Ceramic init and data events, customMetadata, that contains an IPLD object. Users of the Ceramic protocol can write whatever they want into customMetadata.
Protocol implementations will not do anything with the data inside that object other than pass it back to the consumer. Any stream that is using the standard aggregation library and any of the generally-accepted aggregators (Model and ModelInstanceDocument to start) will reserve a streamType (or maybe aggregatorType or aggregatorId?) field in the customMetadata. The streamType field contains an integer corresponding to an entry in the streamtypes table: CIPs/tables/streamtypes.csv at main · ceramicnetwork/CIPs · GitHub. When applying events, the aggregation library uses the value of streamType to look up the appropriate aggregator to send each event to. When registering an aggregator with the aggregation library, the aggregator must declare which streamtype it corresponds to.
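A minimal sketch of what that dispatch could look like, assuming a hypothetical Aggregator interface and event shape (none of these names are an existing API):

```typescript
// Sketch of routing on the proposed customMetadata.streamType field.
// The Aggregator interface and event shape are illustrative assumptions.
interface Aggregator {
  streamType: number                                  // entry in the CIP streamtypes table
  applyEvent(state: unknown, event: EventSketch): unknown
}

interface EventSketch {
  header: { customMetadata?: { streamType?: number } }
  payload: unknown
}

class AggregationLibrary {
  private aggregators = new Map<number, Aggregator>()

  register(aggregator: Aggregator): void {
    if (this.aggregators.has(aggregator.streamType)) {
      throw new Error(`streamType ${aggregator.streamType} is already registered`)
    }
    this.aggregators.set(aggregator.streamType, aggregator)
  }

  route(event: EventSketch): Aggregator {
    const streamType = event.header.customMetadata?.streamType
    if (streamType === undefined) {
      throw new Error('event has no streamType in customMetadata')
    }
    const aggregator = this.aggregators.get(streamType)
    if (!aggregator) {
      throw new Error(`no aggregator registered for streamType ${streamType}`)
    }
    return aggregator
  }
}
```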

There is still a question around how the registry of streamtypes/aggregators is maintained, and especially how to prevent conflicts such as two aggregators choosing the same streamtype id. For this I propose we follow a technique similar to how multicodec works, where reserving a new streamtype id number is cheap and easy, but does need to be done in a centralized place to prevent conflicts: GitHub - multiformats/multicodec: Compact self-describing codecs. Save space by using predefined multicodec tables.

Alternate possibilities

In the proposal laid out above, streamtype/aggregator implementations define multiple different behaviors:
  1. The format of the data payload itself. For example: the payload is a json-patch diff to the existing JSON document state, the payload is the entire new JSON document, the payload is a CRDT operation describing a transformation to a JSON document, the payload is a binary file like an image, etc.
  2. The schema validation behavior. For example, the ModelInstanceDocument aggregator today will load the corresponding Model by StreamID from the ModelInstanceDocument metadata, extract the jsonSchema from the Model content, and then validate the updated ModelInstanceDocument content against that jsonSchema (see the sketch after this list).
  3. Conflict resolution rules. For example, with ModelInstanceDocument today, if there is a fork in the stream’s history the branch with the earlier anchor timestamp wins. In the future, a CRDT-based streamtype might have logic to merge both branches of history instead of keeping one and pruning the other.
  4. Possibly other aggregator-specific logic that we haven’t thought of yet. For example, I could imagine a streamtype whose events contain notifications about changes to state on some blockchain; the streamtype logic might validate the data in the events against an RPC endpoint for the corresponding blockchain before considering an event valid, and would throw the event out if its data doesn’t match what can be observed on the blockchain.
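To make item 2 concrete, here is a rough sketch of that validation step, using Ajv as a stand-in JSON Schema validator. The loadStream helper and the metadata/content shapes are hypothetical, for illustration only, and are not the actual ModelInstanceDocument implementation.

```typescript
// Sketch of the MID schema-validation step: load the Model, extract its
// jsonSchema, and validate the updated content. Shapes are assumptions.
import Ajv from 'ajv'

const ajv = new Ajv()

async function validateMidContent(
  midMetadata: { model: string },                     // StreamID of the Model, from MID metadata
  updatedContent: Record<string, unknown>,            // MID content after applying the json-patch
  loadStream: (streamId: string) => Promise<{ content: { schema: object } }>,  // hypothetical helper
): Promise<void> {
  const model = await loadStream(midMetadata.model)   // load the corresponding Model stream
  const validate = ajv.compile(model.content.schema)  // compile the Model's jsonSchema
  if (!validate(updatedContent)) {
    throw new Error(`content violates Model schema: ${ajv.errorsText(validate.errors)}`)
  }
}
```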

All of these different behaviors could potentially be decoupled. For instance, instead of a single streamType field in the customMetadata for events, we could have separate payloadType, schemaValidationMode, and conflictResolutionMode fields. This might indeed be more flexible and powerful and is worth serious consideration. It does however increase the complexity a bit, and is further from how things work today. For now I went with proposing a single streamType field in the customMetadata, as it seems like the smallest and simplest change that enables writing an aggregation library that can be configured with different aggregators with differing logic, by giving that library just enough information to route events to aggregators. The aggregator implementations can then have whatever logic they want and enforce whatever rules they want on the shape and structure of the event data they are aggregating.
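For comparison, the two customMetadata shapes might look roughly like this. The field names and enum values are illustrative assumptions, not a spec:

```typescript
// Proposal: a single streamType id that implies all aggregator behavior.
type SingleStreamTypeMetadata = {
  streamType: number  // an entry reserved in the streamtypes table
}

// Alternative: decouple the behaviors into independent dimensions.
type DecoupledMetadata = {
  payloadType: 'json-patch' | 'json-full' | 'crdt-op' | 'binary'
  schemaValidationMode: 'none' | 'model-json-schema'
  conflictResolutionMode: 'earliest-anchor-wins' | 'crdt-merge'
}
```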


I think this is a great idea.

I’ve been calling a similar need “verifiable transformations.” Aggregation is one use case for this. Your examples go beyond use cases and turn it into a full application platform, which I find very valuable.

Currently, in Index, all logic is done “on” Ceramic, resulting in fragmented distribution for the protocol. If we could do this “in” Ceramic, we could offer more consistent experiences for developers. This is exactly the abstraction level I’d want to build protocols on.

For instance, imagine an embedding generator protocol in this layer. I deploy it and forget about it, and it’s instantly part of the ecosystem. Or consider an autonomous agent; again, deploy and forget, with event listeners, fancy.

I can create this by defining a verifiable pipeline over the data. For me, you handle the infrastructure below and the developer experience above, and I just focus on my logic without worrying about anything else.

I think there’s actually a way in which we can achieve the same goal as your proposal, but without any changes to the core protocol. We just need to slightly rethink the semantics around some core concepts.

Let model define stream type

My proposal is rather simple: we let the model field define which aggregator to use. More specifically, the model field has to encode a StreamID, and we redefine the stream type inside that StreamID as the type of the stream which references it.

We would then have to refine some numbers in the stream types table:

  • 0x02 - indicates we have a MID
  • 0x04 - indicates we have a Model (we only use unloadable for Models atm)

Effectively this turns the model field of a stream into the place that defines the metadata which aggregators would use to know how to dispatch incoming events.
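A rough sketch of dispatch under this scheme, where the aggregator is chosen by the stream type encoded in the StreamID found in the model header field. The decodeStreamType helper stands in for a real StreamID parser, and all names are illustrative:

```typescript
// Dispatch keyed on the stream type encoded in the model field's StreamID.
// Per the table above, 0x02 would route to the MID aggregator and 0x04 to
// the Model aggregator.
type AggregatorFn = (event: unknown) => void

function routeByModelField(
  modelField: Uint8Array,                                   // StreamID bytes from the event header
  aggregators: Map<number, AggregatorFn>,                   // stream type code -> aggregator
  decodeStreamType: (streamIdBytes: Uint8Array) => number,  // hypothetical StreamID parser
): AggregatorFn {
  const streamType = decodeStreamType(modelField)
  const aggregator = aggregators.get(streamType)
  if (!aggregator) {
    throw new Error(`no aggregator for stream type 0x${streamType.toString(16)}`)
  }
  return aggregator
}
```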


Note that this might break how we think about streamIds today as identifiers for ceramic event-logs. Instead it turns streamIds into the thing which you put into the model field to tell your aggregator what to do. EventIds already broke that mold though, so maybe this is fine.

@seref sounds like you’re thinking about more than just aggregations (e.g. materializing events into JSON documents), but also about more general purpose compute pipelines over event streams. I hadn’t been thinking about that use case too directly when writing this up, but yes, I do think the same system for reserving a streamType for an aggregator could also be used to reserve a streamType for other types of compute workloads and data processing.

I’m curious if you have any intuition on whether a single integer StreamType, as in the main proposal, or breaking it up into its component pieces (data payload format, schema enforcement mechanism, conflict resolution strategy, etc.), as in the alternative proposal at the end, would be a better fit for the use cases you are thinking about?

@jthor I think in most ways your proposal is generally isomorphic to the single streamtype in customMetadata proposal. Same core idea, just placed in a different field in the event header.

I think the main way in which it is meaningfully different, however, is that it now ties the data payload structure together with the way that data is partitioned and synchronized across the network, since model is currently the “separator key” used by Recon for nodes to specify which events they want to synchronize.

I’m honestly conflicted on whether it makes sense or not to combine those two concepts or keep them separate. Combining them might keep things simpler, but perhaps at the cost of some flexibility and power in controlling how data is distributed across the network? TBH I’m not really sure how best to explore the possibility space here and increase confidence in one path or the other…

If we take the approach I suggested, does it prevent us from moving more towards your approach in the future? Seems like it might be a “no change now, with possible change in the future” kind of situation?

For example, there could be a “model type” that enables us to encode params in the InitEvent header that switches between different aggregators, i.e. a type that composes multiple aggregators.


Hmm, that’s an interesting idea. We’d still be somewhat limited though, because we’d still be committed to “model” as the separator key, and would only have whatever we can encode into the Model StreamID bytes to use for partitioning. I believe the model field is fixed-length in the Recon EventID key, so we have a limited number of bytes to work with, and by reserving the first one for specifying the type id of the “multiple aggregator dispatcher streamtype”, we have fewer remaining bytes left to actually control the sharding behavior.

Iirc we actually use hash( "model" | streamid.bytes ) in the EventId, so I don’t think we are limited in length here. Was trying to figure out if this is correct, but looks like the spec is out of date right now.

I assume there will be hundreds of different aggregation functions:

If the main objective is to have custom aggregators work in a verifiable way, tasks like schema validation and conflict resolution will stay pretty meta and shouldn’t be at the center of the design. The first approach seems more relevant to me, and the streamType should be enough.

Still, to streamline these common aggregation functions, the aggregation library could be designed as a boilerplate (similar to serverless), including the required utilities for schema validation and conflict resolution (return types, accessors, etc.). This setup could offer more flexibility while keeping things safe.
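Something like the following sketch, where the library supplies common utilities and each custom aggregator only fills in its own handler. Every name here is an illustrative assumption, not an existing API:

```typescript
// Serverless-style boilerplate: the library provides shared utilities via a
// context object, and an aggregator is just a handler plus a streamType id.
interface AggregatorContext {
  validateAgainstModel(content: unknown, modelStreamId: string): Promise<void>  // shared schema utility
  resolveByEarliestAnchor<T>(branches: T[]): T                                  // shared conflict-resolution utility
}

interface AggregatorHandler<State> {
  streamType: number  // id reserved in the streamtypes table
  init(payload: unknown, ctx: AggregatorContext): Promise<State>
  apply(state: State, payload: unknown, ctx: AggregatorContext): Promise<State>
}

// A developer would "deploy and forget" something like this:
const crdtDocumentAggregator: AggregatorHandler<Record<string, unknown>> = {
  streamType: 42,  // hypothetical id
  async init(payload) {
    return payload as Record<string, unknown>
  },
  async apply(state, payload) {
    // custom merge logic would live here; a naive shallow merge as a placeholder
    return { ...state, ...(payload as Record<string, unknown>) }
  },
}
```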

By the way, it feels like this might be a bit overkill, but I’ll share it anyway. If this flexibility becomes available to developers in the future, I would prefer the registry to be a stream as well. Something like an AggregationType model, for example. That could be the best layer for me to compose and collaborate.

The current implementation is last8bytes(hash("model" | streamid.bytes)), so we only have 8 bytes, but with the entropy from sha256 that should allow a few billion values before collisions.
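For reference, a minimal sketch of that computation, assuming sha256 as the hash and node:crypto as the implementation:

```typescript
// last8bytes(sha256("model" | streamid.bytes)) as described above; this is a
// simplified sketch, not the full EventId layout.
import { createHash } from 'node:crypto'

function modelSeparatorBytes(streamIdBytes: Uint8Array): Uint8Array {
  const digest = createHash('sha256')
    .update(Buffer.from('model', 'utf8'))
    .update(Buffer.from(streamIdBytes))
    .digest()
  return digest.subarray(digest.length - 8)  // keep only the last 8 bytes
}
```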


The problem is that since Ceramic doesn’t have global consensus, there’s no real way to do exclusion around a scarce resource like the StreamType identifier. In other words, there’d be no way to prevent multiple streams in the registry from all trying to register their own different aggregator to the same StreamType ID number. It’s the same reason something like ENS can’t really be easily built on Ceramic: for that you need strong consensus that can prevent conflicts. The registry could be on a blockchain like Ethereum, but that feels like overkill when a simple CSV file in a known central location would suffice. I guess the advantage of doing it on chain would be that it’s permissionless, so no one entity could prevent the registration of a new StreamType ID, which would theoretically be possible with Github as the source of truth. But perhaps that concern can be mitigated by having the github org be controlled by a decentralized foundation.
