C# - Why does my data flow finish before all async calls are fully processed from the BufferBlock?


I have a data flow as follows.

1. A task reads a text file line by line and adds the lines to a BatchBlock<string>, which groups them into chunks of chunkSize.

2. An ActionBlock linked to the above BatchBlock partitions the data into batches and adds them to a BufferBlock.

3. A TransformBlock linked to the BufferBlock spawns an async task for each batch.

4. The process is finished when all the spawned async calls have finished.

The code below isn't working as expected. It finishes before all the batches are processed. What am I missing?

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    private static void DataFlow(string filePath, int chunkSize, int batchSize)
    {
        int chunkCount = 0;
        int batchCount = 0;

        BatchBlock<string> chunkBlock = new BatchBlock<string>(chunkSize);
        BufferBlock<IEnumerable<string>> batchBlock = new BufferBlock<IEnumerable<string>>();

        Task produceTask = Task.Factory.StartNew(() =>
        {
            foreach (var line in File.ReadLines(filePath))
            {
                chunkBlock.Post(line);
            }

            Console.WriteLine("finished producing");
            chunkBlock.Complete();
        });

        var makeBatches = new ActionBlock<string[]>(t =>
        {
            Console.WriteLine("got chunk  " + ++chunkCount);

            // partition each chunk into smaller chunks grouped on column 1
            var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);

            // further break down the chunks into groups of batch size
            var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

            // get the batches from the groups
            var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));

            foreach (var batch in batches)
            {
                batchBlock.Post(batch);
            }

            batchBlock.Complete();

        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        chunkBlock.LinkTo(makeBatches, new DataflowLinkOptions { PropagateCompletion = true });

        var executeBatches = new TransformBlock<IEnumerable<string>, IEnumerable<string>>(async b =>
        {
            Console.WriteLine("got batch  " + ++batchCount);
            await ExecuteBatch(b);
            return b;

        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        batchBlock.LinkTo(executeBatches, new DataflowLinkOptions { PropagateCompletion = true });

        var finishBatches = new ActionBlock<IEnumerable<string>>(b =>
        {
            Console.WriteLine("finished executing batch " + batchCount);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        executeBatches.LinkTo(finishBatches, new DataflowLinkOptions { PropagateCompletion = true });

        Task.WaitAll(produceTask);
        Console.WriteLine("production complete");

        makeBatches.Completion.Wait();
        Console.WriteLine("making batches complete");

        executeBatches.Completion.Wait();
        Console.WriteLine("executing batches complete");

        Task.WaitAll(finishBatches.Completion);

        Console.WriteLine("process complete total chunks " + chunkCount + " , total batches " + batchCount);
        Console.ReadLine();
    }

    // async task to simulate network I/O
    private static async Task ExecuteBatch(IEnumerable<string> batch)
    {
        Console.WriteLine("executing batch ");
        await Task.Run(() => System.Threading.Thread.Sleep(2000));
    }

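chunkBlock calls makeBatches once for each chunk, and you're calling batchBlock.Complete() inside makeBatches, so after the first chunk has been handled batchBlock quits accepting new posts. Once a block has been completed, Post returns false and the item is dropped, which is why the batches from every later chunk never reach executeBatches.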
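One possible way to restructure this is sketched below. It is only an illustration, not the asker's final code: the class name BatchPipelineSketch, the RunAsync method, and the ExecuteBatchAsync stand-in are hypothetical, the input comes from an in-memory sequence instead of a file, and the final block is an ActionBlock for brevity. The one change that matters is that batchBlock.Complete() moves out of the per-chunk delegate and into a continuation on makeBatches.Completion, so the buffer is completed only after every chunk has been partitioned.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchPipelineSketch
{
    // Hypothetical stand-in for ExecuteBatch: simulates a short async I/O call.
    private static Task ExecuteBatchAsync(IEnumerable<string> batch) => Task.Delay(50);

    // Runs the pipeline and returns how many batches were fully processed.
    public static async Task<int> RunAsync(IEnumerable<string> lines, int chunkSize, int batchSize)
    {
        int processed = 0;

        var chunkBlock = new BatchBlock<string>(chunkSize);
        var batchBlock = new BufferBlock<IEnumerable<string>>();

        var makeBatches = new ActionBlock<string[]>(chunk =>
        {
            // Group on column 1, then split each group into runs of batchSize.
            var batches = chunk
                .GroupBy(line => line.Split(',')[0])
                .SelectMany(group => group
                    .Select((line, index) => new { line, index })
                    .GroupBy(x => x.index / batchSize, x => x.line));

            foreach (var batch in batches)
            {
                batchBlock.Post(batch.ToList());
            }
            // No batchBlock.Complete() here: this delegate runs once per chunk.
        });

        var executeBatches = new ActionBlock<IEnumerable<string>>(async batch =>
        {
            await ExecuteBatchAsync(batch);
            Interlocked.Increment(ref processed);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        chunkBlock.LinkTo(makeBatches, propagate);
        batchBlock.LinkTo(executeBatches, propagate);

        // makeBatches posts to batchBlock manually, so completion is forwarded
        // by hand, and only once every chunk has been partitioned.
        _ = makeBatches.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)batchBlock).Fault(t.Exception);
            else batchBlock.Complete();
        });

        foreach (var line in lines)
        {
            chunkBlock.Post(line);
        }
        chunkBlock.Complete();

        // Finishes only after every async batch delegate has finished.
        await executeBatches.Completion;
        return processed;
    }
}
```

Awaiting the completion of the last block is enough here, because completion flows down the chain: chunkBlock to makeBatches through the link, makeBatches to batchBlock through the continuation, and batchBlock to executeBatches through the second link.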

