
Task Parallel Library in C#

Updated: Jan 8, 2021

#.net code #parallel tasking #concurrent bag


A solution in .NET can be designed in various ways. This demonstration explains the use of the Task Parallel Library (TPL) in C#. The ConcurrentBag<T> collection is used as an example of handling concurrency. If you use a List<T> here instead, you can end up with runtime errors or lost items, because List<T> is not safe for concurrent writes. Do leave a comment at the end if you would like me to explain more.
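To make the List<T> versus ConcurrentBag<T> point concrete, here is a minimal sketch. The class and method names (BagDemo, FillInParallel) are hypothetical and are not part of the downloadable solution. Several threads add to one shared collection, which is the situation ConcurrentBag<T> is designed for.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original solution.
public static class BagDemo
{
    // Adds the numbers 0..count-1 to one shared bag from several threads.
    public static ConcurrentBag<int> FillInParallel(int count)
    {
        var bag = new ConcurrentBag<int>();
        // Parallel.For runs the body on thread-pool threads;
        // ConcurrentBag<T>.Add is safe to call concurrently.
        Parallel.For(0, count, i => bag.Add(i));
        return bag;
    }
}
```

BagDemo.FillInParallel(100000).Count gives 100000 because no add is lost. Swapping the bag for a List<int> in the same loop is not safe, since List<T> does not support concurrent writers. Keep in mind that ConcurrentBag<T> does not preserve insertion order.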


The problem statement: a file containing rainfall readings needs to be mapped to a table in a database. The idea is to get the rainfall measurements by date.

Input file :

[Image: sample of the input file]

Expected result: the output of this solution is a SQL table filled with the rainfall readings, as follows.


[Image: the SQL table filled with rainfall readings]

The solution :


The solution is a console app built in Visual Studio 2019. The application opens the input file, reads it, and uses string manipulation to extract the data. The parsed data is then converted into records in parallel, using a ConcurrentBag<T>, to improve the performance of the program. The collection is converted into a DataTable, which makes it possible to use a bulk operation in SQL. The SQL bulk insert writes all the data at once, which further improves performance.


This is just an example to demonstrate the usage of the Task Parallel Library and a bulk insert. The steps are below.


1. A connection is established to a local .mdf file, and the input file is loaded through a stream reader.


2. Parsing logic is required to extract the data from the file.

The following structures are created to process the data:



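        // Requires: using System.Collections.Concurrent; using System.Collections.Generic; using System.Data;
        string[] filepaths;
        string matchingXYtext = "Grid-ref=";
        string matchingYearstext = "Years=";
        enum matchingmonths { Jan, Feb, March, April, May, June, July, Aug, Sept, Oct, Nov, Dec };

        // Intermediate result of parsing: one entry per grid reference.
        List<measures> parsedlistfromfile;

        class measures
        {
            public int X { get; set; }
            public int Y { get; set; }
            public int startyear { get; set; }
            public int endyear { get; set; }
            // One row per year (offset from startyear), one column per month.
            public int[,] infomatrix { get; set; }
        }

        // Thread-safe collection that is filled in parallel and then converted to a DataTable.
        ConcurrentBag<Rainfallrecord> parsedRecordListbeforeDBinsert = new ConcurrentBag<Rainfallrecord>();
        DataTable parsedRecordListbeforeDBinsertDT;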
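The Rainfallrecord type used by the bag is not shown in this post. A plausible shape, reconstructed from the object initializer in the conversion step, might look like the sketch below. The property names come from that initializer; the property types are assumptions.

```csharp
using System;

// Hypothetical reconstruction: property names are taken from the object
// initializer in convertToDTforDBOperation; the types are assumptions.
public class Rainfallrecord
{
    public int X { get; set; }               // grid X reference
    public int Y { get; set; }               // grid Y reference
    public int monthOn { get; set; }         // month number, 1 to 12
    public int yearOn { get; set; }          // calendar year of the reading
    public int record { get; set; }          // the rainfall reading itself
    public DateTime recorddate { get; set; } // first day of the reading's month
}
```

Each public property becomes one column when the collection is converted to a DataTable, so the property names should match the columns of the destination table.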

The following function parses the data:



void parseFile()
        {
            // Create the list once so that records from every input file are kept.
            parsedlistfromfile = new List<measures>();
            if (filepaths == null || filepaths.Length == 0)
                return;

            try
            {
                foreach (string file in filepaths)
                {
                    // "using" closes the reader even if parsing throws.
                    using (StreamReader filestream = new StreamReader(file))
                    {
                        string line;
                        int startyear = 0;
                        int endyear = 0;
                        while ((line = filestream.ReadLine()) != null)
                        {
                            // Header with the year range: two 4-digit years separated by one character.
                            if (line.Contains(matchingYearstext))
                            {
                                int yearsAt = line.IndexOf(matchingYearstext) + matchingYearstext.Length;
                                int.TryParse(line.Substring(yearsAt, 4), out startyear);
                                int.TryParse(line.Substring(yearsAt + 4 + 1, 4), out endyear);
                            }
                            // Start of a grid block: "Grid-ref=" followed by X and Y.
                            if (line.Contains(matchingXYtext))
                            {
                                var obj = new measures() { startyear = startyear, endyear = endyear };
                                int xValue = 0;
                                int yValue = 0;
                                var xytext = line.Substring(line.IndexOf(matchingXYtext) + matchingXYtext.Length).Split(new char[] { ',', ' ' }, StringSplitOptions.RemoveEmptyEntries);
                                if (xytext.Length >= 2)
                                {
                                    int.TryParse(xytext[0], out xValue);
                                    int.TryParse(xytext[1], out yValue);
                                }
                                obj.X = xValue;
                                obj.Y = yValue;

                                if (xValue > 0 && yValue > 0)
                                {
                                    // The next 12 lines hold the readings, up to 12 values per line.
                                    int[,] xyValue = new int[12, 12];
                                    for (int i = 0; i < 12; i++)
                                    {
                                        line = filestream.ReadLine();
                                        if (line == null)
                                            break;
                                        var readingLine = line.Split(new char[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
                                        // Ignore any extra values so the index stays inside the matrix.
                                        for (int j = 0; j < readingLine.Length && j < 12; j++)
                                        {
                                            int noval = 0;
                                            int.TryParse(readingLine[j], out noval);
                                            xyValue[i, j] = noval;
                                        }
                                    }
                                    obj.infomatrix = xyValue;
                                    parsedlistfromfile.Add(obj);
                                }
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                // Report the failure instead of swallowing it silently.
                Console.Error.WriteLine("Parsing failed: " + ex.Message);
            }
        }
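The header handling in parseFile can be tried in isolation. The sketch below pulls the two Substring/Split steps into a hypothetical HeaderParser class (not part of the original solution). The sample lines in the usage note are assumptions about the input format, inferred from the "Years=" and "Grid-ref=" markers.

```csharp
using System;

// Hypothetical helper that isolates the header parsing done in parseFile.
public static class HeaderParser
{
    const string YearsMarker = "Years=";
    const string GridMarker = "Grid-ref=";

    // Expects two 4-digit years separated by one character after "Years=".
    // Returns (0, 0) when the marker is missing or the line is too short.
    public static (int Start, int End) ParseYears(string line)
    {
        int at = line.IndexOf(YearsMarker, StringComparison.Ordinal);
        if (at < 0) return (0, 0);
        int pos = at + YearsMarker.Length;
        if (line.Length < pos + 9) return (0, 0);
        int.TryParse(line.Substring(pos, 4), out int start);
        int.TryParse(line.Substring(pos + 5, 4), out int end);
        return (start, end);
    }

    // Expects X and Y after "Grid-ref=", separated by commas and/or spaces.
    public static (int X, int Y) ParseGridRef(string line)
    {
        int at = line.IndexOf(GridMarker, StringComparison.Ordinal);
        if (at < 0) return (0, 0);
        string[] parts = line.Substring(at + GridMarker.Length)
            .Split(new[] { ',', ' ' }, StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length < 2) return (0, 0);
        int.TryParse(parts[0], out int x);
        int.TryParse(parts[1], out int y);
        return (x, y);
    }
}
```

With an assumed header line such as "Years=1991-2000", ParseYears returns (1991, 2000). With an assumed line such as "Grid-ref=   1, 148", ParseGridRef returns (1, 148). Returning (0, 0) for malformed input matches the way parseFile skips blocks whose X or Y is not greater than zero.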

3. The following function converts the collection into a DataTable:





void convertToDTforDBOperation()
        {
            // && short-circuits, so Count is not evaluated when the list is null.
            if (parsedlistfromfile != null && parsedlistfromfile.Count > 0)
            {
                // to monitor the processing time
                System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
                sw.Start();
                #region Parallel processing of data for conversion

                Parallel.ForEach(parsedlistfromfile, item =>
                {
                    for (int r = 0; r < item.infomatrix.GetLength(0); r++)
                    {
                        for (int c = 0; c < item.infomatrix.GetLength(1); c++)
                        {
                            // Note: adding rows to the DataTable directly inside the parallel loop does not work.
                            // DataTable is not safe for concurrent writes, and the table gets corrupted at runtime:
                            //DataRow dr = parsedRecordListbeforeDBinsertDT.NewRow();
                            //dr["X"] = item.X;
                            //dr["Y"] = item.Y;
                            //dr["monthOn"] = c + 1;
                            //dr["yearOn"] = item.startyear + r;
                            //dr["record"] = item.infomatrix[r, c];
                            //dr["recorddate"] = new DateTime(item.startyear + r, c + 1, 1);
                            //parsedRecordListbeforeDBinsertDT.Rows.Add(dr);

                            parsedRecordListbeforeDBinsert.Add(new Rainfallrecord()
                            {
                                X = item.X,
                                Y = item.Y,
                                monthOn = c + 1,
                                yearOn = item.startyear + r,
                                record = item.infomatrix[r, c],
                                recorddate = new DateTime(item.startyear + r, c + 1, 1)
                            });
                        }
                    }
                });

                #endregion
                sw.Stop();

                Console.WriteLine(sw.ElapsedMilliseconds);

                // Convert the bag to a DataTable for the bulk insert.
                parsedRecordListbeforeDBinsertDT = ConvertToDataTable(parsedRecordListbeforeDBinsert.ToList());

                #region using for loop of data for conversion
                /*
                sw.Reset();
                sw.Start();
                foreach (var record in parsedlistfromfile)
                {

                    for(int r=0;r<record.infomatrix.GetLength(0);r++)
                    {
                        for(int c=0; c<record.infomatrix.GetLength(1);c++)
                        {
                            parsedRecordListbeforeDBinsert.Add(new Rainfallrecord()
                            { X = record.X, Y = record.Y, monthOn = c + 1, yearOn = record.startyear + r, record = record.infomatrix[r, c], recorddate = new DateTime(record.startyear + r, c + 1, 1) });
                        }
                    }

                }               

                sw.Stop();
                Console.WriteLine(sw.ElapsedMilliseconds);

                */
                #endregion
            }
        }
        // Builds a DataTable with one column per public property of T.
        // Requires: using System.ComponentModel; using System.Data; using System.Linq;
        DataTable ConvertToDataTable<T>(IList<T> data)
        {
            PropertyDescriptorCollection properties = TypeDescriptor.GetProperties(typeof(T));
            DataTable table = new DataTable();
            // A DataColumn cannot be typed as Nullable<T>, so use the underlying type.
            foreach (PropertyDescriptor prop in properties)
                table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
            foreach (T item in data)
            {
                DataRow row = table.NewRow();
                foreach (PropertyDescriptor prop in properties)
                {
                    row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
                }
                // Rows with any missing value are skipped.
                if (!row.ItemArray.Contains(DBNull.Value))
                    table.Rows.Add(row);
            }
            return table;
        }
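To see how this conversion behaves, here is a self-contained sketch that applies the same technique to a small sample type. The names Reading and TableDemo are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;

// Hypothetical sample type with one nullable property.
public class Reading
{
    public int X { get; set; }
    public int? record { get; set; }
}

public static class TableDemo
{
    // Same idea as ConvertToDataTable: one column per public property,
    // and rows that contain a null value are left out.
    public static DataTable ToTable<T>(IList<T> data)
    {
        PropertyDescriptorCollection properties = TypeDescriptor.GetProperties(typeof(T));
        var table = new DataTable();
        foreach (PropertyDescriptor prop in properties)
            table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);

        foreach (T item in data)
        {
            DataRow row = table.NewRow();
            foreach (PropertyDescriptor prop in properties)
                row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;

            // Skip the row if any column is empty.
            if (Array.IndexOf(row.ItemArray, DBNull.Value) < 0)
                table.Rows.Add(row);
        }
        return table;
    }
}
```

Passing a list with one complete Reading and one whose record is null yields a table with two columns and a single row. The record column has the type int rather than int?, because a DataColumn cannot be typed as a nullable value type.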

4. Finally, a method calls the parsing logic and the conversion:


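        public DataTable parseInputFile()
        {
            try
            {
                parseFile();
                convertToDTforDBOperation();
            }
            catch (IOException ex)
            {
                // The file could not be read; report it (replace with real logging).
                Console.Error.WriteLine("I/O error: " + ex.Message);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine("Unexpected error: " + ex.Message);
            }
            // May be null if parsing or conversion failed.
            return parsedRecordListbeforeDBinsertDT;
        }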

5. This is a console app; its Main method looks like this:



        static void Main(string[] args)
        {
            string mdffilepath = Directory.GetParent(Directory.GetCurrentDirectory()).Parent.Parent.FullName + "\\RainfalPrecipitation.Utilities\\RainfallReadingDB.mdf";
            sqldbConn = "Data Source=(LocalDB)\\MSSQLLocalDB;AttachDbFilename=" + mdffilepath + ";Integrated Security=True";
            inputfilestoprocess = Directory.GetFiles(Directory.GetCurrentDirectory() + "\\..\\..\\datafiles");
            destinationTableName = "Reading";
            process();
        }

        static void process()
        {
            try
            {
                IPrecipitationData p = new PrecipitationData(inputfilestoprocess);
                DataTable dt = p.parseInputFile();

                IDAL sqldal = new DAL(sqldbConn);
                sqldal.bulkSQLInsert(dt,destinationTableName);
            }
            catch (Exception ex)
            {
                // Report the failure instead of swallowing it silently.
                Console.Error.WriteLine(ex);
            }
        }
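The DAL class and its bulkSQLInsert method are not listed in this post. As a rough idea of what such a method could do, here is a minimal sketch built on SqlBulkCopy. It assumes the System.Data.SqlClient provider is referenced and that the destination table has columns with the same names as the DataTable. The class name BulkInsertSketch is hypothetical, and the real implementation in the downloadable solution may differ.

```csharp
using System;
using System.Data;
using System.Data.SqlClient;

// Hypothetical stand-in for the DAL class; not the original implementation.
public class BulkInsertSketch
{
    private readonly string connectionString;

    public BulkInsertSketch(string connectionString)
    {
        this.connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
    }

    // Writes every row of the DataTable to the destination table in one bulk operation.
    public void BulkInsert(DataTable table, string destinationTableName)
    {
        if (table == null) throw new ArgumentNullException(nameof(table));
        if (string.IsNullOrWhiteSpace(destinationTableName))
            throw new ArgumentException("A destination table name is required.", nameof(destinationTableName));

        using (var bulk = new SqlBulkCopy(connectionString))
        {
            bulk.DestinationTableName = destinationTableName;
            // Map columns by name so their order in the DataTable does not matter.
            // Assumption: the destination table uses the same column names.
            foreach (DataColumn column in table.Columns)
                bulk.ColumnMappings.Add(column.ColumnName, column.ColumnName);
            bulk.WriteToServer(table);
        }
    }
}
```

A single WriteToServer call sends the whole table, which is usually much faster than issuing one INSERT per row. The arguments are validated before a connection is opened, so a missing DataTable is reported without touching the database.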

Kindly note that the complete working solution is available in git. Click here to download the code.





 
 
 


©2020 by Quest Hunting. Proudly created with Wix.com
