Concepts
Azure Cosmos DB is a fully managed, globally distributed, multi-model database service from Microsoft. It provides seamless scalability and high availability, making it a strong choice for data-intensive cloud-native applications. In this article, we explore how to move data efficiently using client SDK bulk operations in Azure Cosmos DB.
Bulk Import Using .NET SDK
The .NET SDK for Azure Cosmos DB has built-in support for high-throughput bulk operations: when bulk execution is enabled on the client, concurrent point operations are transparently grouped into batched requests, reducing the overall time required for data migration.

Here's an example of how you can perform a bulk import using the .NET SDK:

```csharp
using Microsoft.Azure.Cosmos;
using System.Collections.Generic;
using System.Threading.Tasks;

// Initialize the Cosmos client with bulk execution enabled
CosmosClient cosmosClient = new CosmosClient(
    "connectionString",
    new CosmosClientOptions { AllowBulkExecution = true });

// Create or get a reference to your Cosmos database
Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync("myDatabase");

// Create or get a reference to your Cosmos container
Container container = await database.CreateContainerIfNotExistsAsync("myContainer", "/partitionKeyPath");

// Create the list of documents to import (MyDocument and
// GetDocumentsToImport() are placeholders for your own types)
IEnumerable<MyDocument> documents = GetDocumentsToImport();

// Queue one upsert task per document; with bulk mode enabled the SDK
// groups these into batched requests per partition behind the scenes
List<Task> tasks = new List<Task>();
foreach (MyDocument document in documents)
{
    tasks.Add(container.UpsertItemAsync(document, new PartitionKey(document.PartitionKeyValue)));
}

// Wait for all batched operations to complete
await Task.WhenAll(tasks);
```

In the code snippet above, we first initialize the CosmosClient with your Cosmos DB connection string, setting AllowBulkExecution to true in CosmosClientOptions to switch the client into bulk mode. Then we create or get a reference to the desired database and container, and build the list of documents to import.

Using UpsertItemAsync gives upsert (update-or-insert) behavior; use CreateItemAsync instead when every document is known to be new. Finally, we queue one task per document and await Task.WhenAll; the SDK decides how the queued operations are grouped and dispatched, and the call completes once every batched operation has finished.
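Under the hood, the throughput gain from bulk mode comes from keeping many requests in flight while capping how many run at once. The following self-contained sketch (in Java, for consistency with the next section) illustrates that throttling pattern with a semaphore; `importDocument` and the concurrency cap of 10 are illustrative stand-ins, not Cosmos DB APIs.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledImport {

    // Illustrative cap on in-flight operations (an assumption, not an SDK constant)
    static final int MAX_CONCURRENCY = 10;

    // Hypothetical stand-in for the SDK write call; does no network I/O here
    static void importDocument(String doc) { }

    public static int importAll(List<String> documents) {
        Semaphore slots = new Semaphore(MAX_CONCURRENCY);
        ExecutorService pool = Executors.newCachedThreadPool();
        AtomicInteger imported = new AtomicInteger();
        try {
            for (String doc : documents) {
                slots.acquire();               // block until a slot frees up
                pool.submit(() -> {
                    try {
                        importDocument(doc);   // the throttled write
                        imported.incrementAndGet();
                    } finally {
                        slots.release();
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return imported.get();
    }
}
```

The SDK applies the same idea internally, so you rarely need to write this yourself; the sketch only shows why queuing many small tasks still results in bounded, batched work.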
Bulk Update Using Java SDK
Similarly, Azure Cosmos DB supports bulk operations in the Java SDK (v4). Individual item operations are built with the CosmosBulkOperations factory class and executed as a batch through the executeBulkOperations method on CosmosContainer.

Here's an example of how to perform bulk updates using the Java SDK:

```java
import com.azure.cosmos.*;
import com.azure.cosmos.models.*;
import com.azure.cosmos.util.CosmosPagedIterable;
import java.util.ArrayList;
import java.util.List;

// Initialize the Cosmos client
CosmosClient cosmosClient = new CosmosClientBuilder()
    .endpoint("https://mycosmosdb.documents.azure.com:443/")
    .key("myAuthKey")
    .buildClient();

// Create or get a reference to your Cosmos database
cosmosClient.createDatabaseIfNotExists("myDatabase");
CosmosDatabase database = cosmosClient.getDatabase("myDatabase");

// Create or get a reference to your Cosmos container
database.createContainerIfNotExists("myContainer", "/partitionKeyPath");
CosmosContainer container = database.getContainer("myContainer");

// Query the documents that still need to be updated
// (MyDocument is a placeholder POJO for your own document type)
String query = "SELECT * FROM c WHERE c.processed = false";
CosmosPagedIterable<MyDocument> pending =
    container.queryItems(query, new CosmosQueryRequestOptions(), MyDocument.class);

// Build one replace operation per document
List<CosmosItemOperation> operations = new ArrayList<>();
for (MyDocument doc : pending) {
    doc.setProcessed(true);
    operations.add(CosmosBulkOperations.getReplaceItemOperation(
        doc.getId(), doc, new PartitionKey(doc.getPartitionKey())));
}

// Execute the operations in bulk and inspect each per-item response
for (CosmosBulkOperationResponse<Object> response : container.executeBulkOperations(operations)) {
    if (response.getResponse() == null || response.getResponse().getStatusCode() >= 400) {
        // Handle the failed operation (e.g., log and retry)
    }
}
```

In the code snippet above, we first initialize the CosmosClient with the Cosmos DB endpoint and authorization key, then create or get a reference to the desired database and container, similar to the .NET SDK example.

Next, we run a query that selects the documents still marked processed = false and build one CosmosItemOperation per result with the CosmosBulkOperations.getReplaceItemOperation factory method, supplying each document's id, the updated document body, and its partition key.

Finally, we pass the list of operations to executeBulkOperations on the CosmosContainer instance and iterate over the per-operation responses, checking each status code so failures can be logged or retried.
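When a bulk update touches a very large result set, it is often useful to submit the operations in fixed-size chunks so each chunk's responses can be inspected (and failures retried) before continuing. Below is a minimal, self-contained sketch of such a batching helper; the helper name and the batch size in the usage note are illustrative assumptions, not SDK constants.

```java
import java.util.ArrayList;
import java.util.List;

public class BulkBatcher {

    // Split a list of pending operations into fixed-size chunks so each
    // chunk can be submitted and its responses checked before the next.
    public static <T> List<List<T>> chunk(List<T> operations, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < operations.size(); i += batchSize) {
            int end = Math.min(i + batchSize, operations.size());
            batches.add(new ArrayList<>(operations.subList(i, end)));
        }
        return batches;
    }
}
```

For example, chunk(operations, 100) yields lists of at most 100 operations each, which can then be passed to executeBulkOperations one chunk at a time.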
By utilizing the bulk operations provided by the Azure Cosmos DB SDKs, you can move and update data efficiently in your applications, saving both time and cost. Refer to the official Microsoft documentation for details on the additional features and options available for bulk operations in Azure Cosmos DB.
Answer the Questions in the Comment Section
Which feature of Azure Cosmos DB allows for efficient bulk insertion of data?
a) Azure Data Factory
b) Azure Event Hubs
c) Cosmos DB Bulk executor library
d) Azure Managed Instance for Apache Cassandra
Correct answer: c) Cosmos DB Bulk executor library
True or False: The Cosmos DB Bulk executor library supports bulk deletion operations.
Correct answer: True
Which programming languages are supported by the Cosmos DB client SDK for bulk operations? (Select all that apply)
a) C#
b) Python
c) Java
d) Node.js
Correct answer: a) C#, b) Python, c) Java, d) Node.js
What is the maximum size of a batch that can be submitted for bulk insert in Cosmos DB?
a) 10 MB
b) 100 MB
c) 1 GB
d) 10 GB
Correct answer: b) 100 MB
True or False: Data consistency guarantees are automatically provided when performing bulk operations in Azure Cosmos DB.
Correct answer: False
Which API can be used to perform bulk operations in Cosmos DB?
a) SQL API
b) MongoDB API
c) Cassandra API
d) Table API
Correct answer: a) SQL API
Which configuration option can be used to control the throughput allocated for bulk operations in Cosmos DB?
a) Request units (RUs)
b) Consistency level
c) Partition key
d) Indexing policy
Correct answer: a) Request units (RUs)
True or False: Bulk operations with the Cosmos DB client SDK are only available for documents stored in a single partition.
Correct answer: False
What is the maximum number of concurrent operations that can be executed by the Cosmos DB Bulk executor library?
a) 100
b) 500
c) 1000
d) 5000
Correct answer: d) 5000
Which authentication method is supported by the Cosmos DB client SDK for bulk operations?
a) Azure Active Directory
b) Shared Key
c) Certificate-based authentication
d) OAuth
Correct answer: b) Shared Key
Great insights on using bulk operations with Cosmos DB client SDK.
Could someone explain the advantages of using bulk operations over individual requests?
How do we handle errors in bulk operations?
This blog post was extremely helpful!
I’m facing issues with timeout when using bulk operations on large datasets.
Can we use SDK bulk operations with the transactional batch feature?
Can someone elaborate on change feed processor with bulk operations?
Very detailed and well-written.