Visual Basic, .NET, ASP, VBScript
 

   
   
     

Форум - Общий форум

Страница: 1 | 2 | 3 |

 

  Вопрос: Последовательность в многопоточности Добавлено: 29.08.10 13:11  

Автор вопроса:  Nevep
Подскажите в какую сторону копать. Запускается некоторое число потоков (50 к примеру), при этом нужно ограничить максимальное число одновременно работающих потоков (в примере реализовано с помощью Semaphore), при этом потоки запускаются в случайной последовательности. Как можно сделать чтобы потоки запускались последовательно? Читал про многопоточность, знаю что если запустить сразу несколько потоков, то они выполняются не в том порядке, в которм запускаются.
Private Sub Button2_Click(ByVal sender As System.Object, ByVal e As System.EventArgs) Handles Button2.Click
        Dim i As Integer
        _pool = New Semaphore(0, 2)
        For i = 1 To 50
            Dim t As New Thread(New ParameterizedThreadStart(AddressOf Worker1))
            t.Start(i)
        Next i
        _pool.Release(2)
    End Sub
    Private Shared Sub Worker1(ByVal num As Object)
        _pool.WaitOne()
        Dim padding As Integer = Interlocked.Add(_padding, 100)
        Dim fFile As Short
        fFile = FreeFile()
        FileOpen(fFile, num & ".txt", OpenMode.Append)
        PrintLine(fFile, "Thread {0} enters the semaphore." & num)
        FileClose(fFile)
        Thread.Sleep(1000)
        _pool.Release()
    End Sub

Ответить

  Ответы Всего ответов: 36  

Номер ответа: 1
Автор ответа:
 EROS



Вопросов: 58
Ответов: 4255
 Профиль | | #1 Добавлено: 29.08.10 13:46
Читал про многопоточность, знаю что если запустить сразу несколько потоков, то они выполняются не в том порядке, в которм запускаются.

На то они и потоки что у них нету никакого порядка.. они работают каждый сам по себе..
New Semaphore(0, 2)

ты где этот пример брал? Ты создаешь семафор без единого свободного слота..!!! Потом запускаешь все потоки (они тупо висят и ждут свободных слотов) а потом, после запуска, осбождаешь все слоты.. ваще фигня какая то.. (((

Ответить

Номер ответа: 2
Автор ответа:
 Nevep



Вопросов: 6
Ответов: 25
 Профиль | | #2 Добавлено: 29.08.10 13:52
На то они и потоки что у них нету никакого порядка.. они работают каждый сам по себе.

Вот и спрашиваю про алгоритм, про то как такое сделать...а пример выше работает на ура. создаёт по 2 файла каждую секунду.
  1.    Private Shared _pool As Semaphore
  2.     Private Shared _padding As Integer
  3. Private Sub Button2_Click(ByVal sender As System.Object, ByVal e As System.EventArgs) Handles Button2.Click
  4.         Dim i As Integer
  5.         _pool = New Semaphore(0, 2)
  6.         For i = 1 To 50
  7.             Dim t As New Thread(New ParameterizedThreadStart(AddressOf Worker1))
  8.             t.Start(i)
  9.         Next i
  10.         _pool.Release(2)
  11.     End Sub
  12.     Private Shared Sub Worker1(ByVal num As Object)
  13.         _pool.WaitOne()
  14.         Dim padding As Integer = Interlocked.Add(_padding, 100)
  15.         Dim fFile As Short
  16.         fFile = FreeFile()
  17.         FileOpen(fFile, num & ".txt", OpenMode.Append)
  18.         PrintLine(fFile, "Thread {0} enters the semaphore." & num)
  19.         FileClose(fFile)
  20.         Thread.Sleep(1000)
  21.         _pool.Release()
  22.     End Sub

Ответить

Номер ответа: 3
Автор ответа:
 Nevep



Вопросов: 6
Ответов: 25
 Профиль | | #3 Добавлено: 29.08.10 13:53
Брал на сайте майкрософта http://msdn.microsoft.com/ru-ru/library/system.threading.semaphore%28VS.90%29.aspx

Ответить

Номер ответа: 4
Автор ответа:
 EROS



Вопросов: 58
Ответов: 4255
 Профиль | | #4 Добавлено: 29.08.10 14:09
Взять и скопипастить пример - много ума не надо.. гораздо сложнее понять как он работает.. Ты создаешь 50 ЗАБЛОКИРОВАННЫХ потоков.. и ни один из них не будет работать пока ты не вызовешь _pool.Release(2). Ты скопипастил пример совершенно не понимая его сути..
Кроме того FileOpen - это аццтой.. в НЕТ так не делается.. читай про работу с файлами..
И последнее.. зачем тебе понадобились потоки и тем более их последовательное выполнение? Что ты вообще пытаешься сделать?
Я мог бы рассказать тебе про синхронизацию потоков и т.д... но судя по всему ты этого не осилишь, во всяком случае не сейчас.. Гораздо проще выяснить ЧТО ты хочешь и попробовать подсказать КАК это сделать..

Ответить

Номер ответа: 5
Автор ответа:
 Nevep



Вопросов: 6
Ответов: 25
 Профиль | | #5 Добавлено: 29.08.10 14:23
Взять и скопипастить пример - много ума не надо.. гораздо сложнее понять как он работает.. Ты создаешь 50 ЗАБЛОКИРОВАННЫХ потоков.. и ни один из них не будет работать пока ты не вызовешь _pool.Release(2)

:) Как пример работает я разобрался. Ну попробуй запусти мой пример. всё работает.
Кроме того FileOpen - это аццтой.. в НЕТ так не делается.. читай про работу с файлами..

это просто для примера...в Worker1 будет несколько гет и пост запросов...а в цикле For i = 1 To 50 будут перебираться ссылки.
Я мог бы рассказать тебе про синхронизацию потоков и т.д... но судя по всему ты этого не осилишь

С потоками недавно начал работать, поэтому и спрашиваю в какую сторону копать чтобы последовательно выполнялись потоки и при этом можно было ограничивать максимальное кол-во запущенных.
Может можно как-нибудь узнать что поток отработал (или узнать сколько потоков в данный момент выполняются), и запускать не сразу 50, а запустить поток-менеджер, который будет мониторить количество отработанных потоков и в случае если потоку-менеджеру отрапортовали что поток завершился, то он запускает следующий?

Ответить

Номер ответа: 6
Автор ответа:
 EROS



Вопросов: 58
Ответов: 4255
 Профиль | | #6 Добавлено: 29.08.10 14:33
Ну попробуй запусти мой пример. всё работает.

мне не надо запускать.. я и так вижу то, что твой пример работает через одно место..
в какую сторону копать чтобы последовательно выполнялись потоки

Тебе не надо чтоб они последовательно выполнялись.. Самое просто решение в твоем случае это создать очередь, заполнить ее ссылками в нужном порядке а потоки будут брать ссылку из очереди и выполнять нужные запросы..
Может можно как-нибудь узнать что поток отработал

Тебе ничто не мешает в конце работы потока кидать какое нибудь событие
узнать сколько потоков в данный момент выполняются

Надо завести переменную и с помощью того же Interlocker при старте потока увеличивать значение, при завершении уменьшать.. Таким образом эта переменная всегда будет показывать количество работающих потоков (но оно не привысит значение установленное в семафоре)
и запускать не сразу 50, а запустить поток-менеджер, который будет мониторить количество отработанных потоков и в случае если потоку-менеджеру отрапортовали что поток завершился, то он запускает следующий?

Эти функции у тебя как раз и будет выполнять семафор.. а порядок выполнения определишь в созданной и заполненной очереди..

Ответить

Номер ответа: 7
Автор ответа:
 Artyom



Разработчик

Вопросов: 130
Ответов: 6602
 Профиль | | #7 Добавлено: 29.08.10 14:44
Вообще если тебе надо ограничить кол-во одновременно выполняемых запросов, то это вообще не обязательно так сложно делать.
Берешь BlockingCollection, заполняешь ее данными, запускаешь нужное кол-во потоков (если тебе нужно чтоб одновременно было не более 2 запросов, то 2 потока).
Потоки берут из BlockingCollection данные для выполнения нового запроса, делают запрос, сохраняют куда-то результат, переходят к следующему элементу. Когда элементы кончаются, поток завершается.
Никакой синхронизации не нужно (BlockingCollection полностью безопасен). Ну только при сохранении нужно будет синхронизировать.

Ответить

Номер ответа: 8
Автор ответа:
 EROS



Вопросов: 58
Ответов: 4255
 Профиль | | #8 Добавлено: 29.08.10 14:47
Берешь BlockingCollection

Ты забыл добавить что для этого ему понадобится FW 4.0
Кроме того, очередь тоже является потокобезопасной..

Ответить

Номер ответа: 9
Автор ответа:
 Artyom



Разработчик

Вопросов: 130
Ответов: 6602
 Профиль | | #9 Добавлено: 29.08.10 20:34
Гм, это ты называешь потокобезопасностью? Необходимость поостоянно работать через Synchronized?..
Посмотри что может BlockingCollection, увидишь что на самом деле значит потокобезопасный класс :)

Кстати в FCL до сих пор нет списка, который бы можно было использовать в многопоточной среде вместе с IEnumerable. А у меня есть :-P

Ответить

Номер ответа: 10
Автор ответа:
 EROS



Вопросов: 58
Ответов: 4255
 Профиль | | #10 Добавлено: 30.08.10 02:08
Необходимость поостоянно работать через Synchronized?..

С очередью??? Ты ничего не путаешь? Если я не ошибаюсь, очередь - один из 3 полностью потокобезопасных классов ...

Ответить

Номер ответа: 11
Автор ответа:
 Artyom



Разработчик

Вопросов: 130
Ответов: 6602
 Профиль | | #11 Добавлено: 30.08.10 05:43
На практике все очень интересно.

Вот тебе пример. Есть колекция, изначально забитая некими данными.
Один поток постоянно дописывает данные, 10 потоков считывают.

  1.         private static Queue<string> queue = new Queue<string>();
  2.  
  3.         static void Main()
  4.         {
  5.             for (int i=0;i<10000;i++)
  6.                 queue.Enqueue("hello");
  7.  
  8.             System.Threading.Thread writeThread = new System.Threading.Thread(WriteThread);
  9.             writeThread.Start();
  10.  
  11.             for (int i = 0; i < 10; i++)
  12.             {
  13.                 System.Threading.Thread readThread = new System.Threading.Thread(ReadThread);
  14.                 readThread.Start();
  15.             }
  16.             Console.ReadLine();
  17.         }
  18.  
  19.         private static void WriteThread()
  20.         {
  21.             while (true)
  22.             {
  23.                 queue.Enqueue("hello");
  24.             }
  25.         }
  26.  
  27.         private static void ReadThread()
  28.         {
  29.             while (true)
  30.             {
  31.                 try
  32.                 {
  33.                     string text = queue.Dequeue();
  34.                 }
  35.                 catch (Exception ex)
  36.                 {
  37.                     Console.WriteLine(ex);
  38.                 }
  39.             }
  40.         }


Запускаем. Почти сразу начинают падать в консоль сообщения
  1. System.InvalidOperationException: Queue empty.

Причина понятна - потоки выбрали все данные из очереди, новые не успели добавиться.

Добавим перед чтением проверку, есть ли в очереди данные, и если нету, то делаем задержку и повторяем попытку
  1.         private static void ReadThread()
  2.         {
  3.             while (true)
  4.             {
  5.                 try
  6.                 {
  7.                     if (queue.Count > 0)
  8.                     {
  9.                         string text = queue.Dequeue();
  10.                     }
  11.                     else
  12.                         System.Threading.Thread.Sleep(50);
  13.                 }
  14.                 catch (Exception ex)
  15.                 {
  16.                     Console.WriteLine(ex.ToString());
  17.                 }
  18.             }
  19.         }

Конец немного предсказуем
  1. System.InvalidOperationException: Queue empty.


Причина уже интереснее, и она состоит в том что после того как поток A проверил, есть ли в очереди элемент, поток B этот элемент из нее забрал. В результате поток A падает при попытке считать данные из пустой очереди.

Чтоб этого избежать, нужно использовать критическую секцию.

        private static void ReadThread()
        {
            while (true)
            {
                try
                {
                    bool hasData = false;
                    string text;

                    lock (((ICollection)queue).SyncRoot)
                    {
                        if (queue.Count > 0)
                        {
                            hasData = true;
                            text = queue.Dequeue();
                        }
                    }

                    if (!hasData)
                        System.Threading.Thread.Sleep(50);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.ToString());
                }
            }
        }

Теперь процедура чтения работает полностью корректно и в этом месте код не будет падать.
Но теперь начинается самое интересное.

Исключения начинают падать в потоке записи на строчке
    queue.Enqueue("hello";);
Нужно заметить что по документации никаких исключений для метода Enqueue не задекларировано. И самое разумное чего можно ожидать - это OutOfMemoryException. Но мы получаем совсем другие исключения:

  1. System.IndexOutOfRangeException was unhandled
  2.   Message=Index was outside the bounds of the array.
  3.   Source=System
  4.   StackTrace:
  5.        at System.Collections.Generic.Queue`1.Enqueue(T item)
  6.        at ConsoleApplication9.Program.WriteThread() in H:\Work\Projects\Test\ConsoleApplication9\Program.cs:line 34
  7.        at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
  8.        at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
  9.        at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
  10.        at System.Threading.ThreadHelper.ThreadStart()
  11.   InnerException:


  1. System.ArgumentException was unhandled
  2.   Message=Source array was not long enough. Check srcIndex and length, and the array's lower bounds.
  3.   Source=mscorlib
  4.   ParamName=""
  5.   StackTrace:
  6.        at System.Array.Copy(Array sourceArray, Int32 sourceIndex, Array destinationArray, Int32 destinationIndex, Int32 length, Boolean reliable)
  7.        at System.Collections.Generic.Queue`1.SetCapacity(Int32 capacity)
  8.        at System.Collections.Generic.Queue`1.Enqueue(T item)
  9.        at ConsoleApplication9.Program.WriteThread() in H:\Work\Projects\Test\ConsoleApplication9\Program.cs:line 35
  10.        at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
  11.        at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
  12.        at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
  13.        at System.Threading.ThreadHelper.ThreadStart()
  14.   InnerException:


Исключения падают абсолютно от балды, а вернее от того что два потока одновременно выполняют несогласованые модификации (добавление и извлечение элемента - это модификация очереди).
Поэтому критическую секцию нужно добавить и для потока записи.
В результате код будет выглядеть таким образом

  1.         private static void WriteThread()
  2.         {
  3.             while (true)
  4.             {
  5.                 lock (((ICollection)queue).SyncRoot)
  6.                 {
  7.                     queue.Enqueue("hello");
  8.                 }
  9.             }
  10.         }
  11.  
  12.         private static void ReadThread()
  13.         {
  14.             while (true)
  15.             {
  16.                 try
  17.                 {
  18.                     bool hasData = false;
  19.                     string text;
  20.  
  21.                     lock (((ICollection)queue).SyncRoot)
  22.                     {
  23.                         if (queue.Count > 0)
  24.                         {
  25.                             hasData = true;
  26.                             text = queue.Dequeue();
  27.                         }
  28.                     }
  29.  
  30.                     if (!hasData)
  31.                         System.Threading.Thread.Sleep(50);
  32.                 }
  33.                 catch (Exception ex)
  34.                 {
  35.                     Console.WriteLine(ex.ToString());
  36.                 }
  37.             }
  38.         }


Теперь код работает абсолютно корректно и нигде ничего не падает.

Но возникает резонный вопрос - в каком же контексте этот класс можно назвать потокобезопасным????

Ответить

Номер ответа: 12
Автор ответа:
 Artyom



Разработчик

Вопросов: 130
Ответов: 6602
 Профиль | | #12 Добавлено: 30.08.10 05:48
А теперь посмотрим как работают действительно потокобезопасные классы.
Для примера возьмем System.Collections.Concurrent.ConcurrentQueue<T> из 4.0

Не привожу код инициализации, так как он не отличается, только рабочий код:

  1.         private static ConcurrentQueue<string> queue = new ConcurrentQueue<string>();
  2.         private static void WriteThread()
  3.         {
  4.             while (true)
  5.             {
  6.                 queue.Enqueue("hello");
  7.             }
  8.         }
  9.  
  10.         private static void ReadThread()
  11.         {
  12.             while (true)
  13.             {
  14.                 try
  15.                 {
  16.                     string text;
  17.  
  18.                     if (!queue.TryDequeue(out text))
  19.                         System.Threading.Thread.Sleep(50);
  20.                 }
  21.                 catch (Exception ex)
  22.                 {
  23.                     Console.WriteLine(ex.ToString());
  24.                 }
  25.             }
  26.         }


Все работает из коробки, не нужно ничего допиливать

Ответить

Номер ответа: 13
Автор ответа:
 Artyom



Разработчик

Вопросов: 130
Ответов: 6602
 Профиль | | #13 Добавлено: 30.08.10 06:04
А теперь гордость Microsoft класс BlockingCollection. Несмотря на то, что в классе есть слово Collection, это не должно смущать, так как он по умолчанию работает именно как очередь. Но можно его использовать как стек, а также можно добавить собственный механизм порядка обрабокти элементов.

Особенностью этого класса является то, что если какая-то операция не может быть выполнена (потому что в коллекция пустая, или наоборот, переполнена), то поток подвисает до того как операцию можно будет выполнить.

По сути этот класс является готовой основой для полностью потокобезопасной очереди сообщений - можно абсолютно свободно ложить сообщения в эту очередь из любых потоков, а также запустить любое количество рабочих потоков, которые будут считывать сообщения из очереди и обрабатывать их, и при этом не написав ни одной строчки для синхронизации.

  1.         private static BlockingCollection<string> queue = new BlockingCollection<string>();
  2.  
  3.         private static void WriteThread()
  4.         {
  5.             while (true)
  6.             {
  7.                 queue.Add("hello");
  8.             }
  9.         }
  10.  
  11.         private static void ReadThread()
  12.         {
  13.             while (true)
  14.             {
  15.                 try
  16.                 {
  17.                     string text;
  18.  
  19.                     if (!queue.TryTake(out text))
  20.                         return;
  21.                 }
  22.                 catch (Exception ex)
  23.                 {
  24.                     Console.WriteLine(ex.ToString());
  25.                 }
  26.             }
  27.         }


Интересный момент - эту очередь можно пометить как завершенную, вызвав метод CompleteAdding. Это значит что добавление данных в очередь завершено и новые данные добавляться не будут, все потоки, которые ждали данных, вызвав TryTake, получат false, и это значит что потоку нужно завершить свою работу - и не нужно варварскими способами убивать потоки, или передавть им флаги для завершения, как это делалось раньше.

Вот такие классы в моем понимании можно назвать потокобезопасными.

Ответить

Номер ответа: 14
Автор ответа:
 Artyom



Разработчик

Вопросов: 130
Ответов: 6602
 Профиль | | #14 Добавлено: 30.08.10 06:06
Также можно ограничить размер BlockingCollection, указав максимальное количество элементов, которые могут в ней храниться. И в результате подвисать будут потоки, которые вызвыали метод Add в случаях, когда кол-во элементов в очереди достигло максимального указанного, а потоки чтения не успевают забирать даныне из очереди. И буду висеть, пока в очереди не появится свободное место для новых элементов

Ответить

Номер ответа: 15
Автор ответа:
 EROS



Вопросов: 58
Ответов: 4255
 Профиль | | #15 Добавлено: 30.08.10 15:59
в каком же контексте этот класс можно назвать потокобезопасным????

С критической секцией - это баян, прописная истина если хочешь..
а потокобезопасный в том плане что мы никогда не получим Cross-Thread Excepthion или, что чаще всего бывает с другими коллекциями, Collection was modified
А юзать FW 4.0 только ради BlockingCollection, это уж слишком.. я уж лучше по старинке.. с критическими секциями..

Ответить

Страница: 1 | 2 | 3 |

Поиск по форуму



© Copyright 2002-2011 VBNet.RU | Пишите нам