C#: использование очереди для взаимодействия потоков Печать
Добавил(а) microsin   

В приложении для обеспечения разделения данных и поддержки отзывчивости интерфейса обычно используют несколько потоков. Один из них обрабатывает интерфейс пользователя - клики на кнопках, перерисовку визуальных элементов формы, а другой (такой как например BackgroundWorker [1] или Thread [2]) выполняет фоновые операции - обмен через периферийные устройства, декодирование данных, обработку сетевого трафика и т. п. Для передачи информации между потоками используют стандартные методы ожидания события, такие как мьютексы, семафоры, очереди. В этой статье раccматриваются очереди Queue для организации блокировки потока в ожидании поступления данных.

[Класс-обертка для очереди]

Для удобства использования был создан класс TQueue на базе класса Queue. Он позволяет создавать очереди из объектов произвольного типа.

// Модуль Queue.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.IO.Ports;
using System.Threading;
using System.ComponentModel;
 
namespace mynamespace
{
   public class TQueue< T>
   {
      private readonly Queue< T> queue = new Queue< T>();
      private readonly int maxSize;
      public TQueue(int maxSize) { this.maxSize = maxSize; }
 
      public void Enqueue(T item)
      {
         lock (queue)
         {
            while (queue.Count >= maxSize)
            {
               queue.Clear();
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
               // Разбудить все, что заблокировалось
               // на очереди:
               Monitor.PulseAll(queue);
            }
         }
      }
 
      public bool Dequeue(out T item, int timeout)
      {
         item = default(T);
         lock (queue)
         {
            while (queue.Count == 0)
            {
               if (Monitor.Wait(queue, timeout) == false) return false;
            }
            item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
               // Разбудить все, что заблокировалось
               // на очереди:
               Monitor.PulseAll(queue);
            }
            return true;
         }
      }
 
      public int Count()
      {
         int count;
         lock (queue)
         {
            count = queue.Count;
         }
         return (count);
      }
 
      public void Clear()
      {
         lock (queue)
         {
            queue.Clear();
         }
      }
   }
}

Рассмотрим пример, в котором один поток (BackgroundWorker parserTask, находится в классе COMportReader) принимает данные из последовательного порта. Если в начале принятой строки поток находит символ #, то посылает посылает их в очередь из строк StringQueue, которая реализована как публичное свойство класса COMportReader.

namespace mynamespace
{
   public class COMportReader
   {
      // Сообщения, полученные от COM-порта:
      public TQueue< string> StringQueue;
      private BackgroundWorker parserTask;
      static bool _continue;
      log logfile;
      SerialPort _serialPort;
 
      public COMportReader(String port)
      {
         logfile = new log(this.GetType().ToString(), "logapp.txt");
         StringQueue = new TQueue< string>(20);
         _serialPort = new SerialPort();
         _serialPort.PortName = port;
         _serialPort.BaudRate = 115200;
         _serialPort.Parity = Parity.None;
         _serialPort.DataBits = 8;
         _serialPort.StopBits = StopBits.One;
         _serialPort.Handshake = Handshake.None;
         _serialPort.ReadTimeout = 500;
         _serialPort.WriteTimeout = 500;
         _serialPort.Encoding = Encoding.Default;
         _serialPort.NewLine = "\n";
         // Парсер - поток, которые получает данные от последовательного порта.
         parserTask = new BackgroundWorker();
         parserTask.DoWork += ReadData;
      }
 
      public bool IsOpen
      {
         get { return _serialPort.IsOpen; }
      }
 
      public bool Open()
      {
         bool retval = false;
         try
         {
            _serialPort.Open();
            _continue = true;
            if (!parserTask.IsBusy)
            {
               parserTask.RunWorkerAsync();
            }
            retval = true;
         }
         catch (Exception ex)
         {
            logfile.write(ex.Message.ToString());
         }
         return (retval);
      }
 
      public bool Close()
      {
         bool retval = false;
         _continue = false;
         // Если адаптер был открыт, закроем его
         if (_serialPort.IsOpen)
         {
            lock (_serialPort)
            {
               _serialPort.Close();
            }
         }
         // Если адаптер закрыт, вернем true
         if (!_serialPort.IsOpen)
         {
            retval = true;
         }
         return retval;
      }
 
      private void ReadData(object pSender, DoWorkEventArgs pEventArgs)
      {
         while (_continue)
         {
            String message = "";
            char[] charsToTrim = {'\n', ' ', '\r'};
            try
            {
               message = _serialPort.ReadLine();
               message = message.Trim(charsToTrim);
               char firstsym = message[0];
               switch (firstsym)
               {
               case '#':
                  StringQueue.Enqueue(message);
                  break;
               default:
                  logfile.write(message);
                  break;
               }
            }
            catch
            {
            }
         }
      }
   }
}

На этой очереди StringQueue блокируется другой поток BackgroundWorker, метод цикла taskDoWork которого блокируется на очереди StringQueue в ожидании поступления данных. Блокировка на очереди осуществляется вызовом StringQueue.Dequeue. Таймаут блокировки указан 100 миллисекунд. Как только в очереди оказываются новые данные, поток немедленно разблокируется до истечения таймаута.

private void taskDoWork(object pSender, DoWorkEventArgs pEventArgs)
{
   String datastr;
   while (true)
   {
      // Блокировка на очереди в течение 100 мс:
      COMportReader.StringQueue.Dequeue(out datastr, 100);
      if (null != datastr)
      {
         // Обработка данных из очереди, полученные
         // через строку datastr:
         ...
      }
   }
}

Класс TQueue позволяет создавать очередь из более сложных элементов. К примеру, в очередь можно помещать экземпляры классов. Ниже показано, как это делается на примере класса DataMessage.

// Класс DataMessage:
namespace mynamespace
{
   public enum MessageType
   {
      INFO,
      EVENT,
      ERR_TIMEOUT,
      ERR_BAD_CRC,
      ERR_UNKNOWN
   }
 
   class DataMessage
   {
      public DateTime Время;
      public String Сигнал;
      public bool РелеАктивировано;
      public MessageType ТипСообщения;
 
      // Конструктор класса, который заполняет свои поля
      // декодированными данными из datastr:
      public DataMessage(String datastr)
      {
         Время = DateTime.Now;
         Сигнал = "";
         РелеАктивировано = false;
         ТипСообщения = MessageType.ERR_UNKNOWN;
         String[] words;
         switch (datastr[1])
         {
         case 'B':
            ТипСообщения = MessageType.EVENT;
            Сигнал = datastr.Substring(3);
            break;
         case 'M':
            ТипСообщения = MessageType.EVENT;
            if ("UP" == datastr.Substring(3, 2))
            {
               words = datastr.Substring(6).Split(new char[] { '#' });
               if ("relayON" == words[0])
               {
                  РелеАктивировано = true;
               }
            }
            else if ("DN" == datastr.Substring(3, 2))
            {
               words = datastr.Substring(6).Split(new char[] { '#' });
               if ("relayOFF" == words[0])
               {
                  РелеАктивировано = false;
               }
            }
            break;
         case 'E':
            words = datastr.Substring(6).Split(new char[] { '#' });
            if ("TIME" == words[0])
               ТипСообщения = MessageType.ERR_TIMEOUT;
            else if ("CRC" == words[0])
               ТипСообщения = MessageType.ERR_BAD_CRC;
            break;
         case 'T':
            ТипСообщения = MessageType.INFO;
            Сигнал = datastr;
            break;
         default:
            logfile.write("Ошибка декодирования " + datastr);
            break;
         }
      }
   }
}
 
// Создание очереди из классов DataMessage:
DecodedMessageQueue = new TQueue< DataMessage>(20);
...
 
// Постановка сообщений в очередь:
DataMessage datamessage = new DataMessage(datastr);
DecodedMessageQueue.Enqueue(datamessage);
 
// Передача события новых данных datamessage в форму, где
// они будут отображаться на визуальных компонентах
// интерфейса приложения.
guiTask.ReportProgress(0);
...
 
// Обработчик прогресса guiTask, который имеет доступ
// в интерфейсу GUI:
void guiTaskProgressHandler(object sender, ProgressChangedEventArgs pEventArgs)
{
   DataMessage datamessage;
 
   // Блокировка на очереди в течение 1 секунды:
   DecodedMessageQueue.Dequeue(out datamessage, 1000);
   CultureInfo ci = new CultureInfo("en-us");
   switch(datamessage.ТипСообщения)
   {
   case MessageType.EVENT:
      if ("" != datamessage.Сигнал)
      {
         текст = datamessage.Сигнал;
         ДобавитьСтроку(текст);
      }
      else if (datamessage.РелеАктивировано)
      {
         ДобавитьСтрокуСостоянияРеле(datamessage.РелеАктивировано);
      }
      break;
   case MessageType.ERR_TIMEOUT:
      ДобавитьСтрокуОшибки("Таймаут");
      break;
   case MessageType.ERR_BAD_CRC:
      ДобавитьСтрокуОшибки("Не совпала контрольная сумма");
      break;
   case MessageType.INFO:
      СтрокаСтатуса.Text = datamessage.Сигнал;
      break;
   default:
      ДобавитьСтрокуОшибки("Неизвестная ошибка");
      break;
   }
}

[Ссылки]

1. Visual Studio C++ 2010 Express: BackgroundWorker Class.
2. Потоки на C#. Часть 1: введение.