Вопрос: Последовательность в многопоточности | Добавлено: 29.08.10 13:11 |
Автор вопроса: ![]() |
Подскажите в какую сторону копать. Запускается некоторое число потоков (50 к примеру), при этом нужно ограничить максимальное число одновременно работающих потоков (в примере реализовано с помощью Semaphore), при этом потоки запускаются в случайной последовательности. Как можно сделать чтобы потоки запускались последовательно? Читал про многопоточность, знаю что если запустить сразу несколько потоков, то они выполняются не в том порядке, в которм запускаются.
Private Sub Button2_Click(ByVal sender As System.Object, ByVal e As System.EventArgs) Handles Button2.Click
Dim i As Integer _pool = New Semaphore(0, 2) For i = 1 To 50 Dim t As New Thread(New ParameterizedThreadStart(AddressOf Worker1)) t.Start(i) Next i _pool.Release(2) End Sub Private Shared Sub Worker1(ByVal num As Object) _pool.WaitOne() Dim padding As Integer = Interlocked.Add(_padding, 100) Dim fFile As Short fFile = FreeFile() FileOpen(fFile, num & ".txt", OpenMode.Append) PrintLine(fFile, "Thread {0} enters the semaphore." & num) FileClose(fFile) Thread.Sleep(1000) _pool.Release() End Sub |
Ответы | Всего ответов: 36 |
Номер ответа: 1 Автор ответа: ![]() ![]() ![]() ![]() ![]() Вопросов: 58 Ответов: 4255 ![]() |
Профиль | Цитата | #1 | Добавлено: 29.08.10 13:46 |
Читал про многопоточность, знаю что если запустить сразу несколько потоков, то они выполняются не в том порядке, в которм запускаются.
На то они и потоки что у них нету никакого порядка.. они работают каждый сам по себе.. New Semaphore(0, 2)
ты где этот пример брал? Ты создаешь семафор без единого свободного слота..!!! Потом запускаешь все потоки (они тупо висят и ждут свободных слотов) а потом, после запуска, осбождаешь все слоты.. ваще фигня какая то.. ((( |
Номер ответа: 2 Автор ответа: ![]() ![]() ![]() ![]() Вопросов: 6 Ответов: 25 |
Профиль | Цитата | #2 | Добавлено: 29.08.10 13:52 |
На то они и потоки что у них нету никакого порядка.. они работают каждый сам по себе.
Вот и спрашиваю про алгоритм, про то как такое сделать...а пример выше работает на ура. создаёт по 2 файла каждую секунду.
|
Номер ответа: 3 Автор ответа: ![]() ![]() ![]() ![]() Вопросов: 6 Ответов: 25 |
Профиль | Цитата | #3 | Добавлено: 29.08.10 13:53 |
Брал на сайте майкрософта http://msdn.microsoft.com/ru-ru/library/system.threading.semaphore%28VS.90%29.aspx |
Номер ответа: 4 Автор ответа: ![]() ![]() ![]() ![]() ![]() Вопросов: 58 Ответов: 4255 ![]() |
Профиль | Цитата | #4 | Добавлено: 29.08.10 14:09 |
Взять и скопипастить пример - много ума не надо.. гораздо сложнее понять как он работает.. Ты создаешь 50 ЗАБЛОКИРОВАННЫХ потоков.. и ни один из них не будет работать пока ты не вызовешь _pool.Release(2). Ты скопипастил пример совершенно не понимая его сути..
Кроме того FileOpen - это аццтой.. в НЕТ так не делается.. читай про работу с файлами.. И последнее.. зачем тебе понадобились потоки и тем более их последовательное выполнение? Что ты вообще пытаешься сделать? Я мог бы рассказать тебе про синхронизацию потоков и т.д... но судя по всему ты этого не осилишь, во всяком случае не сейчас.. Гораздо проще выяснить ЧТО ты хочешь и попробовать подсказать КАК это сделать.. |
Номер ответа: 5 Автор ответа: ![]() ![]() ![]() ![]() Вопросов: 6 Ответов: 25 |
Профиль | Цитата | #5 | Добавлено: 29.08.10 14:23 |
Взять и скопипастить пример - много ума не надо.. гораздо сложнее понять как он работает.. Ты создаешь 50 ЗАБЛОКИРОВАННЫХ потоков.. и ни один из них не будет работать пока ты не вызовешь _pool.Release(2)
![]() Кроме того FileOpen - это аццтой.. в НЕТ так не делается.. читай про работу с файлами..
это просто для примера...в Worker1 будет несколько гет и пост запросов...а в цикле For i = 1 To 50 будут перебираться ссылки. Я мог бы рассказать тебе про синхронизацию потоков и т.д... но судя по всему ты этого не осилишь
С потоками недавно начал работать, поэтому и спрашиваю в какую сторону копать чтобы последовательно выполнялись потоки и при этом можно было ограничивать максимальное кол-во запущенных. Может можно как-нибудь узнать что поток отработал (или узнать сколько потоков в данный момент выполняются), и запускать не сразу 50, а запустить поток-менеджер, который будет мониторить количество отработанных потоков и в случае если потоку-менеджеру отрапортовали что поток завершился, то он запускает следующий? |
Номер ответа: 6 Автор ответа: ![]() ![]() ![]() ![]() ![]() Вопросов: 58 Ответов: 4255 ![]() |
Профиль | Цитата | #6 | Добавлено: 29.08.10 14:33 |
Ну попробуй запусти мой пример. всё работает.
мне не надо запускать.. я и так вижу то, что твой пример работает через одно место.. в какую сторону копать чтобы последовательно выполнялись потоки
Тебе не надо чтоб они последовательно выполнялись.. Самое просто решение в твоем случае это создать очередь, заполнить ее ссылками в нужном порядке а потоки будут брать ссылку из очереди и выполнять нужные запросы.. Может можно как-нибудь узнать что поток отработал
Тебе ничто не мешает в конце работы потока кидать какое нибудь событие узнать сколько потоков в данный момент выполняются
Надо завести переменную и с помощью того же Interlocker при старте потока увеличивать значение, при завершении уменьшать.. Таким образом эта переменная всегда будет показывать количество работающих потоков (но оно не привысит значение установленное в семафоре) и запускать не сразу 50, а запустить поток-менеджер, который будет мониторить количество отработанных потоков и в случае если потоку-менеджеру отрапортовали что поток завершился, то он запускает следующий?
Эти функции у тебя как раз и будет выполнять семафор.. а порядок выполнения определишь в созданной и заполненной очереди.. |
Номер ответа: 7 Автор ответа: ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() Разработчик Вопросов: 130 Ответов: 6602 |
Профиль | Цитата | #7 | Добавлено: 29.08.10 14:44 |
Вообще если тебе надо ограничить кол-во одновременно выполняемых запросов, то это вообще не обязательно так сложно делать.
Берешь BlockingCollection, заполняешь ее данными, запускаешь нужное кол-во потоков (если тебе нужно чтоб одновременно было не более 2 запросов, то 2 потока). Потоки берут из BlockingCollection данные для выполнения нового запроса, делают запрос, сохраняют куда-то результат, переходят к следующему элементу. Когда элементы кончаются, поток завершается. Никакой синхронизации не нужно (BlockingCollection полностью безопасен). Ну только при сохранении нужно будет синхронизировать. |
Номер ответа: 8 Автор ответа: ![]() ![]() ![]() ![]() ![]() Вопросов: 58 Ответов: 4255 ![]() |
Профиль | Цитата | #8 | Добавлено: 29.08.10 14:47 |
Берешь BlockingCollection
Ты забыл добавить что для этого ему понадобится FW 4.0 Кроме того, очередь тоже является потокобезопасной.. |
Номер ответа: 9 Автор ответа: ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() Разработчик Вопросов: 130 Ответов: 6602 |
Профиль | Цитата | #9 | Добавлено: 29.08.10 20:34 |
Гм, это ты называешь потокобезопасностью? Необходимость поостоянно работать через Synchronized?..
Посмотри что может BlockingCollection, увидишь что на самом деле значит потокобезопасный класс ![]() Кстати в FCL до сих пор нет списка, который бы можно было использовать в многопоточной среде вместе с IEnumerable. А у меня есть ![]() |
Номер ответа: 10 Автор ответа: ![]() ![]() ![]() ![]() ![]() Вопросов: 58 Ответов: 4255 ![]() |
Профиль | Цитата | #10 | Добавлено: 30.08.10 02:08 |
Необходимость поостоянно работать через Synchronized?..
С очередью??? Ты ничего не путаешь? Если я не ошибаюсь, очередь - один из 3 полностью потокобезопасных классов ... |
Номер ответа: 11 Автор ответа: ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() Разработчик Вопросов: 130 Ответов: 6602 |
Профиль | Цитата | #11 | Добавлено: 30.08.10 05:43 |
На практике все очень интересно.
Вот тебе пример. Есть колекция, изначально забитая некими данными. Один поток постоянно дописывает данные, 10 потоков считывают.
Запускаем. Почти сразу начинают падать в консоль сообщения
Причина понятна - потоки выбрали все данные из очереди, новые не успели добавиться. Добавим перед чтением проверку, есть ли в очереди данные, и если нету, то делаем задержку и повторяем попытку
Конец немного предсказуем
Причина уже интереснее, и она состоит в том что после того как поток A проверил, есть ли в очереди элемент, поток B этот элемент из нее забрал. В результате поток A падает при попытке считать данные из пустой очереди. Чтоб этого избежать, нужно использовать критическую секцию. private static void ReadThread() { while (true) { try { bool hasData = false; string text; lock (((ICollection)queue).SyncRoot) { if (queue.Count > 0) { hasData = true; text = queue.Dequeue(); } } if (!hasData) System.Threading.Thread.Sleep(50); } catch (Exception ex) { Console.WriteLine(ex.ToString()); } } } Теперь процедура чтения работает полностью корректно и в этом месте код не будет падать. Но теперь начинается самое интересное. Исключения начинают падать в потоке записи на строчке queue.Enqueue("hello" ![]() Нужно заметить что по документации никаких исключений для метода Enqueue не задекларировано. И самое разумное чего можно ожидать - это OutOfMemoryException. Но мы получаем совсем другие исключения:
Исключения падают абсолютно от балды, а вернее от того что два потока одновременно выполняют несогласованые модификации (добавление и извлечение элемента - это модификация очереди). Поэтому критическую секцию нужно добавить и для потока записи. В результате код будет выглядеть таким образом
Теперь код работает абсолютно корректно и нигде ничего не падает. Но возникает резонный вопрос - в каком же контексте этот класс можно назвать потокобезопасным???? |
Номер ответа: 12 Автор ответа: ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() Разработчик Вопросов: 130 Ответов: 6602 |
Профиль | Цитата | #12 | Добавлено: 30.08.10 05:48 |
А теперь посмотрим как работают действительно потокобезопасные классы.
Для примера возьмем System.Collections.Concurrent.ConcurrentQueue<T> из 4.0 Не привожу код инициализации, так как он не отличается, только рабочий код:
Все работает из коробки, не нужно ничего допиливать |
Номер ответа: 13 Автор ответа: ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() Разработчик Вопросов: 130 Ответов: 6602 |
Профиль | Цитата | #13 | Добавлено: 30.08.10 06:04 |
А теперь гордость Microsoft класс BlockingCollection. Несмотря на то, что в классе есть слово Collection, это не должно смущать, так как он по умолчанию работает именно как очередь. Но можно его использовать как стек, а также можно добавить собственный механизм порядка обрабокти элементов.
Особенностью этого класса является то, что если какая-то операция не может быть выполнена (потому что в коллекция пустая, или наоборот, переполнена), то поток подвисает до того как операцию можно будет выполнить. По сути этот класс является готовой основой для полностью потокобезопасной очереди сообщений - можно абсолютно свободно ложить сообщения в эту очередь из любых потоков, а также запустить любое количество рабочих потоков, которые будут считывать сообщения из очереди и обрабатывать их, и при этом не написав ни одной строчки для синхронизации.
Интересный момент - эту очередь можно пометить как завершенную, вызвав метод CompleteAdding. Это значит что добавление данных в очередь завершено и новые данные добавляться не будут, все потоки, которые ждали данных, вызвав TryTake, получат false, и это значит что потоку нужно завершить свою работу - и не нужно варварскими способами убивать потоки, или передавть им флаги для завершения, как это делалось раньше. Вот такие классы в моем понимании можно назвать потокобезопасными. |
Номер ответа: 14 Автор ответа: ![]() ![]() ![]() ![]() ![]() ![]() ![]() ![]() Разработчик Вопросов: 130 Ответов: 6602 |
Профиль | Цитата | #14 | Добавлено: 30.08.10 06:06 |
Также можно ограничить размер BlockingCollection, указав максимальное количество элементов, которые могут в ней храниться. И в результате подвисать будут потоки, которые вызвыали метод Add в случаях, когда кол-во элементов в очереди достигло максимального указанного, а потоки чтения не успевают забирать даныне из очереди. И буду висеть, пока в очереди не появится свободное место для новых элементов |
Номер ответа: 15 Автор ответа: ![]() ![]() ![]() ![]() ![]() Вопросов: 58 Ответов: 4255 ![]() |
Профиль | Цитата | #15 | Добавлено: 30.08.10 15:59 |
в каком же контексте этот класс можно назвать потокобезопасным????
С критической секцией - это баян, прописная истина если хочешь.. а потокобезопасный в том плане что мы никогда не получим Cross-Thread Excepthion или, что чаще всего бывает с другими коллекциями, Collection was modified А юзать FW 4.0 только ради BlockingCollection, это уж слишком.. я уж лучше по старинке.. с критическими секциями.. |
|