search
Search
Login
Unlock 100+ guides
menu
menu
web
search toc
close
Comments
Log in or sign up
Cancel
Post
account_circle
Profile
exit_to_app
Sign out
What does this mean?
Why is this true?
Give me some examples!
search
keyboard_voice
close
Searching Tips
Search for a recipe:
"Creating a table in MySQL"
Search for an API documentation: "@append"
Search for code: "!dataframe"
Apply a tag filter: "#python"
Useful Shortcuts
/ to open search panel
Esc to close search panel
to navigate between search results
d to clear all current filters
Enter to expand content preview
icon_star
Doc Search
icon_star
Code Search Beta
SORRY NOTHING FOUND!
mic
Start speaking...
Voice search is only supported in Safari and Chrome.
Navigate to
chevron_leftSoftware Development
Git2 topics
Linux1 topic
Google Colaboratory1 topic
Dagster11 topics
Guide on virtual environments
check_circle
Mark as learned
thumb_up
0
thumb_down
0
chat_bubble_outline
0
Comment
auto_stories Bi-column layout
settings

Introduction to Milvus

schedule Aug 17, 2023
Last updated
local_offer
Tags
mode_heat
Master the mathematics behind data science with 100+ top-tier guides
Start your free 7-days trial now!

Data model

The data model of Milvus is as follows:

collection > shard > partition > segment

Here's a diagram showing the hierarchy:

Collection

A collection is the biggest unit of data in Milvus. We can think of a collection as a table in a relational database - for instance, we might create a collection called products to store data about products.

Shards

A collection is split into multiple shards. By default, a collection is split into 2 shards. The main idea behind shards is to make insertion operations faster. By having multiple shards, Milvus can perform multiple writing operations in parallel.

NOTE

Milvus hashes the primary key to decide which shard to write to.

Partitions

Milvus separates data into partitions, in which every partition has its own physical storage. We can partition our data according to some custom business rule - for instance, we can partition by customer gender, movie genre or book publish date. Partitioning is a key feature that makes Milvus performant. For instance, suppose we wanted to query for similar customers that are male. Instead of querying over the entire data, Milvus can do less work by querying over the partition that holds male data.

When using the Milvus SDK, we can specify which partition to insert and query like so:

  • when inserting data using the insert(-) function, we can specify the partition_name argument, which is the partition in which to insert the data.

  • when querying for data using the query(-) function, we can specify the partition_names argument, which are the partitions in which to search for the data.

By default, when we create a collection, a single partition called _default is created. If we do not specify any partition-related information when inserting or querying data, we will be interacting with this _default partition.

NOTE

Shards are for making write operations more efficient, while partitions are for making read operations more efficient.

Segments

Partitions can further be broken down into segments. There are two types of segments: growing segments and sealed segments.

Growing segment

As the name suggests, a growing segment is a newly created segment in which new data can be inserted. Any segment can be divided into three parts: used, allocated and free - this is illustrated below:

Note the following:

  • the used space holds data that has already been inserted.

  • the allocated space is reserved for upcoming data insertions.

  • the free space is essentially the remaining space available in the segment.

If no data is inserted after some time, the allocated space expires and becomes free space.

Sealed segment

As we keep inserting data, the used part of the growing segment expands. When a segment reaches the default upper limit of 512MB (as specified by the maxSize config) or if the user manually calls the flush() method on the collection, the growing segment is locked and becomes what is known as a sealed segment:

As the named suggested, the sealed segment no longer receives new data. Once the allocated part expires, the sealed segment will be written to disk in the object storage for data persistence. Since no new data can be added to the sealed segments, subsequent data insertions will result in generating a new growing segment.

Query operations are conducted on segments. If a segment has not been indexed yet, then Milvus has to scan through the entire segment in a brute-force manner, which drastically increases query time. Milvus will automatically index segment based on the minSegmentSizeToEnableIndex configuration parameter, which has a default value of 1024 (rows). This means that segments containing less than 1024 rows will not be indexed, that is, it will be searched via a brute-force scan.

NOTE

If our data is static, that is, we don't frequently insert into our database, then it's a good idea to call the flush() method to turn all growing segments into sealed/flushed segments. This will help speed up our queries.

NOTE

We should not call the flush() method too frequently since many small segments will be generated, which will increase query time.

NOTE

Milvus will also flush segments automatically based on the following configuration parameters:

  • maxIdleTime - if a segment does not receive any data manipulation requests such as data insertion within maxIdleTime, then a growing segment will automatically be sealed. By default, maxIdleTime=600 seconds.

The folder structure of our inserted data in the object storage looks like so:

/insert_log/{collection_id}/{partition_id}/{segment_id}/{field_id}/{log_id}

For instance, suppose our collection had a schema of 2 fields (e.g. user_id and user_age). The file structure at the segment-level would look like:

segment_1
├── user_id
├── log_1
└── log_2
└── user_age
├── log_1
└── log_2

When data is shipped to the log broker, it is serialized from different data types (e.g. float, string, JSON) into bytearray format before writing into log files. This is why if we inspect the log files in the object storage (e.g. MinIO), they are not in human-readable format. When the data is queried and sent to the user, it is deserialized into the original human-readable format.

As we can see, the data within a segment is persisted in multiple logs under the hood. In our case, the segment segment_1 consists of 4 binlog files. In our config file, we can specify the maxBinlogFileNumber - once the number of binlog files reaches this number, Milvus will automatically seal the segment.

Binlogs

Binlogs, which stand for binary logs, record all updates (e.g. insertion and deletion) made to the data within the Milvus database. A single binlog file contains multiple events. An event has two parts:

  • event header: contains meta information such as the creation time.

  • event data: contains additional meta information such as the starting timestamp of the event and more importantly, the actual inserted data.

Let's take a look at the structure of event headers:

Type

Value

Bytes

Description

Event header

Timestamp

0:8

Timestamp of creation

Event header

TypeCode

8:1

Event type code

Event header

ServerID

9:4

The ID of the write node

Event header

EventLength

13:4

Length of event, including header and data

Event header

NextPosition

17:4

Offset of next event from the start of the file

The structure of the event data depends on the event type. For instance, for INSERT_EVENT, the event data is structured like so:

Type

Part

Byte

Description

Event data

Fixed

x:8

min (or starting) timestamp of event

Event data

Fixed

x+8:8

max (or ending) timestamp of event

Event data

Fixed

x+16:y

reserved for expanding the fixed part

Event data

Variable

x+16+y:

data in parquet (columnar) format

Here, the variable part holds the actual inserted data in parquet format.

Data compaction

Compaction is the process of merging small segments into larger segments in order to save disk space. Since compaction is a time-consuming task, it is done in the background. Under the hood, compaction is triggered by the DataCoord and executed by the data nodes.

There are two main types of data compaction:

  • binlog compaction.

  • segment compaction.

Binlog compaction

There are two types of binlogs that can be compacted:

  • delta binlogs - these are generated when deleting data.

  • insert binlogs - these are generated when segments are flushed to disk (e.g. by flush()).

When Milvus performs a flush, the following happens:

  • insert events are appended into an insert binlog in disk.

  • delete events are appended into a delta binlog in disk.

Suppose we insert some data and then delete it afterwards. This means that we will have an event recorded in an insert binlog file and another event recoded in a delta binlog file. If we perform compaction, then these two events inside the insert binlog and delta binlog file will be removed - just as if this data has never been inserted into the database!

Since compaction removes all traces of data that has been deleted, the deleted data will not appear in the results of time travel search. If we wish to perform time travel search, we must specify the configuration parameter called common.retentionDuration, which indicates the time span in seconds where a compaction will not run.

By default, compaction occurs automatically depending on some conditions. We could also call the collection's compact() method to manually trigger a compaction.

Notice how not all delta binlogs are necessarily deleted after binlog compaction. This is because of the common.retentionDuration parameter. For instance, if we set common.retentionDuration=10 (seconds) and we perform bin log compaction, we will be deleting all information in delta binlogs with timestamp greater than 10 seconds ago (e.g. delete events appended 15 seconds ago). However, we will retain those delete events that happened within the last 10 seconds.

Milvus will attempt a global compaction either periodically or when a segment is flushed to disk. A global compaction will happen when the following two conditions are satisfied:

  • the number of rows in the delta binlogs exceeds 20% of the total number of rows in the segment.

  • the size of a delta binlog exceeds 10MB.

Segment compaction

A segment compaction involves combining multiple small sealed segments into larger sealed segments. This is illustrated below:

Note the following:

  • growing segments are not compacted.

  • the size of segments are compaction cannot exceed the maximum segment size set by segment.maxSize.

Just like for binlog compaction, Milvus will attempt to perform segment compaction periodically or when a segment has been flushed to disk. Milvus will perform segment compaction if the number of segments with size less than 0.5 * segment.maxSize exceeds 10.

Entities

Although not shown in the diagram, segments consists of so-called entities. These can be likened to rows in relational databases.

High-level architecture

The diagram below illustrates some of the main components of Milvus:

Note that this diagram does not show all the main components of Milvus to initially keep things simple. Later, we will add the remaining components like the coordinate layer to the diagram.

This architecture decouples the three main database operations of writing, reading and indexing. This means that we can scale these three types of operations independently. For instance, if our project deals mainly with static (rare updates) data with frequent reads, then we can provision more query nodes and keep only a small number of data and index nodes. In fact, Milvus claims that the query throughput can increase linearly with the number of query nodes.

Data insertion

Below is a diagram illustrating how Milvus handles an user's insert(-) request:

When a user performs an insertion operation, the request first passes through the proxy, which will then route the request to the log broker. At this point, Milvus will already respond to the client that the insertion operation is complete even though the data has not yet been persisted to disk.

The log broker (Pulsar or Kafka) then writes a log, or a so-called write-ahead log (WAL) to record the insertion operation. The data nodes are subscribed to the log broker, which means that the logs are passed to the data nodes. Instead of immediately writing the log to the object storage, the data node will first insert them into an insertion buffer. This is because disk inserts are relatively more expensive, and so Milvus accumulates the inserted data in an in-memory buffer instead. After the insertion buffer reaches some size, the logs are finally written to the object storage (disk) for persistence.

Querying data

Before we start querying data, we must first load our collection (or a partition) into memory by calling collection.load(). This loads our data in the memory of the query nodes. Data within Milvus can be classified into two types:

  • streaming (incremental) data - this is data held in the insertion buffer, which has not yet been written to disk (object storage). Streaming data is essentially data that has been very recently inserted into the database. Streaming data is also referred to as growing segments.

  • historical (batch) data - this is data that has already been flushed and written into the object storage. Historical data is also referred to as sealed segments.

The QueryCoord in the coordinate layer distributes the data among the query nodes. There are two types of allocators within the QueryCoord:

  • the segment allocator assigns sealed segments to the query nodes.

  • the channel allocator assigns different so-called data manipulation channels (DMchannel) in the log broker. Using the DMchannel, a query node is able to access and load streaming data (growing segments) in the log broker.

For instance, suppose we have:

  • 4 sealed segments S1, S2, S3 and S4.

  • 2 growing segments G1 and G2.

  • 2 query nodes.

Suppose the QueryCoord assigns the following:

  • query node 1 to load sealed segments S1 and S3.

  • query node 1 to use DMchannel 1 to load G1.

  • query node 2 to load sealed segments S2 and S4.

  • query node 2 to use DMchannel 1 to load G2.

This is illustrated below:

Real-time query

Not all newly inserted data is available for query. Recall that when a client performs an insertion operation, Milvus will return a successful response to the client when the request is received by the log broker. From the user's perspective, the insertion operation is complete but from the system's perspective, the data is neither available in the query node nor has it been written (flushed) to the object storage.

When the client performs a query immediately after inserting data, the newly inserted data will not be returned since only the data available in the query node can be returned.

Each insert operation is given a timestamp by the timestamp oracle (TSO) service in the root coordinator. As discussed earlier, all insert operations go through the DMchannel. For example, consider the following example:

Here, we see that there are 7 insert operations in the DMchannel. The number assigned to each operation represents its timestamp. The fourth operation, which is marked in orange, is a so-called timetick message. Data whose timestamp is earlier than the timestamp of the timetick message already resides in the log broker and is thus available for querying. For instance, if the query node's latest timetick is currently at 4, then it is only able to load and query inserted data that has a timestamp of less than 4.

When a user submits a query, Milvus will generate a so-called query message that describes how the query should be performed. We can treat the query message as an object that contains the following properties:

  • msgID: the ID of the message for internal use.

  • collectionID: the ID of the collection to query.

  • execPlan: the execution plan when scalar filtering is required.

  • service_ts: all data inserted before service_ts is available for query.

  • travel_ts: query will be performed on the state of the database at travel_ts.

  • guarantee_ts: query will only be performed when guarantee_ts < service_ts. The value of guarantee_ts is set depending on the consistency level we specify when querying.

If guarantee_ts > service_ts, then the query will not be performed since service_ts has not been updated. In this case, the query message will be queued until the service_ts becomes larger than guarantee_ts.

The guarantee_ts is set automatically by Milvus based on the consistency level we specify during query time. Milvus offers the following consistency levels, from strongest to weakest:

  • CONSISTENCY_STRONG: the guarantee_ts is the same as the latest system timestamp.

  • CONSISTENCY_BOUNDED: the guarantee_ts is set slightly smaller than the latest system timestamp. This means that query nodes may omit data that is inserted very recently.

  • CONSISTENCY_SESSION: the guarantee_ts is the timestamp of the last write operation that the client performed. This ensures that the client can at least query for the data that the client inserted.

  • CONSISTENCY_EVENTUALLY: the query nodes execute the query immediately with the available data.

The default consistency level is CONSISTENCY_BOUNDED.

Access layer

Database commands can be classified into one of the following:

  • DDL (data definition language) - commands to define or modify database schema.

  • DCL (data control language) - commands to define user permissions.

  • DML (data management language) - CRUD operations.

The role of the access layer is to:

  • preprocess the incoming requests. For instance, if the request is to create a collection that actually already exists, then the proxy will return an error straight away. As another example, suppose the user tries to insert a vector of dimension 100 when we defined the schema to be of dimension 1000. The proxy will throw an error without passing this request to the rest of the system.

  • routes DDL and DCL requests to the coordinate layer.

  • routes DML requests to the log broker.

Coordinate layer

The coordinate layer has multiple nodes:

Root coordinator node

The role of the root coordinator node is to:

  • handle DDL (data definition language) and DCL (data control language) requests.

  • manage the timer used internally by the system. This is important because Milvus heavily relies on timestamps.

Data coordinator node

The role of the data coordinator node is to:

  • trigger background data operations such as flushing data to disk and compressing data in case it is segmented.

  • maintains metadata of the inserted data. Examples of metadata could be the size and source of the inserted data.

  • supervises data nodes. For instance, the data coordinate node will treats the case when the data nodes disconnect and reconnect.

Query coordinator node

The role of the query coordinator node is to:

  • coordinate user searches and queries.

  • manages load balancing for the query nodes. This is important because query nodes are stateless and hence unaware of other nodes.

Index coordinator node

The role of the index coordinator node is to:

  • coordinate index nodes to build indexes for the user data.

  • maintains metadata of the index. Milvus supports several index types such as FAISS, ANNOY and HNSW - each of these requires maintaining specific metadata of the index. Users can also specify some index parameters when performing search.

Worker layer

DML (data management language) requests are handled by the worker layer.

Data node

The role of the data node is to:

  • subscribe to the log broker and process the incremental log data.

  • packs and stores the log data into log snapshots. For instance, suppose we send 3 vector insertion requests. We will have 3 log files, one of each request. The data note will read the 3 log files and pack them into a single file known as a snapshot. This snapshot is then saved into the disk for persistence.

  • carries out mutation operations such as update and deletion of data.

Index node

The role of the index node is to build the index.

  • both scalar and vector indexing are supported.

  • indexing is the typically the most resource-intensive (CPU and memory) task in the system as compared to other tasks such as insert operations. This is because unlike indexes in relational databases, vector indexes cannot be built using tree-based models due to their high dimensionality.

  • unlike relational databases that have indexes defined at the column level, Milvus builds index at the segment level.

Let's now go over how index node builds the index:

  1. reads the field log of a segment into memory from object storage.

  2. deserializes (or decodes) the log file to the data to build the index file.

  3. serializes (or encodes) this index file and writes it to object storage.

Query node

The role of the query node is to:

  • load the indexes and user data from the object storage.

  • run the query and fetch the results.

Everyone in the system is listening to the log broker for any new logs.

Storage layer

There are three types of storage components in Milvus: log broker, meta store and object storage.

Log broker

The backbone of Milvus is the log broker whose main role is to:

  • transcribes incoming CRUD operation in a form of a log.

  • store streaming data in memory.

The worker nodes are subscribed to the log broker and consume the logs to execute the CRUD operations.

Meta store

Milvus uses a key-value store called etcd for storing metadata (e.g. schema of a collection) that is used internally by the system. Note that meta data in this context does not represent business data (e.g. the gender of an user) but rather data reserved for internal use within the system.

The other key roles of etcd are to:

  • perform frequent health-checks to ensure everything in the system is functioning.

  • maintain checkpoints so that we can recover from system crashes.

All internal metadata is updated in etcd before before they are propagated to and synchronized with the coordinators.

NOTE

The first version of Milvus used a standalone MySQL database as the meta store. However, the second version of Milvus uses etcd instead because it provides higher availability through its leader election mechanism upon failure. This is also the reason etcd also serves as the metadata store for Kubernetes.

Object storage

The object storage stores unstructured data such as:

  • scalar and vector data (a log files).

  • index files for scalar and vector data.

Milvus supports AWS S3, Azure blob and an open source object storage called MinIO. Unlike AWS S3 and Azure Blob, MinIO can be hosted on our local machines. What's particularly nice about MinIO is that it is fully compatible with AWS S3, which means that we can interact with MinIO as if it is AWS S3. For instance, we can use the Python AWS S3 package to insert data into MinIO.

Tunable consistency

Milvus offers flexible consistency levels:

  • strong consistency: a strong consistent query is guaranteed to look at all the inserted data including newly ones that is yet to be indexed. This can be useful in some situations when we do need to even fresh data in the query results such as when we want to test our database implementation. However, the downside is that the query will be much slower.

Strong consistency

Strong consistency is a consistency model used in distributed systems, where all replicas of a data object are guaranteed to have the same value at all times. In other words, strong consistency provides a global view of the data across all replicas, and ensures that any read operation will always return the most recent value of the data.

In a strongly consistent system, updates to a data object are propagated synchronously to all replicas, and there is no temporary inconsistency or delay in the propagation of updates. This means that any read operation performed on any replica will always return the same, most up-to-date value of the data.

Strong consistency is often used in distributed systems where data integrity is critical, such as financial systems, healthcare systems, and other applications where the accuracy and consistency of data are essential. However, strong consistency can come at the cost of increased latency and reduced availability, as each update must be propagated to all replicas before the system can respond to any read requests.

One of the challenges of designing a strongly consistent system is dealing with the increased network traffic and latency that comes with synchronously propagating updates to all replicas. To mitigate these challenges, systems may use techniques such as quorum-based replication, consensus algorithms, or data partitioning to distribute the load and reduce the impact on system performance.

Imagine a distributed system that stores bank account balances across multiple replicas. In a strongly consistent system, whenever a user deposits or withdraws money from their account, the system must ensure that all replicas are updated synchronously to reflect the correct balance.

For example, let's say a user deposits $100 into their account. The system will update the balance on one replica, and then synchronously propagate the update to all other replicas before returning a response to the user. This ensures that all replicas have the same, most up-to-date balance of $100.

Now, let's say another user tries to withdraw $50 from the same account. Before the system can respond to the withdrawal request, it must first ensure that all replicas have been updated with the deposit of $100. Once all replicas have been updated, the system will deduct $50 from the account balance on all replicas and return a response to the user. Again, this ensures that all replicas have the same, most up-to-date balance of $50.

Overall, strong consistency ensures that all replicas of the account balance are always in sync and that any read operation performed on any replica will always return the same, most up-to-date balance.

Bounded staleness

Bounded staleness refers to a type of consistency model in distributed systems where there is a limit on how out-of-date data can be before it is considered stale. This means that while there may be some delay in propagating updates across all nodes in the system, the data will eventually become consistent within a pre-defined time period. Essentially, bounded staleness is a balance between strong consistency (where all nodes have the same data at all times) and eventual consistency (where nodes eventually converge on the same data but may have temporary inconsistencies).

Let's go over an example. Suppose we have a distributed system that is keeping track of the inventory of a store. Whenever a customer purchases an item, the system updates the inventory count. However, due to network latency and other factors, this update may not be immediately propagated to all nodes in the system.

With bounded staleness, you might define a limit of 10 seconds for how out-of-date the data can be. This means that if a customer purchases an item, the inventory count may be temporarily inconsistent across nodes for up to 10 seconds. After that time, all nodes should have the same updated inventory count.

So, if a customer purchases an item at 12:00:00 and the update takes 5 seconds to propagate to all nodes, a node that receives a request for the inventory count at 12:00:03 may see the old count. However, if that same node receives a request for the inventory count at 12:00:15, it should see the updated count.

In summary, bounded staleness allows for some temporary inconsistencies in the data, but ensures that all nodes will eventually converge on the same data within a defined time period.

Session consistency

The consistency level of a session in a distributed system refers to the level of consistency that is guaranteed for read and write operations performed within that session.

In general, the consistency level of a session is considered to be stronger than that of a single read or write operation, but weaker than that of a global consistency level that applies to all clients in the system.

In a session consistency model, all read and write operations performed by a client within a session are guaranteed to be consistent with each other. This means that if a client performs a write operation and then immediately performs a read operation within the same session, the read operation will return the updated value that was written by the client.

However, if another client performs a write operation on the same data outside of the session, the session consistency model does not guarantee that the client will immediately see the updated value. Instead, the consistency guarantees are limited to the operations performed within the session.

Overall, the consistency level of a session can be a good compromise between strong consistency and high availability, as it allows clients to perform read and write operations with relatively low latency while still maintaining a reasonable level of consistency.

Imagine a distributed system that stores user profile data across multiple replicas. In a session consistency model, whenever a user logs in and performs updates to their profile, the system must ensure that all subsequent read and write operations performed by that user within the same session are consistent with each other.

For example, let's say a user logs in and updates their profile picture. The system will update the profile picture on one replica and then return a response to the user. Subsequent read operations performed by the user within the same session will always return the updated profile picture, regardless of which replica was accessed.

Now, let's say the user updates their profile bio. Again, the system will update the bio on one replica and then return a response to the user. Subsequent read operations performed by the user within the same session will always return the updated bio, regardless of which replica was accessed.

However, if another user tries to access the same profile data outside of the session, the session consistency model does not guarantee that they will immediately see the updated values. Instead, the consistency guarantees are limited to the operations performed within the same session.

Overall, session consistency ensures that all read and write operations performed by a user within the same session are consistent with each other, but does not guarantee global consistency across all clients in the system.

Eventual consistency

Eventual consistency is a consistency model used in distributed systems, where data replicas may be temporarily inconsistent with each other, but will eventually converge and become consistent. In other words, eventual consistency allows for temporary inconsistencies in the system, but guarantees that all replicas will eventually converge to the same state.

In an eventually consistent system, updates to a data object are propagated asynchronously to all replicas, and there is no guarantee that all replicas will see the same version of the data at any given time. However, over time, all replicas will eventually receive all updates and converge to the same state.

Eventual consistency is often used in distributed systems where high availability and low latency are important, but strong consistency is not critical. Examples of systems that use eventual consistency include DNS (Domain Name System) and Amazon S3 (Simple Storage Service).

One of the challenges of designing an eventually consistent system is dealing with conflicts that can arise when multiple replicas receive updates to the same data object at the same time. To handle conflicts, systems may use techniques such as versioning, conflict resolution algorithms, or application-level conflict handling.

For example, let's say a user creates a new post. The system will store the new post on one replica and then asynchronously propagate the update to all other replicas. This means that there may be a temporary inconsistency between replicas, where some replicas have the new post while others do not.

Now, let's say another user tries to access the new post. Depending on which replica they access, they may or may not see the new post immediately. However, the system guarantees that eventually, all replicas will receive the update and converge to the same state, where all replicas have the new post.

Similarly, if a user deletes a post, the system will asynchronously propagate the update to all replicas, and there may be a temporary inconsistency where some replicas still have the deleted post. However, eventually, all replicas will receive the update and converge to the same state, where all replicas have the post deleted.

Overall, eventual consistency allows for temporary inconsistencies between replicas but guarantees that all replicas will eventually converge to the same state. This makes it a useful consistency model for distributed systems where high availability and low latency are important, and where temporary inconsistencies can be tolerated.

Time Travel

Since Milvus maintains these logs in the persistent object storage, it is able to recreate the exact state (or view) of the database at any particular moment in time. Users can also query against a specific view of the database at any moment in time by supplying a timestamp when querying. This means that we can even query for data that has been deleted 🤯.

This is a distinctive feature of Milvus because most databases only allow for such rollbacks by continuously taking snapshots of the database, which is costly in terms of storage and also a pain to maintain. Milvus, on the other hand, maintains timestamps of all the insertion and deletion events. This means that Milvus does not need to snapshots by our data - at query time, Milvus simply needs to refer to the timestamp of the events to know the state of the database was at the specified timestamp.

WARNING

By default, time travel search is disabled in Milvus. We can enable it by specifying common.retentionDuration in the config file. common.retentionDuration represents the number of seconds that Milvus can time travel back to. For instance, if we set its value to 10, then this means that we are able to query against the view of the database, say 6 seconds ago, but we are not able to do so for view 15 seconds ago.

When a proxy receives an insertion or deletion event, it asks the RootCoord (or more specifically the timestamp oracle) for the system's timestamp. The proxy then embeds this information into the event itself. Interestingly, even if we don't specify a timestamp column in our data schema, Milvus will automatically create one for us and stores it alongside our other columns. This is why when we load the collection in memory via collection.load(), Milvus know the exact time at which our data was inserted or deleted.

When we supply the travel_timestamp argument in our query(-), Milvus will pass the argument value to an internal component called segcore. The segcore will filter out all the data segments that do not satisfy the condition data.timestamp <= travel_timestamp. Remember, we can think of the timestamp as just another field of our data!

Load a collection into memory using collection.load() involves reading sealed segments from disk. Remember, sealed segments contain many rows of our data (formally known as entities). Upon reading the sealed segments, Milvus internally creates an index based on the timestamp (TimestampIndex). This index contains information about the following:

  • the smallest timestamp value of the rows.

  • the largest timestamp value of the rows.

  • the offset and row number of the timestamps.

During query time, Milvus uses the TimestampIndex to perform efficient filtering:

  • if the specified travel_timestamp is less than the smallest timestamp of a sealed segment, then it means that none of the data in the segment fulfils our timestamp condition. In this case, the bitset for every data in the segment is marked as 1. A bitset can simply thought of as a boolean. This also means that the bitmask (which can be thought of as a boolean mask) has all 1s as its entries.

  • if the specified travel_timestamp is larger than the largest timestamp, then it means that all of the data in the segment fulfils our timestamp condition. In this case, the bitset for every data in the segment is marked as 0. Therefore, the bitmask of this segment is filled with 0s.

  • otherwise, we have the case when some of the data in the segment fulfils the travel_timestamp condition. Milvus will read the data in the segment one by one, and then obtain a bitmask for the segment.

For growing segments, all the data already exists in memory (as they have not yet been flushed to disk). All the data has a timestamp attached and their insertion order is sequential, which means Milvus can perform a simple binary search to find the data that fulfils our timestamp condition. Finally, Milvus performs a filtering operation based on the found offset.

Delete operations

Internally, there are two type of delete operations:

  • deleting data in the insert buffer.

  • deleting data in the flushed disk.

The first case is straight-forward - Milvus can easily find and delete the data in memory given its ID. The second case is more involved. The inefficient approach of deleting data in the flushed disk is to first read the segment that contains the data into memory. Next, we perform the deletion in memory, and then finally write back the updated segment into disk. This is slow because reading from and writing to disk is slow.

Milvus implements a so-called soft-deletion, in which delete operations do not actually remove any data from disk.

When a user performs delete(), the following series of operations happen:

  1. Milvus first records down the ID of the data to delete in memory.

  2. When a flush operation occurs, MIlvus writes the deleted ID to a file within the corresponding segment called delta binlogs. Remember, segments contain the actual data, delta binlogs. In practice, Milvus actually uses the offset (or position) of the data to delete within a segment.

Milvus must prevent returning data that has been previously deleted by the user. Milvus maintains a bitmap, which can be thought of as a list of booleans - a value of 1 indicates . During search, Milvus will ensure that the index (e.g. FAISS) will not use vectors corresponding to active bits in its distance calculation.

When deleting flushed data from disk, Milvus first does not know which segment contains the data to be deleted. One naive solution would be to read all segments in disk and then check for the existence of the data. Instead of such an inefficient solution, Milvus uses a data structure known as bloom filter.

Bloom filter is a probabilities data structure that tells you whether an item is a member of a set. The reason why it is probabilistic it tells you that the item is definitely not a member or that the item could or could not be a member.

Each segment has its own bloom filter and this helps Milvus reduce the number of segments we check for the deleted ID. However, due to its probabilistic nature, in the case when the bloom filter responds with "ID might be present", Milvus must read the segment and check the IDs file to confirm whether ID is actually present.

Despite such limitation of bloom filters, they were chosen for Milvus (as opposed to other data structures such as hash tables) because they are extremely space-efficient and performant.

All data inserted in a batch will have the same timestamp.

Bitset

Workers nodes are subscribed to the log broker, which means that they are alerted whenever the log broker has new logs that need to be processed. The worker nodes then process (or consume) the new logs to perform CRUD operations.

When we call collection.load() in our code, the QueryCoord will ask the DataCoord about all the sealed and growing segments in the system.

Once the allocated part of the sealed segment expires, they can be manually flushed, which means written into the object storage for data persistence.

robocat
Published by Isshin Inada
Edited by 0 others
Did you find this page useful?
thumb_up
thumb_down
Comment
Citation
Ask a question or leave a feedback...