当前位置:主页 > 查看内容

C#多线程处理多个队列数据的方法

发布时间:2021-04-16 00:00| 位朋友查看

简介:本文转载自微信公众号「后端Q」,作者conan。转载本文请联系后端Q公众号。 概述 多线程(multithreading),是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多于一个线程,进而提升整体处理性能。具……

 

本文转载自微信公众号「后端Q」,作者conan。转载本文请联系后端Q公众号。    

概述

多线程(multithreading),是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多于一个线程,进而提升整体处理性能。具有这种能力的系统包括对称多处理机、多核心处理器以及芯片级多处理或同时多线程处理器。在一个程序中,这些独立运行的程序片段叫作“线程”(Thread),利用它编程的概念就叫作“多线程处理”。

队列(Queue)代表了一个先进先出的对象集合。当您需要对各项进行先进先出的访问时,则使用队列。当您在列表中添加一项,称为入队,当您从列表中移除一项时,称为出队。

比如平常我们在处理定时任务的时候,假设就一台机器,我们不可能单线程一条一条数据的去跑,这时候就需要提高机器资源的利用率。

下面我们来介绍下,如何实现多线程+队列以提高并发处理能力。

代码实现

1、定义线程数threadNum和队列queues

  1. /// <summary> 
  2.         /// 线程总数 
  3.         /// </summary> 
  4.         private int threadNum = 4; 
  5.  
  6.         /// <summary> 
  7.         /// 总数 
  8.         /// </summary> 
  9.         private int totalCount = 0; 
  10.  
  11.         /// <summary> 
  12.         /// 已处理 
  13.         /// </summary> 
  14.         private int index = 0; 
  15.  
  16.         /// <summary> 
  17.         /// 队列 
  18.         /// </summary> 
  19.         private ConcurrentQueue<AssetRepayment> queues = new ConcurrentQueue<AssetRepayment>(); 

2、定义线程列表,往线程添加数据

  1. public void SubDeTransaction() 
  2.         { 
  3.             var list = new List<AssetRepayment>(); 
  4.             for (int i = 0; i < 1000; i++) 
  5.             { 
  6.                 list.Add(new AssetRepayment() { Title = i.ToString() + "---" + Guid.NewGuid().ToString() }); 
  7.             } 
  8.  
  9.             if (list == null || list.Count() == 0) 
  10.             { 
  11.                 Console.WriteLine("没有可执行的数据"); 
  12.                 return
  13.             } 
  14.             totalCount = list.Count
  15.             Console.WriteLine("可执行的数据:" + list.Count() + "条"); 
  16.             foreach (var item in list) 
  17.             { 
  18.                 queues.Enqueue(item); 
  19.             } 
  20.             List<Task> tasks = new List<Task>(); 
  21.             for (int i = 0; i < threadNum; i++) 
  22.             { 
  23.                 var task = Task.Run(() => 
  24.                 { 
  25.                     Process(); 
  26.                 }); 
  27.                 tasks.Add(task); 
  28.             } 
  29.             var taskList = Task.Factory.ContinueWhenAll(tasks.ToArray(), (ts) => 
  30.             { 
  31.             }); 
  32.             taskList.Wait(); 
  33.         } 

3、对线程数进行限制 for (int i = 0; i < threadNum; i++)

  1. var taskList = Task.Factory.ContinueWhenAll(tasks.ToArray(), (ts) => 
  2.             { 
  3.             }); 
  4.             taskList.Wait(); 

4、从队列取出数据进行业务处理

  1. private void Process() 
  2.         { 
  3.             while (true
  4.             { 
  5.                 var currentIndex = Interlocked.Increment(ref index); 
  6.                 AssetRepayment repayId = null
  7.                 var isExit = queues.TryDequeue(out repayId); 
  8.                 if (!isExit) 
  9.                 { 
  10.                     break; 
  11.                 } 
  12.                 try 
  13.                 { 
  14.                     Console.WriteLine(repayId.Title); 
  15.  
  16.                     Console.WriteLine(string.Format(" 共{0}条 当前第{1}条", totalCount, currentIndex)); 
  17.                 } 
  18.                 catch (Exception ex) 
  19.                 { 
  20.                     Console.WriteLine(ex); 
  21.                 } 
  22.             } 
  23.         } 

运行测试

代码地址

https://gitee.com/conanOpenSource_admin/Example


本文转载自网络,原文链接:https://mp.weixin.qq.com/s/d-SX5CU-LSxboAzGiDnUug
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!
上一篇:我撸了个内存泄漏检测工具,只用了两招 下一篇:没有了

推荐图文

  • 周排行
  • 月排行
  • 总排行

随机推荐