Proposal for well defined Streams in Ceramic

Motivation

We want Ceramic streams to be a well defined primitive of the Ceramic protocol.
Additionally we want to be able to use streams for various kinds of use cases.
These use cases can vary wildly in how they expect streams to behave.
We need to design Ceramic in such a way it can accommodate this variability.

We have identified that different kinds of streams can vary in how they aggregate events; however, I believe there are two other important ways in which stream types can vary.
First, in how they preserve old events in a stream: it's reasonable that a stream could delete/forget old events. Second, in how streams are grouped together, i.e. by model, controller, or some other application-specific way.

Currently streams represent a single use case of Models and Model Instance Documents with specific conflict resolution and aggregation rules.
Additionally it is baked into the protocol that all events for a stream must be preserved.

However if we want to maximize the utility of Ceramic we need to enable other kinds of streams.
A few possible stream kinds are:

There are potentially many others.
The design of Ceramic should be that stream kinds are opaque and therefore can accommodate any of these stream types.
This means we need to be careful about what requirements we place on a stream.

For example today a stream’s init event must always be preserved so we can track information about the stream like its controller.
However if we must always preserve an init event then we are potentially storing and replicating data within that event that is no longer useful.

Additionally, it's not known to Ceramic which groupings of streams are useful.
For example grouping MIDs by model is useful, but other groupings are useful for other types of streams.
We need a mechanism that allows the stream type to dictate grouping of streams.

In summary we need mechanisms for different kinds of streams to vary in these ways:

  • how streams are aggregated (this includes conflict resolution),
  • how streams are preserved,
  • how streams are grouped.

Each of these questions has a potentially unbounded number of different behaviors.
Therefore we should design a system that allows for describing these behaviors separate from the stream type, otherwise we will have an ever growing stream type enumeration and specialized code paths for each.
The stream type enumeration must remain opaque to Ceramic.

The Proposal

Streams are aggregated according to the stream type enumeration.
This is basically what was concluded in Protocol Features to enable custom aggregators.

Streams are grouped along a set of dimensions provided in their init events, and no payload data can exist in init events.
This decouples the existence and creation of streams from the data within streams.
This enables streams to have different preservation modes as no data for the stream lives within the init events.

Stream data is preserved according to an enumeration of preservation modes.
I propose we implement only a single mode initially (i.e. always preserve) but can add additional modes as needed.
For example a last write wins mode can forget all previous events.
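
As a rough illustration of how a node might apply a preservation mode when deciding which events to keep, here is a minimal sketch. The `StoredEvent` type, its `received_at` field, and the `retain` function are hypothetical names introduced only for this example. How "most recent" should be determined is still an open question in this proposal, so the sketch simply assumes a node-local receive time.

```rust
use std::time::Duration;

/// Preservation modes, mirroring the hypothetical examples in this proposal.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PreservationMode {
    Always,
    Last,
    Expiring(Duration),
}

/// Hypothetical stored event: an id plus the node-local time it was received.
/// Using receive time as the ordering is an assumption made for this sketch.
#[derive(Debug, Clone, PartialEq)]
struct StoredEvent {
    id: u64,
    received_at: Duration, // time since some fixed epoch
}

/// Return the events a node would keep under `mode` at time `now`.
fn retain(mode: PreservationMode, events: &[StoredEvent], now: Duration) -> Vec<StoredEvent> {
    match mode {
        // Keep everything.
        PreservationMode::Always => events.to_vec(),
        // Keep only the event with the greatest receive time (if any).
        PreservationMode::Last => events
            .iter()
            .max_by_key(|e| e.received_at)
            .cloned()
            .into_iter()
            .collect(),
        // Keep events whose age does not exceed the duration.
        PreservationMode::Expiring(ttl) => events
            .iter()
            .filter(|e| now.saturating_sub(e.received_at) <= ttl)
            .cloned()
            .collect(),
    }
}
```

Because no stream data lives in the init event, a function like this can drop data events freely without losing the stream's identity.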

This leads us to a precise definition for a stream.
A stream is:

  • controlled by authenticated users,
  • identified by its unique values for each grouping dimension,
  • a partial order of a set of events.

Using this definition of a stream we can build various aggregation systems on top of the Ceramic protocol that can vary in how data is aggregated, grouped and preserved.

The Specification

What follows is a detailed specification of how these mechanisms will work.
What is important here is that I am precise in what each of these mechanisms is and that it correctly enables the variability we need.
Isomorphic mechanisms can be discussed later.

Event Types

Simplified types for an Event in this proposed protocol.

/// Top level event type
enum Event {
    Time(TimeEvent), // No changes here
    Signed(SignedEvent),
    Unsigned(InitPayload),
}
/// A signed event, no changes here
enum SignedEvent {
    Init(InitPayload),
    Data(DataPayload),
}
/// An init payload
/// Note there is no payload in an init event.
/// Note sep, model, unique, shouldIndex etc are absent.
/// With this proposal the expectation is that those fields are specific to how
/// MIDs function and therefore would be part of the dimensions.
/// More on this in a bit.
struct InitPayload {
    stream_type: StreamType, // Opaque stream type
    preservation_mode: PreservationMode,
    controllers: Vec<String>,
    /// An ordered set of key value pairs defining the uniqueness of a stream.
    dimensions: Vec<(String, Bytes)>,
}
/// A data event belongs to a stream defined by the `id` in a partial order defined by its `prev`.
struct DataPayload {
    id: Cid,        // CID of init Event
    prev: Vec<Cid>, // multi-prev, may even be empty if no order is needed
    data: Ipld,     // Arbitrary payload
}

/// Stream type enumeration, however the variants are opaque to Ceramic.
type StreamType = u64;

/// How events are preserved
/// I propose we only build the Always variant to start but can add more as needed.
/// A few hypothetical examples are given.
enum PreservationMode {
    Always,
    Last, // Keep only the most recent event per stream, TODO how do we determine most recent? Can the rules for _recent_ vary?
    Expiring(Duration), // Keep events until they are older than the duration
}

These are simplified only in that they are not backwards compatible.
In order to make them backwards compatible we need to define a mapping from old structures to new ones, which is possible (see below).
There are various techniques we can use, but the key is that we can distinguish between old and new based on the structure (e.g. a type field or a missing data field) and we can map old to new.
This way only the parsing code needs to be aware of the old way of doing things and the rest of the systems can operate using these new types.
This will likely be similar to the RawEvent → Event mapping that already exists.
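
To make the old-to-new mapping a little more concrete, here is a minimal sketch of what the parsing layer might do. Everything about the legacy side is an assumption for illustration: the `LegacyInit` shape, the `ParsedInit` wrapper, the `upgrade` function and the `LEGACY_MID_STREAM_TYPE` placeholder value are not taken from the existing code. Legacy streams are mapped to always-preserve, since that is what the protocol requires of them today.

```rust
type StreamType = u64;

#[derive(Debug, Clone, PartialEq)]
enum PreservationMode {
    Always,
}

/// New-style init payload as proposed above (bytes shown as Vec<u8>).
#[derive(Debug, Clone, PartialEq)]
struct InitPayload {
    stream_type: StreamType,
    preservation_mode: PreservationMode,
    controllers: Vec<String>,
    dimensions: Vec<(String, Vec<u8>)>,
}

/// Placeholder stream type for legacy MID streams (hypothetical value).
const LEGACY_MID_STREAM_TYPE: StreamType = 3;

/// Hypothetical legacy init: MID-specific header fields plus optional data.
#[derive(Debug, Clone, PartialEq)]
struct LegacyInit {
    controllers: Vec<String>,
    model: Vec<u8>,
    unique: Option<Vec<u8>>,
    data: Option<Vec<u8>>,
}

/// Result of structural detection done by the parser.
#[derive(Debug, Clone, PartialEq)]
enum ParsedInit {
    Legacy(LegacyInit),
    Current(InitPayload),
}

/// Map any parsed init into the new form. Data carried by a legacy init is
/// returned separately so it can be treated like a data event.
fn upgrade(parsed: ParsedInit) -> (InitPayload, Option<Vec<u8>>) {
    match parsed {
        ParsedInit::Current(init) => (init, None),
        ParsedInit::Legacy(old) => {
            // MID-specific header fields become dimensions.
            let mut dimensions = vec![("model".to_string(), old.model)];
            if let Some(unique) = old.unique {
                dimensions.push(("unique".to_string(), unique));
            }
            let init = InitPayload {
                stream_type: LEGACY_MID_STREAM_TYPE,
                preservation_mode: PreservationMode::Always,
                controllers: old.controllers,
                dimensions,
            };
            (init, old.data)
        }
    }
}
```

With a function of this shape, only the parser knows about `LegacyInit`; everything downstream sees `InitPayload`.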

Stream Type

The stream type is an opaque enumeration represented as an integer.
Ceramic need only preserve the stream type information and expose it to consumers of the API.

However it may be beneficial for aggregation layers to define a few aspects associated with each stream type:

  • The aggregation method used
  • The schema of data that may exist in the payload of a data event

Ceramic itself will not use this information nor validate it, but it is valuable to aggregators.

Dimensions

It will be helpful to expand on this concept of dimensions.
A stream is uniquely identified by the values of each of its dimensions.
Ceramic can leverage these dimensions in two important ways to expose behavior to applications.

  • Applications can describe interests as a predicate over the dimensions.
  • Applications can subscribe to a feed of events over a predicate of dimensions.

This gives applications control over how data can be shared/synchronized across the network and how aggregation layers can consume the data.

The concept of dimensions replaces the concept of sep and model from ComposeDB.
ComposeDB could be implemented using the following dimensions:

  • model - the stream id of the model
  • should_index - boolean value
  • unique - arbitrary bytes
  • context - arbitrary bytes

However this is not needed as this proposal is backwards compatible.

Predicates

Predicates over dimensions can be defined with the following operations:

  • Byte equality accounting for variable length byte sequences
  • Byte comparison greater than or less than
  • Any logical operation over boolean values

For example model >= xxx and model < yyy where model is one of the dimension keys.
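
A minimal sketch of how such predicates could be evaluated against a stream's dimensions follows. The `Predicate` enum, its variant names, and the `lookup` and `matches` helpers are illustrative assumptions, not a proposed syntax or API. Byte comparison uses Rust's lexicographic slice ordering, which handles variable-length sequences (a strict prefix sorts before the longer value). A missing dimension simply fails every comparison here.

```rust
/// A predicate over dimension values (illustrative variants only).
#[derive(Debug, Clone)]
enum Predicate {
    Eq(String, Vec<u8>),
    Lt(String, Vec<u8>),
    Geq(String, Vec<u8>),
    And(Box<Predicate>, Box<Predicate>),
    Or(Box<Predicate>, Box<Predicate>),
    Not(Box<Predicate>),
}

/// Find the value of a dimension by key, if present.
fn lookup<'a>(dims: &'a [(String, Vec<u8>)], key: &str) -> Option<&'a [u8]> {
    dims.iter()
        .find(|(k, _)| k.as_str() == key)
        .map(|(_, v)| v.as_slice())
}

/// Evaluate a predicate against one stream's dimensions.
fn matches(p: &Predicate, dims: &[(String, Vec<u8>)]) -> bool {
    match p {
        // Comparisons on a missing dimension evaluate to false in this sketch.
        Predicate::Eq(k, v) => lookup(dims, k).map_or(false, |d| d == v.as_slice()),
        Predicate::Lt(k, v) => lookup(dims, k).map_or(false, |d| d < v.as_slice()),
        Predicate::Geq(k, v) => lookup(dims, k).map_or(false, |d| d >= v.as_slice()),
        Predicate::And(a, b) => matches(a, dims) && matches(b, dims),
        Predicate::Or(a, b) => matches(a, dims) || matches(b, dims),
        Predicate::Not(a) => !matches(a, dims),
    }
}
```

The range example above would be expressed as `And(Geq("model", xxx), Lt("model", yyy))`.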

Feed API

Somewhat tangential to this proposal is the idea that we can have two kinds of feed APIs:

  • Raw Feed
  • Conclusion Feed

The raw feed is what we have today, in that it returns the events directly (in their car files) without any extra metadata.

The conclusion feed differs in that it returns the events with various conclusions about them.
Specifically payloads from data events can be returned from the conclusion feed with a copy of its dimensions.
Other conclusions could be before and after markers that bound the time in which the data event was created.

Also note it is this conclusion feed that allows for seamless backwards compatibility because there is a natural transformation of the raw events into the conclusion events.
Therefore we can map both old and new events into the same conclusion events.

A possible conclusion event could be as follows.


/// Top level event type
enum ConclusionEvent {
    Init(ConclusionInit), // better names needed
    Data(ConclusionData),
    Time(TimeEvent),
}
struct ConclusionInit {
    stream_type: StreamType, // Opaque stream type
    preservation_mode: PreservationMode,
    controllers: Vec<String>,
    /// An ordered set of key value pairs defining the uniqueness of a stream.
    dimensions: Vec<(String, Bytes)>,
}
struct ConclusionData {
    init: ConclusionInit, // Copy of the init dimensions etc
    before: CeramicTime,
    after: CeramicTime,
    data: Ipld,           // Arbitrary payload
}

Also note that existing init events that do contain data can be mapped into both a ConclusionInit and ConclusionData events sequentially.
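
The transformation from raw events to conclusion events could look roughly like the sketch below. The `RawEvent` shape and the `conclude` function are hypothetical, the structs are trimmed down (no preservation mode, no before/after time bounds, payloads as raw bytes), and the input is assumed to be the events of a single stream in order.

```rust
#[derive(Debug, Clone, PartialEq)]
struct ConclusionInit {
    stream_type: u64,
    controllers: Vec<String>,
    dimensions: Vec<(String, Vec<u8>)>,
}

#[derive(Debug, Clone, PartialEq)]
struct ConclusionData {
    init: ConclusionInit, // copy of the stream metadata
    data: Vec<u8>,        // payload taken directly from the event
}

#[derive(Debug, Clone, PartialEq)]
enum ConclusionEvent {
    Init(ConclusionInit),
    Data(ConclusionData),
}

/// Hypothetical raw events for one stream, already parsed.
/// `legacy_data` is set only for old-style init events that carry data.
enum RawEvent {
    Init { init: ConclusionInit, legacy_data: Option<Vec<u8>> },
    Data { data: Vec<u8> },
}

/// Turn a stream's raw events into conclusion events.
fn conclude(events: Vec<RawEvent>) -> Vec<ConclusionEvent> {
    let mut current: Option<ConclusionInit> = None;
    let mut out = Vec::new();
    for event in events {
        match event {
            RawEvent::Init { init, legacy_data } => {
                out.push(ConclusionEvent::Init(init.clone()));
                // A legacy init with data yields an Init followed by a Data.
                if let Some(data) = legacy_data {
                    out.push(ConclusionEvent::Data(ConclusionData { init: init.clone(), data }));
                }
                current = Some(init);
            }
            RawEvent::Data { data } => {
                // Data seen before any init is skipped in this sketch.
                if let Some(init) = &current {
                    out.push(ConclusionEvent::Data(ConclusionData { init: init.clone(), data }));
                }
            }
        }
    }
    out
}
```

Consumers of a feed shaped like this never need to know whether an event was originally encoded the old or the new way.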

Summary

It seems possible to build Ceramic one according to this proposal in a backwards compatible way that enables generic stream types not only for different aggregation methods but also accounting for variation in how streams are grouped and how stream data is preserved over time.

Looking for feedback on this general design, specifically on whether there are other ways in which stream types can vary and whether I have adequately addressed the ways we have already anticipated.

We may want to restrict StreamType to u53 so that it stays within the safe integer range in JSON,
or be explicit that in JSON it needs to be in hex string form, e.g. “0x123”.
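
Both options are easy to sketch. The helper names below are hypothetical; the only fixed fact is that integers up to 2^53 - 1 survive a round trip through an IEEE 754 double, which is what most JSON parsers use for numbers.

```rust
/// Largest integer that is exactly representable as an IEEE 754 double
/// together with all smaller non-negative integers (2^53 - 1).
const MAX_SAFE_INTEGER: u64 = (1 << 53) - 1;

/// Option 1: only allow stream types that are safe as JSON numbers.
fn is_json_safe(stream_type: u64) -> bool {
    stream_type <= MAX_SAFE_INTEGER
}

/// Option 2: always carry the stream type as a hex string such as "0x123".
fn to_hex_string(stream_type: u64) -> String {
    format!("0x{:x}", stream_type)
}

/// Parse the hex string form back into a stream type.
/// Returns None if the "0x" prefix is missing or the digits are invalid.
fn from_hex_string(s: &str) -> Option<u64> {
    let digits = s.strip_prefix("0x")?;
    u64::from_str_radix(digits, 16).ok()
}
```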

Possible JWS encoding of an interest.

urlsafe_b64encode(

{
 "alg": "ES256",
 "typ": "JWT",
 "kid":"did:key:z6MkhgYVbqLEy518e29dK7dempX2YFJMNJQi1wKr6gyRVMVc"
}

).urlsafe_b64encode(

{
 "time_geq": 1721320000000,
 "time_lt": 1721320999999,
 "author_eq": "did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK",
 "bucket_eq": "kjzl6fddub9hxf2q312a5qjt9ra3oyzb7lthsrtwhne0wu54iuvj852bw9wxfvs",
 "node": "12D3KooWD3eckifWpRn9wQpMG9R9hX3sD158z7EqHWmweQAJU5SA",
 "not_after": 1721999999999,
 "capabilities": [CACAOs]
}

).urlsafe_b64encode(signature_by_kid)

The Ceramic node ID can be written as a libp2p peer ID
12D3KooWD3eckifWpRn9wQpMG9R9hX3sD158z7EqHWmweQAJU5SA
or as a CID-format libp2p peer ID
bafzaajaiaejcal72gwuz2or47oyxxn6b3rkwdmmkrxgkjxzy3rqt5kczyn7lcm3l
or as an Ed25519 compressed public key
Ed25519:2ffa35a99d3a3cfbb17bb7c1dc5561b18a8dcca4df38dc613ea859c37eb1336b
or as a did:key
did:key:z6MkhgYVbqLEy518e29dK7dempX2YFJMNJQi1wKr6gyRVMVc

For the JWS, the did:key format is probably best.

op
lt less than
leq less than or equal to
gt greater than
geq greater than or equal to
eq equal to
neq not equal to

In this example the CACAO would need to be a delegation from kjzl6fddub9hxf2q312a5qjt9ra3oyzb7lthsrtwhne0wu54iuvj852bw9wxfvs to did:key:z6MkhgYVbqLEy518e29dK7dempX2YFJMNJQi1wKr6gyRVMVc granting the right to read the bucket. The set of keys that could sign such a CACAO is in the stream kjzl6fddub9hxf2q312a5qjt9ra3oyzb7lthsrtwhne0wu54iuvj852bw9wxfvs, which is the name of the bucket.

When a controller wants to publish a stream to a bucket, they just put the bucket’s StreamID in the bucket header of the init event.
Controller DID → bucket’s StreamID

When a controller wants to use a Ceramic node to subscribe to a bucket, they would need to get one of the keys in kjzl6fddub9hxf2q312a5qjt9ra3oyzb7lthsrtwhne0wu54iuvj852bw9wxfvs to make a CACAO delegating to their DID, and then make a CACAO delegating from their DID to the Ceramic node.
The Ceramic node could then use these two CACAOs in its interest to subscribe to the bucket.

Bucket StreamID → bucket keys list → Controller DID → NodeID DID.

What happens if an init event has extra or missing dimensions for its stream type?

Not a fan of trying to encode a DSL into json, especially by combining strings like time_geq. Would much rather just have "predicates":"some actual dsl". The DSL could be raw or compressed.

I think the obvious DSL is a subset of the SQL WHERE clause. This gives us readability, as SQL is well known, and the ability to add more query types by just using a bigger subset of the SQL WHERE clause syntax.

Not sure I follow what the “Conclusion Feed” does. I could interpret a few different things:

  1. Only returns last event(s)?
  2. Returns an aggregated state?
  3. Returns each event alongside extra metadata?

If I understand correctly it’s (3) you are suggesting? Essentially augmenting each event in the feed with extra metadata? In this case maybe it could be called “Augmented Feed”?

Dimensions are currently defined as dimensions: Vec<(String, Bytes)>,. What is the reasoning behind using Bytes for the value? What use cases do we see for Bytes? Do we see any issue creating these on the user side, passing over http? Do we have concerns about readability when looking at dimensions?

Re the DSL:

I agree we want an actual DSL, not an AST of the DSL. It's easy to build a grammar and a portable parser, and much easier for humans to read and understand.

The scope of the DSL will be to describe predicates over events.

I agree it should approximate SQL. Something we should be aware of is that namespacing quickly becomes a problem when specifying predicates over data. Today we know we want predicates over the dimensions, tomorrow we may discover we want predicates over conclusions of the data events. What happens if there is already a dimension named after a conclusion? There are a few different techniques we can follow here but first we need to clearly define the context under which these predicates will operate.

Re the Conclusion Feed

Yes, it's an augmented feed, but augmented with what? Conclusions. So a more descriptive name would be the augmented conclusion feed.

If you look closely at these structures they do not have the same structure as events, meaning the data returned from the conclusion feed is transformed in some way from the original events. (If you want the original event, use the raw feed.) Specifically, the ConclusionData.data field is the only field that comes directly from the event. Other fields like ConclusionData.init are not fields on the original data event but derived conclusions about the event. Also notice that the ConclusionInit struct denormalizes the metadata from the stream, meaning it duplicates it for each data event. This way consumers of the feed do not need to maintain a large cache of all stream metadata. (As an aside, there are ways to encode the data over the wire that do not actually duplicate the data but only logically duplicate it.)

/// Top level event type
enum ConclusionEvent {
    Init(ConclusionInit), // better names needed
    Data(ConclusionData),
    Time(TimeEvent),
}
struct ConclusionInit {
    stream_type: StreamType, // Opaque stream type
    preservation_mode: PreservationMode,
    controllers: Vec<String>,
    /// An ordered set of key value pairs defining the uniqueness of a stream.
    dimensions: Vec<(String, Bytes)>,
}
struct ConclusionData {
    init: ConclusionInit, // Copy of the init dimensions etc
    before: CeramicTime,
    after: CeramicTime,
    data: Ipld,           // Arbitrary payload
}

Exact details of the conclusion feed are up for design. However, the general concept of such a feed is what I am proposing. The benefit is that Ceramic nodes have the ability to compute a conclusion, and often have already had to do that work (e.g. to validate the event), so we should not make consumers of Ceramic duplicate it.

We want bytes, as restricting data to UTF-8 only is quite limiting. Similar to discussions we had previously about adding a context to event ids, allowing arbitrary bytes lets applications directly control how the streams sort. Since the purpose of dimensions is to be able to query against them, the sort order of a dimension controls which data is local to other data, thus affecting the performance of the predicates.

As for describing the value of dimensions in any textual context (e.g. HTTP), we can leverage multibase encodings. Since keys are always strings, a format of key=multibase_value could work well.
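
Here is a minimal sketch of that key=multibase_value format, using only the multibase base16 encoding (prefix `f`, lowercase hex) so that it needs no external crates. The function names are hypothetical, and the sketch assumes keys never contain `=`; a real implementation would likely accept other multibase prefixes as well.

```rust
/// Encode one dimension as `key=f<lowercase hex>` (multibase base16).
fn encode_dimension(key: &str, value: &[u8]) -> String {
    let hex: String = value.iter().map(|b| format!("{:02x}", b)).collect();
    format!("{}=f{}", key, hex)
}

/// Decode `key=f<lowercase hex>` back into a key and raw bytes.
/// Returns None for a missing `=`, an unsupported prefix, or malformed hex.
fn decode_dimension(text: &str) -> Option<(String, Vec<u8>)> {
    let (key, encoded) = text.split_once('=')?;
    let hex = encoded.strip_prefix('f')?;
    let valid = hex
        .bytes()
        .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'));
    if !valid || hex.len() % 2 != 0 {
        return None;
    }
    // The string is ASCII at this point, so slicing by byte offset is safe.
    let bytes = (0..hex.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).ok())
        .collect::<Option<Vec<u8>>>()?;
    Some((key.to_string(), bytes))
}
```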

In fact, I think the DSL we build should have some basic concept of types and differentiate between string and byte types, with a literal syntax for specifying bytes in multibase as needed.

Open to other ideas to make this natural and comfortable to users.

I think we will likely need to handle this case explicitly in the predicates, likely in the form of operators that can assert the existence of the data separately from its equality. A common way (see SQL) to do this is with ternary logic of true, false, and null, where null indicates a missing/unknown value.
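
As a sketch of what that could mean in practice, the code below models the three truth values with `Option<bool>`, where `None` plays the role of null. The `Tri` alias and the helper names are hypothetical; the `and`/`or`/`not` tables follow the usual SQL-style three-valued logic.

```rust
/// Three-valued truth: Some(true), Some(false), or None for unknown/missing.
type Tri = Option<bool>;

/// AND: false dominates, otherwise unknown propagates.
fn and(a: Tri, b: Tri) -> Tri {
    match (a, b) {
        (Some(false), _) | (_, Some(false)) => Some(false),
        (Some(true), Some(true)) => Some(true),
        _ => None,
    }
}

/// OR: true dominates, otherwise unknown propagates.
fn or(a: Tri, b: Tri) -> Tri {
    match (a, b) {
        (Some(true), _) | (_, Some(true)) => Some(true),
        (Some(false), Some(false)) => Some(false),
        _ => None,
    }
}

/// NOT: unknown stays unknown.
fn not(a: Tri) -> Tri {
    a.map(|v| !v)
}

/// Equality on a dimension: unknown when the dimension is missing.
fn dim_eq(dims: &[(String, Vec<u8>)], key: &str, value: &[u8]) -> Tri {
    dims.iter()
        .find(|(k, _)| k.as_str() == key)
        .map(|(_, v)| v.as_slice() == value)
}

/// Existence check: always a definite true or false.
fn dim_exists(dims: &[(String, Vec<u8>)], key: &str) -> bool {
    dims.iter().any(|(k, _)| k.as_str() == key)
}
```

A feed or interest would then typically include a stream only when the predicate evaluates to `Some(true)`, so a negated test on a missing dimension does not match by accident.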

Just want to state publicly for the record that I’m very happy and excited about this proposal. It puts forward a concrete solution to a lot of the problems I’ve been musing on for a while. I have a few small nitpicks, mostly around naming, but overall I think directionally it’s very on target for where we want to be headed.