c# - Why does my data flow finishes before all async calls are fully processed from BufferBlock? -
i have data flow follows.
1. task reads text file in chunks , adds them batchblock<chunksize>
2. actionblock
linked above batchblock
partitions data batches , adds them bufferblock
3. transformationblock
linked bufferblock
, spawns async
task each batch
4. process finished when spanwed async
calls finished.
the below code isn't working expected. finishes before batches processed. missing?
private static void dataflow(string filepath, int chunksize, int batchsize) { int chunkcount = 0; int batchcount = 0; batchblock<string> chunkblock = new batchblock<string>(chunksize); bufferblock<ienumerable<string>> batchblock = new bufferblock<ienumerable<string>>(); task producetask = task.factory.startnew(() => { foreach (var line in file.readlines(filepath)) { chunkblock.post(line); } console.writeline("finished producing"); chunkblock.complete(); }); var makebatches = new actionblock<string[]>(t => { console.writeline("got chunk " + ++chunkcount); // partition each chunk smaller chunks grouped on column 1 var partitions = t.groupby(c => c.split(',')[0], (key, g) => g); // further beakdown chunks batch size groups var groups = partitions.select(x => x.select((i, index) => new { i, index }).groupby(g => g.index / batchsize, e => e.i)); // batches groups var batches = groups.selectmany(x => x).select(y => y.select(z => z)); foreach (var batch in batches) { batchblock.post(batch); } batchblock.complete(); }, new executiondataflowblockoptions { maxdegreeofparallelism = 1 }); chunkblock.linkto(makebatches, new dataflowlinkoptions { propagatecompletion = true }); var executebatches = new transformblock<ienumerable<string>, ienumerable<string>>(async b => { console.writeline("got batch " + ++batchcount); await executebatch(b); return b; }, new executiondataflowblockoptions { maxdegreeofparallelism = dataflowblockoptions.unbounded }); batchblock.linkto(executebatches, new dataflowlinkoptions { propagatecompletion = true }); var finishbatches = new actionblock<ienumerable<string>>(b => { console.writeline("finised executing batch" + batchcount); }, new executiondataflowblockoptions { maxdegreeofparallelism = dataflowblockoptions.unbounded }); executebatches.linkto(finishbatches, new dataflowlinkoptions { propagatecompletion = true }); task.waitall(producetask); console.writeline("production complete"); makebatches.completion.wait(); console.writeline("making batches complete"); executebatches.completion.wait(); console.writeline("executing batches complete"); task.waitall(finishbatches.completion); console.writeline("process complete total chunks " + chunkcount + " , total batches " + batchcount); console.readline(); } // async task simulate network i/o private static async task executebatch(ienumerable<string> batch) { console.writeline("executing batch "); await task.run(() => system.threading.thread.sleep(2000)); }
chunkblock
calling makebatches
each chunk, , you're calling batchblock.complete()
within makebatches
, after first batch quits accepting new posts.
Comments
Post a Comment