program story

새로운 대기열에 자동으로 기존 값을 대기열에 넣는 고정 크기 대기열

inputbox 2020. 7. 28. 08:29
반응형

새로운 대기열에 자동으로 기존 값을 대기열에 넣는 고정 크기 대기열


내가 사용하고 ConcurrentQueue목적 (종류 역사)에 전달 객체 마지막 N 잡고있다 공유 데이터 구조.

브라우저가 있고 마지막으로 탐색 한 Urls를 100 개 원한다고 가정합니다. 용량이 가득 찼을 때 (이력에서 100 주소) 새로운 항목 삽입 (인큐)시 가장 오래된 (첫 번째) 항목을 자동으로 삭제 (큐잉)하는 대기열을 원합니다.

그것을 사용하여 System.Collections어떻게 달성 할 수 있습니까?


Enqueue에서 Count를 확인한 다음 Count가 제한을 초과하면 Dequeue를 확인하는 래퍼 클래스를 작성합니다.

 public class FixedSizedQueue<T>
 {
     ConcurrentQueue<T> q = new ConcurrentQueue<T>();
     private object lockObject = new object();

     public int Limit { get; set; }
     public void Enqueue(T obj)
     {
        q.Enqueue(obj);
        lock (lockObject)
        {
           T overflow;
           while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
     }
 }

약간의 변형을하겠습니다. FixedSizeQueue에서 Linq 확장을 사용할 수 있도록 ConcurrentQueue를 확장하십시오.

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}

유용하다고 생각되는 사람이라면 누구나 위의 Richard Schneider의 답변을 기반으로 한 작동 코드가 있습니다.

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);
        }
    }
}

그 가치를 위해, 여기에는 안전하고 안전하지 않은 용도로 표시된 몇 가지 방법이있는 경량 원형 버퍼가 있습니다.

public class CircularBuffer<T> : IEnumerable<T>
{
    readonly int size;
    readonly object locker;

    int count;
    int head;
    int rear;
    T[] values;

    public CircularBuffer(int max)
    {
        this.size = max;
        locker = new object();
        count = 0;
        head = 0;
        rear = 0;
        values = new T[size];
    }

    static int Incr(int index, int size)
    {
        return (index + 1) % size;
    }

    private void UnsafeEnsureQueueNotEmpty()
    {
        if (count == 0)
            throw new Exception("Empty queue");
    }

    public int Size { get { return size; } }
    public object SyncRoot { get { return locker; } }

    #region Count

    public int Count { get { return UnsafeCount; } }
    public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
    public int UnsafeCount { get { return count; } }

    #endregion

    #region Enqueue

    public void Enqueue(T obj)
    {
        UnsafeEnqueue(obj);
    }

    public void SafeEnqueue(T obj)
    {
        lock (locker) { UnsafeEnqueue(obj); }
    }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;

        if (Count == Size)
            head = Incr(head, Size);
        rear = Incr(rear, Size);
        count = Math.Min(count + 1, Size);
    }

    #endregion

    #region Dequeue

    public T Dequeue()
    {
        return UnsafeDequeue();
    }

    public T SafeDequeue()
    {
        lock (locker) { return UnsafeDequeue(); }
    }

    public T UnsafeDequeue()
    {
        UnsafeEnsureQueueNotEmpty();

        T res = values[head];
        values[head] = default(T);
        head = Incr(head, Size);
        count--;

        return res;
    }

    #endregion

    #region Peek

    public T Peek()
    {
        return UnsafePeek();
    }

    public T SafePeek()
    {
        lock (locker) { return UnsafePeek(); }
    }

    public T UnsafePeek()
    {
        UnsafeEnsureQueueNotEmpty();

        return values[head];
    }

    #endregion


    #region GetEnumerator

    public IEnumerator<T> GetEnumerator()
    {
        return UnsafeGetEnumerator();
    }

    public IEnumerator<T> SafeGetEnumerator()
    {
        lock (locker)
        {
            List<T> res = new List<T>(count);
            var enumerator = UnsafeGetEnumerator();
            while (enumerator.MoveNext())
                res.Add(enumerator.Current);
            return res.GetEnumerator();
        }
    }

    public IEnumerator<T> UnsafeGetEnumerator()
    {
        int index = head;
        for (int i = 0; i < count; i++)
        {
            yield return values[index];
            index = Incr(index, size);
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    #endregion
}

나는 Foo()/SafeFoo()/UnsafeFoo()컨벤션 을 사용하고 싶다 :

  • Foo메소드 UnsafeFoo는 기본값으로 호출 합니다.
  • UnsafeFoo 메소드는 잠금없이 상태를 자유롭게 수정하므로 안전하지 않은 다른 메소드 만 호출해야합니다.
  • SafeFoo메소드 UnsafeFoo는 잠금 내에서 메소드를 호출 합니다.

약간 장황하지만 스레드 안전하고 더 분명한 메소드의 잠금 외부에서 안전하지 않은 메소드를 호출하는 것과 같이 명백한 오류가 발생합니다.


다음은 고정 크기 대기열에 대한 설명입니다.

Count속성이에 사용될 때 동기화 오버 헤드를 피하기 위해 일반 큐를 사용합니다 ConcurrentQueue. 또한 IReadOnlyCollectionLINQ 메소드를 사용할 수 있도록 구현 합니다. 나머지는 여기의 다른 답변과 매우 비슷합니다.

[Serializable]
[DebuggerDisplay("Count = {" + nameof(Count) + "}, Limit = {" + nameof(Limit) + "}")]
public class FixedSizedQueue<T> : IReadOnlyCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();

    public int Count { get { lock (_lock) { return _queue.Count; } } }
    public int Limit { get; }

    public FixedSizedQueue(int limit)
    {
        if (limit < 1)
            throw new ArgumentOutOfRangeException(nameof(limit));

        Limit = limit;
    }

    public FixedSizedQueue(IEnumerable<T> collection)
    {
        if (collection is null || !collection.Any())
           throw new ArgumentException("Can not initialize the Queue with a null or empty collection", nameof(collection));

        _queue = new Queue<T>(collection);
        Limit = _queue.Count;
    }

    public void Enqueue(T obj)
    {
        lock (_lock)
        {
            _queue.Enqueue(obj);

            while (_queue.Count > Limit)
                _queue.Dequeue();
        }
    }

    public void Clear()
    {
        lock (_lock)
            _queue.Clear();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock)
            return new List<T>(_queue).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

재미를 위해, 여기에 대부분의 주석가들의 관심사를 다루는 다른 구현이 있습니다. 특히, 스레드 안전성은 잠금없이 달성되며 구현은 랩핑 클래스에 의해 숨겨집니다.

public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
  private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
  private int _count;

  public int Limit { get; private set; }

  public FixedSizeQueue(int limit)
  {
    this.Limit = limit;
  }

  public void Enqueue(T obj)
  {
    _queue.Enqueue(obj);
    Interlocked.Increment(ref _count);

    // Calculate the number of items to be removed by this thread in a thread safe manner
    int currentCount;
    int finalCount;
    do
    {
      currentCount = _count;
      finalCount = Math.Min(currentCount, this.Limit);
    } while (currentCount != 
      Interlocked.CompareExchange(ref _count, finalCount, currentCount));

    T overflow;
    while (currentCount > finalCount && _queue.TryDequeue(out overflow))
      currentCount--;
  }

  public int Count
  {
    get { return _count; }
  }

  public IEnumerator<T> GetEnumerator()
  {
    return _queue.GetEnumerator();
  }

  System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
  {
    return _queue.GetEnumerator();
  }
}

내 버전은 정상적인 Queue것의 서브 클래스 일뿐 입니다. 특별한 것은 아니지만 모든 사람이 참여하는 것을 보아도 여전히 여기에 넣을 수 있습니다. 또한 경우에 따라 대기열에서 제외 된 항목을 반환합니다.

public sealed class SizedQueue<T> : Queue<T>
{
    public int FixedCapacity { get; }
    public SizedQueue(int fixedCapacity)
    {
        this.FixedCapacity = fixedCapacity;
    }

    /// <summary>
    /// If the total number of item exceed the capacity, the oldest ones automatically dequeues.
    /// </summary>
    /// <returns>The dequeued value, if any.</returns>
    public new T Enqueue(T item)
    {
        base.Enqueue(item);
        if (base.Count > FixedCapacity)
        {
            return base.Dequeue();
        }
        return default;
    }
}

코딩의 즐거움을 위해 ' ConcurrentDeck'

public class ConcurrentDeck<T>
{
   private readonly int _size;
   private readonly T[] _buffer;
   private int _position = 0;

   public ConcurrentDeck(int size)
   {
       _size = size;
       _buffer = new T[size];
   }

   public void Push(T item)
   {
       lock (this)
       {
           _buffer[_position] = item;
           _position++;
           if (_position == _size) _position = 0;
       }
   }

   public T[] ReadDeck()
   {
       lock (this)
       {
           return _buffer.Skip(_position).Union(_buffer.Take(_position)).ToArray();
       }
   }
}

사용법 예 :

void Main()
{
    var deck = new ConcurrentDeck<Tuple<string,DateTime>>(25);
    var handle = new ManualResetEventSlim();
    var task1 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task1",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task2 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task2",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task3 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task3",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
    handle.Set();
    var outputtime = DateTime.Now;
    deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true);
}

답변을 하나 더 추가하겠습니다. 왜 이것이 다른 사람들보다?

1) 단순성. 크기를 보장하는 것은 좋고 훌륭하지만 불필요한 문제가 발생하여 자체 문제가 발생할 수 있습니다.

2) IReadOnlyCollection을 구현하여 Linq를 사용하고 IEnumerable을 기대하는 다양한 것들에 전달할 수 있습니다.

3) 잠금이 없습니다. 위의 많은 솔루션은 잠금을 사용하며 잠금없는 컬렉션에서는 올바르지 않습니다.

4) BlockingCollection과 함께 컬렉션을 사용하려는 경우 중요한 IProducerConsumerCollection을 포함하여 ConcurrentQueue와 동일한 메소드, 속성 및 인터페이스 세트를 구현합니다.

이 구현은 TryDequeue가 실패 할 경우 예상보다 많은 항목으로 끝날 수 있지만, 발생 빈도는 필연적으로 성능을 저해하고 자체 예상치 못한 문제를 야기하는 특수 코드의 가치가없는 것 같습니다.

절대적으로 크기를 보장하려면 Prune () 또는 유사한 메소드를 구현하는 것이 가장 좋습니다. 다른 방법 (TryDequeue 포함)에서 ReaderWriterLockSlim 읽기 잠금을 사용하고 정리할 때만 쓰기 잠금을 사용할 수 있습니다.

class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
    readonly ConcurrentQueue<T> m_concurrentQueue;
    readonly int m_maxSize;

    public int Count => m_concurrentQueue.Count;
    public bool IsEmpty => m_concurrentQueue.IsEmpty;

    public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }

    public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
        if (initialCollection == null) {
            throw new ArgumentNullException(nameof(initialCollection));
        }

        m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
        m_maxSize = maxSize;
    }

    public void Enqueue (T item) {
        m_concurrentQueue.Enqueue(item);

        if (m_concurrentQueue.Count > m_maxSize) {
            T result;
            m_concurrentQueue.TryDequeue(out result);
        }
    }

    public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
    public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);

    public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
    public T[] ToArray () => m_concurrentQueue.ToArray();

    public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();

    // Explicit ICollection implementations.
    void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
    object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
    bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;

    // Explicit IProducerConsumerCollection<T> implementations.
    bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
    bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);

    public override int GetHashCode () => m_concurrentQueue.GetHashCode();
    public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
    public override string ToString () => m_concurrentQueue.ToString();
}

글쎄 그것은 멀티 스레드 환경에서 사용될 때 위의 솔루션 중 일부가 크기를 초과 할 수 있음을 알았던 사용에 달려 있습니다. 어쨌든 내 유스 케이스는 마지막 5 개의 이벤트를 표시하고 큐에 이벤트를 쓰는 스레드와 다른 스레드가 Winform Control에 표시하는 여러 스레드가 있습니다. 그래서 이것은 내 해결책이었습니다.

편집 : 구현 내에서 이미 잠금을 사용하고 있기 때문에 실제로 ConcurrentQueue가 필요하지 않으므로 성능이 향상 될 수 있습니다.

class FixedSizedConcurrentQueue<T> 
{
    readonly Queue<T> queue = new Queue<T>();
    readonly object syncObject = new object();

    public int MaxSize { get; private set; }

    public FixedSizedConcurrentQueue(int maxSize)
    {
        MaxSize = maxSize;
    }

    public void Enqueue(T obj)
    {
        lock (syncObject)
        {
            queue.Enqueue(obj);
            while (queue.Count > MaxSize)
            {
                queue.Dequeue();
            }
        }
    }

    public T[] ToArray()
    {
        T[] result = null;
        lock (syncObject)
        {
            result = queue.ToArray();
        }

        return result;
    }

    public void Clear()
    {
        lock (syncObject)
        {
            queue.Clear();
        }
    }
}

편집 : 우리는 실제로 syncObject위의 예제에서 필요하지 않으며 함수에서 queue다시 초기화하지 않고 어쨌든 queue으로 표시되어 있기 때문에 객체를 사용할 수 있습니다 readonly.


이것은 내 대기열 버전입니다.

public class FixedSizedQueue<T> {
  private object LOCK = new object();
  ConcurrentQueue<T> queue;

  public int MaxSize { get; set; }

  public FixedSizedQueue(int maxSize, IEnumerable<T> items = null) {
     this.MaxSize = maxSize;
     if (items == null) {
        queue = new ConcurrentQueue<T>();
     }
     else {
        queue = new ConcurrentQueue<T>(items);
        EnsureLimitConstraint();
     }
  }

  public void Enqueue(T obj) {
     queue.Enqueue(obj);
     EnsureLimitConstraint();
  }

  private void EnsureLimitConstraint() {
     if (queue.Count > MaxSize) {
        lock (LOCK) {
           T overflow;
           while (queue.Count > MaxSize) {
              queue.TryDequeue(out overflow);
           }
        }
     }
  }


  /// <summary>
  /// returns the current snapshot of the queue
  /// </summary>
  /// <returns></returns>
  public T[] GetSnapshot() {
     return queue.ToArray();
  }
}

IEnumerable을 기반으로하는 생성자를 갖는 것이 유용하다는 것을 알았으며 GetSnapshot을 사용하여 호출 순간에 항목의 다중 스레드 안전 목록 (이 경우 배열)을 갖는 것이 유용하다는 것을 알았습니다. 기본 컬렉션이 변경되면 오류가 발생합니다.

이중 카운트 점검은 일부 상황에서 잠금을 방지하기위한 것입니다.

참고 URL : https://stackoverflow.com/questions/5852863/fixed-size-queue-which-automatically-dequeues-old-values-upon-new-enques

반응형