Task Parallel Library (TPL) in C#
- mdbhowmik

- Nov 12, 2020
- 4 min read
Updated: Jan 8, 2021
#.net code #parallel tasking #concurrent bag
A solution in .NET can be designed in many ways; this post demonstrates the use of the Task Parallel Library (TPL) in C#. It also uses a ConcurrentBag<T> collection to resolve the concurrency problem: if you use a plain List<T> here, parallel writes can corrupt the collection or throw runtime errors. Do leave a comment at the end if anything needs more explanation.
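To see the difference, here is a small standalone sketch (not part of the original solution) that adds items from a Parallel.For loop into a plain List<T> and into a ConcurrentBag<T>:

// Minimal demo of why a thread-safe collection is needed inside a parallel loop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

class ConcurrencyDemo
{
    static void Main()
    {
        var unsafeList = new List<int>();
        var safeBag = new ConcurrentBag<int>();

        // List<T>.Add is not thread-safe: this loop may throw
        // (e.g. ArgumentException / IndexOutOfRangeException) or silently lose items.
        try
        {
            Parallel.For(0, 100000, i => unsafeList.Add(i));
        }
        catch (Exception ex)
        {
            Console.WriteLine("List<T> failed under parallel writes: " + ex.GetType().Name);
        }

        // ConcurrentBag<T> is designed for multiple concurrent producers, so this is safe.
        Parallel.For(0, 100000, i => safeBag.Add(i));

        Console.WriteLine("List count: " + unsafeList.Count + " (may be less than 100000, or the loop may have thrown)");
        Console.WriteLine("Bag count:  " + safeBag.Count + " (always 100000)");
    }
}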
The problem statement: a file containing rainfall readings needs to be mapped to a table in a database, so that the rainfall measurements can be queried by date.
Input file:
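The screenshot of the input file has not survived here; based on the parsing logic in step 2 (the "Years=" header, the "Grid-ref=" markers and the rows of monthly readings), the file looks roughly like this. The values below are purely illustrative, not the original data:

[Years=1991-2002]
Grid-ref= 1, 148
 3020 2820 3040 2880 1740 1360  980  990 1410 1770 2580 2630
 2970 2750 2980 2720 1650 1340  950  975 1380 1720 2500 2570
 ... (one line of 12 monthly readings for each year in the range)
Grid-ref= 1, 311
 ...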

Expected result: the output of this solution is a SQL table (Reading) filled with the rainfall readings, one row per grid point, year and month, as follows.

The solution:
A console app is built in Visual Studio 2019. The application opens the input file, reads it, and uses string manipulation plus a ConcurrentBag<T> to extract the data. The data extraction runs in parallel to improve the performance of the program. The collection is then converted into a DataTable, which in turn enables a bulk operation in SQL: SqlBulkCopy inserts all the data at once, adding further to the performance of the operation.
This is just an example to demonstrate the usage of the Task Parallel Library and bulk insert. Below are the steps.
1. A connection is established to a local .mdf file and the input file is loaded into a memory stream.
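The code screenshot for this step is missing from the post; a minimal sketch of what it does, reusing the sqldbConn and inputfilestoprocess variables that are set up in Main (step 5), might look like this:

// Sketch only: verify the local .mdf can be attached and load an input file into memory.
// Requires using System.IO and System.Data.SqlClient.
using (var conn = new System.Data.SqlClient.SqlConnection(sqldbConn))
{
    conn.Open(); // attaches RainfallReadingDB.mdf via LocalDB
}
foreach (string path in inputfilestoprocess)
{
    using (var ms = new MemoryStream(File.ReadAllBytes(path)))
    {
        // the stream is handed over to the parsing step below
    }
}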

2. A parsing logic is required to extract the data from the file.
The following structures are created to process the data:
string[] filepaths;
string matchingXYtext = "Grid-ref=";
string matchingYearstext = "Years=";
enum matchingmonths { Jan, Feb, March, April, May, June, July, Aug, Sept, Oct, Nov, Dec };
List<measures> parsedlistfromfile;
class measures
{
    public int X { get; set; }
    public int Y { get; set; }
    public int startyear { get; set; }
    public int endyear { get; set; }
    public int[,] infomatrix { get; set; }
}
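// Note: the Rainfallrecord class is not shown in the original listing; this is a
// minimal sketch inferred from the properties used later in convertToDTforDBOperation.
class Rainfallrecord
{
    public int X { get; set; }
    public int Y { get; set; }
    public int monthOn { get; set; }
    public int yearOn { get; set; }
    public int record { get; set; }
    public DateTime recorddate { get; set; }
}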
ConcurrentBag<Rainfallrecord> parsedRecordListbeforeDBinsert = new ConcurrentBag<Rainfallrecord>(); // initialised here so the parallel conversion can add to it safely
DataTable parsedRecordListbeforeDBinsertDT;

The following function is created for parsing the data:
void parseFile()
{
    if (filepaths.Count() > 0)
    {
        string line;
        // initialise once so readings from every input file are kept
        parsedlistfromfile = new List<measures>();
        try
        {
            foreach (string file in filepaths)
            {
                using (StreamReader filestream = new StreamReader(file))
                {
                    int startyear = 0;
                    int endyear = 0;
                    while ((line = filestream.ReadLine()) != null)
                    {
                        // Header line "Years=YYYY-YYYY" gives the year range for the readings.
                        if (line.Contains(matchingYearstext))
                        {
                            int.TryParse(line.Substring(line.IndexOf(matchingYearstext) + matchingYearstext.Length, 4), out startyear);
                            int.TryParse(line.Substring(line.IndexOf(matchingYearstext) + matchingYearstext.Length + 4 + 1, 4), out endyear);
                        }
                        // "Grid-ref= X, Y" marks the start of one grid point's block of readings.
                        if (line.Contains(matchingXYtext))
                        {
                            var obj = new measures() { startyear = startyear, endyear = endyear };
                            int xValue = 0;
                            int yValue = 0;
                            var xytext = line.Substring(line.IndexOf(matchingXYtext) + matchingXYtext.Length).Split(new char[] { ',', ' ' }, StringSplitOptions.RemoveEmptyEntries);
                            int.TryParse(xytext[0], out xValue);
                            int.TryParse(xytext[1], out yValue);
                            obj.X = xValue;
                            obj.Y = yValue;
                            if (xValue > 0 && yValue > 0)
                            {
                                // Read the next 12 lines of readings into a 12x12 matrix
                                // (rows = years, columns = months; assumes at most 12 of each).
                                int[,] xyValue = new int[12, 12];
                                for (int i = 0; i < 12; i++)
                                {
                                    line = filestream.ReadLine();
                                    if (line == null)
                                        break;
                                    var readingLine = line.Split(new char[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
                                    int j = 0;
                                    foreach (string s in readingLine)
                                    {
                                        int noval = 0;
                                        int.TryParse(s, out noval);
                                        xyValue[i, j++] = noval;
                                    }
                                }
                                obj.infomatrix = xyValue;
                                parsedlistfromfile.Add(obj);
                            }
                        }
                    }
                }
            }
        }
        catch { /* errors are swallowed here; log them in a real application */ }
    }
}

3. The following function is created to convert the collection into a DataTable:
void convertToDTforDBOperation()
{
    if (parsedlistfromfile != null && parsedlistfromfile.Count > 0)
    {
        // to monitor the processing time
        System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
        sw.Start();
        #region Parallel processing of data for conversion
        Parallel.ForEach(parsedlistfromfile, item =>
        {
            for (int r = 0; r < item.infomatrix.GetLength(0); r++)
            {
                for (int c = 0; c < item.infomatrix.GetLength(1); c++)
                {
                    /* Note: writing to the DataTable directly from the parallel loop does not work.
                       DataTable is not thread-safe, so concurrent Rows.Add calls corrupt it and cause a runtime error.
                    //DataRow dr = parsedRecordListbeforeDBinsertDT.NewRow();
                    //dr["X"] = item.X;
                    //dr["Y"] = item.Y;
                    //dr["monthOn"] = c + 1;
                    //dr["yearOn"] = item.startyear + r;
                    //dr["record"] = item.infomatrix[r, c];
                    //dr["recorddate"] = new DateTime(item.startyear + r, c + 1, 1);
                    //parsedRecordListbeforeDBinsertDT.Rows.Add(dr); */
                    parsedRecordListbeforeDBinsert.Add(new Rainfallrecord()
                    {
                        X = item.X,
                        Y = item.Y,
                        monthOn = c + 1,
                        yearOn = item.startyear + r,
                        record = item.infomatrix[r, c],
                        recorddate = new DateTime(item.startyear + r, c + 1, 1)
                    });
                }
            }
        });
        #endregion
        sw.Stop();
        Console.WriteLine(sw.ElapsedMilliseconds);
        // convert the collection to a DataTable for the bulk insert
        parsedRecordListbeforeDBinsertDT = ConvertToDataTable(parsedRecordListbeforeDBinsert.ToList());
        #region sequential loop version, kept for timing comparison
        /*
        sw.Reset();
        sw.Start();
        foreach (var record in parsedlistfromfile)
        {
            for (int r = 0; r < record.infomatrix.GetLength(0); r++)
            {
                for (int c = 0; c < record.infomatrix.GetLength(1); c++)
                {
                    parsedRecordListbeforeDBinsert.Add(new Rainfallrecord()
                    {
                        X = record.X, Y = record.Y, monthOn = c + 1, yearOn = record.startyear + r,
                        record = record.infomatrix[r, c], recorddate = new DateTime(record.startyear + r, c + 1, 1)
                    });
                }
            }
        }
        sw.Stop();
        Console.WriteLine(sw.ElapsedMilliseconds);
        */
        #endregion
    }
}
DataTable ConvertToDataTable<T>(IList<T> data)
{
    PropertyDescriptorCollection properties = TypeDescriptor.GetProperties(typeof(T));
    DataTable table = new DataTable();
    foreach (PropertyDescriptor prop in properties)
        table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
    foreach (T item in data)
    {
        DataRow row = table.NewRow();
        foreach (PropertyDescriptor prop in properties)
        {
            row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
        }
        // skip incomplete rows so the bulk insert does not fail on nulls
        if (!row.ItemArray.Contains(DBNull.Value))
            table.Rows.Add(row);
    }
    return table;
}

4. Finally, a method to call the parsing logic and the conversion:
public DataTable parseInputFile()
{
    try
    {
        parseFile();
        convertToDTforDBOperation();
    }
    catch (IOException)
    {
        //log
    }
    catch (Exception ex)
    {
        // other errors are swallowed; log ex in a real application
    }
    return parsedRecordListbeforeDBinsertDT;
}
5. This is a console app; its Main method looks like this:
static void Main(string[] args)
{
    string mdffilepath = Directory.GetParent(Directory.GetCurrentDirectory()).Parent.Parent.FullName + "\\RainfalPrecipitation.Utilities\\RainfallReadingDB.mdf";
    sqldbConn = "Data Source=(LocalDB)\\MSSQLLocalDB;AttachDbFilename=" + mdffilepath + ";Integrated Security=True";
    inputfilestoprocess = Directory.GetFiles(Directory.GetCurrentDirectory() + "\\..\\..\\datafiles");
    destinationTableName = "Reading";
    process();
}
static void process()
{
    try
    {
        IPrecipitationData p = new PrecipitationData(inputfilestoprocess);
        DataTable dt = p.parseInputFile();
        IDAL sqldal = new DAL(sqldbConn);
        sqldal.bulkSQLInsert(dt, destinationTableName);
    }
    catch (Exception ex)
    {
        // log ex in a real application
    }
}

Kindly note that the complete working solution is available in Git. Click here to download the code.
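The DAL class and its bulkSQLInsert method are only in the downloadable solution and are not listed in this post. For completeness, here is a minimal sketch of how such a method can be written with SqlBulkCopy; the interface and names below simply mirror the calls in process() above, and the actual implementation in the repository may differ:

// Sketch only. Requires using System.Data and System.Data.SqlClient.
interface IDAL
{
    void bulkSQLInsert(DataTable table, string destinationTableName);
}

class DAL : IDAL
{
    private readonly string connectionString;
    public DAL(string connectionString) { this.connectionString = connectionString; }

    public void bulkSQLInsert(DataTable table, string destinationTableName)
    {
        using (var conn = new SqlConnection(connectionString))
        {
            conn.Open();
            using (var bulkCopy = new SqlBulkCopy(conn))
            {
                bulkCopy.DestinationTableName = destinationTableName;
                // map DataTable columns to the destination columns by name
                foreach (DataColumn col in table.Columns)
                    bulkCopy.ColumnMappings.Add(col.ColumnName, col.ColumnName);
                // a single bulk operation inserts every row in the DataTable
                bulkCopy.WriteToServer(table);
            }
        }
    }
}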
