专业网站建设品牌,十四年专业建站经验,服务6000+客户--广州京杭网络
免费热线:400-963-0016      微信咨询  |  联系我们

C#请求唯一性校验支持高并发的实现方法

当前位置:网站建设 > 技术支持
资料来源:网络整理       时间:2023/2/14 0:45:29       共计:3622 浏览

使用场景描述:

网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。

其他需求描述:

这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。

技术实现:

对请求的业务内容进行MD5摘要,并且将MD5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。

代码实现:

公共调用代码 UniqueCheck 采用单例模式创建唯一对象,便于在多线程调用的时候,只访问一个统一的缓存库


/*    * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。    * 它是被设计用来修饰被不同线程访问和修改的变量。    * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。    */   private static readonly object lockHelper = new object();      private volatile static UniqueCheck _instance;       /// <summary>   /// 获取单一实例   /// </summary>   /// <returns></returns>   public static UniqueCheck GetInstance()   {    if (_instance == null)    {     lock (lockHelper)     {      if (_instance == null)       _instance = new UniqueCheck();     }    }    return _instance;   }

这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。

自定义一个可以进行并发处理队列,代码如下:ConcurrentLinkedQueue

using System; using System.Collections.Generic; using System.Text; using System.Threading;   namespace PackgeUniqueCheck {  /// <summary>  /// 非加锁并发队列,处理100个并发数以内  /// </summary>  /// <typeparam name="T"></typeparam>  public class ConcurrentLinkedQueue<T>  {   private class Node<K>   {    internal K Item;    internal Node<K> Next;      public Node(K item, Node<K> next)    {     this.Item = item;     this.Next = next;    }   }     private Node<T> _head;   private Node<T> _tail;     public ConcurrentLinkedQueue()   {    _head = new Node<T>(default(T), null);    _tail = _head;   }     public bool IsEmpty   {    get { return (_head.Next == null); }   }   /// <summary>   /// 进入队列   /// </summary>   /// <param name="item"></param>   public void Enqueue(T item)   {    Node<T> newNode = new Node<T>(item, null);    while (true)    {     Node<T> curTail = _tail;     Node<T> residue = curTail.Next;       //判断_tail是否被其他process改变     if (curTail == _tail)     {      //A 有其他process执行C成功,_tail应该指向新的节点      if (residue == null)      {       //C 其他process改变了tail节点,需要重新取tail节点       if (Interlocked.CompareExchange<Node<T>>(        ref curTail.Next, newNode, residue) == residue)       {        //D 尝试修改tail        Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);        return;       }      }      else      {       //B 帮助其他线程完成D操作       Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);      }     }    }   }   /// <summary>   /// 队列取数据   /// </summary>   /// <param name="result"></param>   /// <returns></returns>   public bool TryDequeue(out T result)   {    Node<T> curHead;    Node<T> curTail;    Node<T> next;    while (true)    {     curHead = _head;     curTail = _tail;     next = curHead.Next;     if (curHead == _head)     {      if (next == null) //Queue为空      {       result = default(T);       return false;      }      if (curHead == curTail) //Queue处于Enqueue第一个node的过程中      {       //尝试帮助其他Process完成操作       Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);      }      else      {       //取next.Item必须放到CAS之前       result = next.Item;       //如果_head没有发生改变,则将_head指向next并退出       if (Interlocked.CompareExchange<Node<T>>(ref _head,        next, curHead) == curHead)        break;      }     }    }    return true;   }   /// <summary>   /// 尝试获取最后一个对象   /// </summary>   /// <param name="result"></param>   /// <returns></returns>   public bool TryGetTail(out T result)   {    result = default(T);    if (_tail == null)    {     return false;    }    result = _tail.Item;    return true;   }  } } 虽然是一个非常简单的唯一性校验逻辑,但是要做到高效率,高并发支持,高可靠性,以及低内存占用,需要实现这样的需求,需要做细致的模拟测试。

using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Collections;   namespace PackgeUniqueCheck {  public class UniqueCheck  {   /*    * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。    * 它是被设计用来修饰被不同线程访问和修改的变量。    * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。    */   private static readonly object lockHelper = new object();     private volatile static UniqueCheck _instance;      /// <summary>   /// 获取单一实例   /// </summary>   /// <returns></returns>   public static UniqueCheck GetInstance()   {    if (_instance == null)    {     lock (lockHelper)     {      if (_instance == null)       _instance = new UniqueCheck();     }    }    return _instance;   }     private UniqueCheck()   {    //创建一个线程安全的哈希表,作为字典缓存    _DataKey = Hashtable.Synchronized(new Hashtable());    Queue myqueue = new Queue();    _DataQueue = Queue.Synchronized(myqueue);    _Myqueue = new ConcurrentLinkedQueue<string>();    _Timer = new Thread(DoTicket);    _Timer.Start();   }     #region 公共属性设置   /// <summary>   /// 设定定时线程的休眠时间长度:默认为1分钟   /// 时间范围:1-7200000,值为1毫秒到2小时   /// </summary>   /// <param name="value"></param>   public void SetTimeSpan(int value)   {    if (value > 0&& value <=7200000)    {     _TimeSpan = value;    }   }   /// <summary>   /// 设定缓存Cache中的最大记录条数   /// 值范围:1-5000000,1到500万   /// </summary>   /// <param name="value"></param>   public void SetCacheMaxNum(int value)   {    if (value > 0 && value <= 5000000)    {     _CacheMaxNum = value;    }   }   /// <summary>   /// 设置是否在控制台中显示日志   /// </summary>   /// <param name="value"></param>   public void SetIsShowMsg(bool value)   {    Helper.IsShowMsg = value;   }   /// <summary>   /// 线程请求阻塞增量   /// 值范围:1-CacheMaxNum,建议设置为缓存最大值的10%-20%   /// </summary>   /// <param name="value"></param>   public void SetBlockNumExt(int value)   {    if (value > 0 && value <= _CacheMaxNum)    {     _BlockNumExt = value;    }   }   /// <summary>   /// 请求阻塞时间   /// 值范围:1-max,根据阻塞增量设置请求阻塞时间   /// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差   /// </summary>   /// <param name="value"></param>   public void SetBlockSpanTime(int value)   {    if (value > 0)    {     _BlockSpanTime = value;    }   }   #endregion     #region 私有变量   /// <summary>   /// 内部运行线程   /// </summary>   private Thread _runner = null;   /// <summary>   /// 可处理高并发的队列   /// </summary>   private ConcurrentLinkedQueue<string> _Myqueue = null;   /// <summary>   /// 唯一内容的时间健值对   /// </summary>   private Hashtable _DataKey = null;   /// <summary>   /// 内容时间队列   /// </summary>   private Queue _DataQueue = null;   /// <summary>   /// 定时线程的休眠时间长度:默认为1分钟   /// </summary>   private int _TimeSpan = 3000;   /// <summary>   /// 定时计时器线程   /// </summary>   private Thread _Timer = null;   /// <summary>   /// 缓存Cache中的最大记录条数   /// </summary>   private int _CacheMaxNum = 500000;   /// <summary>   /// 线程请求阻塞增量   /// </summary>   private int _BlockNumExt = 10000;   /// <summary>   /// 请求阻塞时间   /// </summary>   private int _BlockSpanTime = 100;   #endregion     #region 私有方法   private void StartRun()   {    _runner = new Thread(DoAction);    _runner.Start();    Helper.ShowMsg("内部线程启动成功!");   }     private string GetItem()   {    string tp = string.Empty;    bool result = _Myqueue.TryDequeue(out tp);    return tp;   }   /// <summary>   /// 执行循环操作   /// </summary>   private void DoAction()   {    while (true)    {     while (!_Myqueue.IsEmpty)     {      string item = GetItem();      _DataQueue.Enqueue(item);      if (!_DataKey.ContainsKey(item))      {       _DataKey.Add(item, DateTime.Now);      }     }     //Helper.ShowMsg("当前数组已经为空,处理线程进入休眠状态...");     Thread.Sleep(2);    }   }   /// <summary>   /// 执行定时器的动作   /// </summary>   private void DoTicket()   {    while (true)    {     Helper.ShowMsg("当前数据队列个数:" + _DataQueue.Count.ToString());     if (_DataQueue.Count > _CacheMaxNum)     {      while (true)      {       Helper.ShowMsg(string.Format("当前队列数:{0},已经超出最大长度:{1},开始进行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString()));       string item = _DataQueue.Dequeue().ToString();       if (!string.IsNullOrEmpty(item))       {        if (_DataKey.ContainsKey(item))        {         _DataKey.Remove(item);        }        if (_DataQueue.Count <= _CacheMaxNum)        {         Helper.ShowMsg("清理完成,开始休眠清理线程...");         break;        }       }      }     }     Thread.Sleep(_TimeSpan);    }   }     /// <summary>   /// 线程进行睡眠等待   /// 如果当前负载压力大大超出了线程的处理能力   /// 那么需要进行延时调用   /// </summary>   private void BlockThread()   {    if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt)    {     Thread.Sleep(_BlockSpanTime);    }   }   #endregion     #region 公共方法   /// <summary>   /// 开启服务线程   /// </summary>   public void Start()   {    if (_runner == null)    {     StartRun();    }    else    {     if (_runner.IsAlive == false)     {      StartRun();     }    }     }   /// <summary>   /// 关闭服务线程   /// </summary>   public void Stop()   {    if (_runner != null)    {     _runner.Abort();     _runner = null;    }   }     /// <summary>   /// 添加内容信息   /// </summary>   /// <param name="item">内容信息</param>   /// <returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns>   public bool AddItem(string item)   {    BlockThread();    item = Helper.MakeMd5(item);    if (_DataKey.ContainsKey(item))    {     return false;    }    else    {     _Myqueue.Enqueue(item);     return true;    }   }   /// <summary>   /// 判断内容信息是否已经存在   /// </summary>   /// <param name="item">内容信息</param>   /// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns>   public bool CheckItem(string item)   {    item = Helper.MakeMd5(item);    return _DataKey.ContainsKey(item);   }   #endregion    } }

模拟测试代码:

private static string _example = Guid.NewGuid().ToString();     private static UniqueCheck _uck = null;     static void Main(string[] args)   {    _uck = UniqueCheck.GetInstance();    _uck.Start();    _uck.SetIsShowMsg(false);    _uck.SetCacheMaxNum(20000000);    _uck.SetBlockNumExt(1000000);    _uck.SetTimeSpan(6000);      _uck.AddItem(_example);    Thread[] threads = new Thread[20];      for (int i = 0; i < 20; i++)    {     threads[i] = new Thread(AddInfo);     threads[i].Start();    }      Thread checkthread = new Thread(CheckInfo);    checkthread.Start();      string value = Console.ReadLine();      checkthread.Abort();    for (int i = 0; i < 50; i++)    {     threads[i].Abort();    }    _uck.Stop();   }     static void AddInfo()   {    while (true)    {     _uck.AddItem(Guid.NewGuid().ToString());    }   }     static void CheckInfo()   {    while (true)    {     Console.WriteLine("开始时间:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));     Console.WriteLine("插入结果:{0}", _uck.AddItem(_example));     Console.WriteLine("结束时间:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); //调整进程休眠时间,可以测试高并发的情况     //Thread.Sleep(1000);    }        }

测试截图:

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对脚本之家的支持。

版权说明:
本网站凡注明“广州京杭 原创”的皆为本站原创文章,如需转载请注明出处!
本网转载皆注明出处,遵循行业规范,如发现作品内容版权或其它问题的,请与我们联系处理!
欢迎扫描右侧微信二维码与我们联系。
·上一条:asp.net c# 通过消息队列处理高并发请求 | ·下一条:c# parallel.for怎么控制并发数量,控制多5个并发量

Copyright © 广州京杭网络科技有限公司 2005-2025 版权所有    粤ICP备16019765号 

广州京杭网络科技有限公司 版权所有