Here is a simple C# console app to upload large files to Azure Cosmos DB. It reads tab-separated log files, builds raw JSON documents, and pushes them to a partitioned collection in batches through the bulk executor library.
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.CosmosDB.BulkExecutor;
using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

class Program
{
    static async Task Main(string[] args)
    {
        string EndpointUrl = "https://wabac.documents.azure.com:443/";
        string AuthorizationKey = "";
        string DatabaseName = "wabacDB";
        string CollectionName = "wabacCollection";
        int CollectionThroughput = 100000;
        string CollectionPartitionKey = "/rowid";
        string filepath = @"D:\wabac_";
        int fileCount = 10; // assumption: number of input files to process; adjust to your data set

        ConnectionPolicy connectionPolicy = new ConnectionPolicy
        {
            ConnectionMode = ConnectionMode.Direct,
            ConnectionProtocol = Protocol.Tcp
        };
        DocumentClient client = new DocumentClient(new Uri(EndpointUrl), AuthorizationKey, connectionPolicy);

        // Create the database if it does not already exist.
        Database database = client.CreateDatabaseQuery()
            .Where(d => d.Id == DatabaseName).AsEnumerable().FirstOrDefault();
        if (database == null)
        {
            database = await client.CreateDatabaseAsync(new Database { Id = DatabaseName });
        }

        // Create the partitioned collection if it does not already exist.
        DocumentCollection dataCollection = client
            .CreateDocumentCollectionQuery(UriFactory.CreateDatabaseUri(DatabaseName))
            .Where(c => c.Id == CollectionName).AsEnumerable().FirstOrDefault();
        if (dataCollection == null)
        {
            PartitionKeyDefinition partitionKey = new PartitionKeyDefinition
            {
                Paths = new Collection<string> { CollectionPartitionKey }
            };
            DocumentCollection collection = new DocumentCollection { Id = CollectionName, PartitionKey = partitionKey };
            dataCollection = await client.CreateDocumentCollectionAsync(
                UriFactory.CreateDatabaseUri(DatabaseName),
                collection,
                new RequestOptions { OfferThroughput = CollectionThroughput });
        }

        // Set retry options high during initialization (default values).
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

        IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
        await bulkExecutor.InitializeAsync();

        // Set retries to 0 to pass complete control to the bulk executor.
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

        List<string> documentsToImportInBatch = new List<string>();
        Console.WriteLine($"{DateTime.UtcNow} : Start");
        for (int i = 1; i <= fileCount; i++)
        {
            // assumption: files are named wabac_1, wabac_2, ... under D:\
            using (StreamReader sr = new StreamReader(filepath + i))
            {
                while (sr.Peek() >= 0)
                {
                    string line = sr.ReadLine();
                    // Escape backslashes so the hand-built JSON stays valid.
                    line = line.Replace("\\", "\\\\");
                    var s = line.Split('\t');
                    // 0-LineNumber ; 1-TimeStamp ; 2-Level ; ... ; 8-Similarity
                    string Similarity = string.IsNullOrWhiteSpace(s[8]) ? "0" : s[8];
                    // Build the document as a raw JSON string; Similarity defaults to 0 when the column is blank.
                    documentsToImportInBatch.Add(
                        $"{{\"id\":\"L{s[0]}\",\"rowid\":\"L{s[0]}F\",\"LineNumber\":{s[0]},\"TimeStamp\":\"{s[1]}\",\"Level\":\"{s[2]}\",\"Similarity\":{Similarity}}}");

                    // Flush once the batch reaches the provisioned throughput count.
                    if (documentsToImportInBatch.Count > CollectionThroughput)
                    {
                        BulkImportResponse bulkImportResponse;
                        do
                        {
                            // Retry until every document in the batch is either imported or rejected.
                            bulkImportResponse = await bulkExecutor.BulkImportAsync(
                                documents: documentsToImportInBatch,
                                enableUpsert: false,
                                disableAutomaticIdGeneration: true,
                                maxConcurrencyPerPartitionKeyRange: null,
                                maxInMemorySortingBatchSize: null,
                                cancellationToken: CancellationToken.None);
                            Console.WriteLine($"\t\t{DateTime.UtcNow} : uploaded {bulkImportResponse.NumberOfDocumentsImported}");
                            Console.WriteLine($"\t\t{DateTime.UtcNow} : error {bulkImportResponse.BadInputDocuments.Count}");
                        } while ((bulkImportResponse.NumberOfDocumentsImported + bulkImportResponse.BadInputDocuments.Count) < documentsToImportInBatch.Count);
                        Console.WriteLine($"\t{DateTime.UtcNow} : done with batch {documentsToImportInBatch.Count}");
                        documentsToImportInBatch.Clear();
                    }
                }
                Console.WriteLine($"\t{DateTime.UtcNow} : done with file#{i}");
            }
        }

        // Flush the final partial batch so trailing documents are not lost.
        if (documentsToImportInBatch.Count > 0)
        {
            BulkImportResponse finalResponse = await bulkExecutor.BulkImportAsync(
                documents: documentsToImportInBatch,
                enableUpsert: false,
                disableAutomaticIdGeneration: true,
                cancellationToken: CancellationToken.None);
            Console.WriteLine($"\t{DateTime.UtcNow} : done with final batch {finalResponse.NumberOfDocumentsImported}");
            documentsToImportInBatch.Clear();
        }

        Console.WriteLine($"{DateTime.UtcNow} : End");
    }
}
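The app assumes the classic SQL-API .NET SDK plus the bulk executor library, both of which ship as NuGet packages. A minimal setup sketch, assuming these package names resolve in your feed (Microsoft.Azure.CosmosDB.BulkExecutor pulls in Microsoft.Azure.DocumentDB as a dependency):

PM> Install-Package Microsoft.Azure.DocumentDB
PM> Install-Package Microsoft.Azure.CosmosDB.BulkExecutor

Passing raw JSON strings to BulkImportAsync, as the loop above does, skips a serialization pass on the client; the trade-off is that the code must handle escaping itself, which is why every backslash is doubled before the line is split.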