Страница: 1 | 2 | 3 |
|
Вопрос: Последовательность в многопоточности
|
Добавлено: 29.08.10 13:11
|
|
Автор вопроса: Nevep
|
Подскажите в какую сторону копать. Запускается некоторое число потоков (50 к примеру), при этом нужно ограничить максимальное число одновременно работающих потоков (в примере реализовано с помощью Semaphore), при этом потоки запускаются в случайной последовательности. Как можно сделать чтобы потоки запускались последовательно? Читал про многопоточность, знаю что если запустить сразу несколько потоков, то они выполняются не в том порядке, в которм запускаются.
Private Sub Button2_Click(ByVal sender As System.Object, ByVal e As System.EventArgs) Handles Button2.Click
Dim i As Integer
_pool = New Semaphore(0, 2)
For i = 1 To 50
Dim t As New Thread(New ParameterizedThreadStart(AddressOf Worker1))
t.Start(i)
Next i
_pool.Release(2)
End Sub
Private Shared Sub Worker1(ByVal num As Object)
_pool.WaitOne()
Dim padding As Integer = Interlocked.Add(_padding, 100)
Dim fFile As Short
fFile = FreeFile()
FileOpen(fFile, num & ".txt", OpenMode.Append)
PrintLine(fFile, "Thread {0} enters the semaphore." & num)
FileClose(fFile)
Thread.Sleep(1000)
_pool.Release()
End Sub
Ответить
|
Номер ответа: 5 Автор ответа: Nevep
Вопросов: 6 Ответов: 25
|
Профиль | | #5
|
Добавлено: 29.08.10 14:23
|
Взять и скопипастить пример - много ума не надо.. гораздо сложнее понять как он работает.. Ты создаешь 50 ЗАБЛОКИРОВАННЫХ потоков.. и ни один из них не будет работать пока ты не вызовешь _pool.Release(2)
Как пример работает я разобрался. Ну попробуй запусти мой пример. всё работает.
Кроме того FileOpen - это аццтой.. в НЕТ так не делается.. читай про работу с файлами..
это просто для примера...в Worker1 будет несколько гет и пост запросов...а в цикле For i = 1 To 50 будут перебираться ссылки.
Я мог бы рассказать тебе про синхронизацию потоков и т.д... но судя по всему ты этого не осилишь
С потоками недавно начал работать, поэтому и спрашиваю в какую сторону копать чтобы последовательно выполнялись потоки и при этом можно было ограничивать максимальное кол-во запущенных.
Может можно как-нибудь узнать что поток отработал (или узнать сколько потоков в данный момент выполняются), и запускать не сразу 50, а запустить поток-менеджер, который будет мониторить количество отработанных потоков и в случае если потоку-менеджеру отрапортовали что поток завершился, то он запускает следующий?
Ответить
|
Номер ответа: 6 Автор ответа: EROS
Вопросов: 58 Ответов: 4255
|
Профиль | | #6
|
Добавлено: 29.08.10 14:33
|
Ну попробуй запусти мой пример. всё работает.
мне не надо запускать.. я и так вижу то, что твой пример работает через одно место..
в какую сторону копать чтобы последовательно выполнялись потоки
Тебе не надо чтоб они последовательно выполнялись.. Самое просто решение в твоем случае это создать очередь, заполнить ее ссылками в нужном порядке а потоки будут брать ссылку из очереди и выполнять нужные запросы..
Может можно как-нибудь узнать что поток отработал
Тебе ничто не мешает в конце работы потока кидать какое нибудь событие
узнать сколько потоков в данный момент выполняются
Надо завести переменную и с помощью того же Interlocker при старте потока увеличивать значение, при завершении уменьшать.. Таким образом эта переменная всегда будет показывать количество работающих потоков (но оно не привысит значение установленное в семафоре)
и запускать не сразу 50, а запустить поток-менеджер, который будет мониторить количество отработанных потоков и в случае если потоку-менеджеру отрапортовали что поток завершился, то он запускает следующий?
Эти функции у тебя как раз и будет выполнять семафор.. а порядок выполнения определишь в созданной и заполненной очереди..
Ответить
|
Номер ответа: 11 Автор ответа: Artyom
Разработчик
Вопросов: 130 Ответов: 6602
|
Профиль | | #11
|
Добавлено: 30.08.10 05:43
|
На практике все очень интересно.
Вот тебе пример. Есть колекция, изначально забитая некими данными.
Один поток постоянно дописывает данные, 10 потоков считывают.
- private static Queue<string> queue = new Queue<string>();
-
- static void Main()
- {
- for (int i=0;i<10000;i++)
- queue.Enqueue("hello");
-
- System.Threading.Thread writeThread = new System.Threading.Thread(WriteThread);
- writeThread.Start();
-
- for (int i = 0; i < 10; i++)
- {
- System.Threading.Thread readThread = new System.Threading.Thread(ReadThread);
- readThread.Start();
- }
- Console.ReadLine();
- }
-
- private static void WriteThread()
- {
- while (true)
- {
- queue.Enqueue("hello");
- }
- }
-
- private static void ReadThread()
- {
- while (true)
- {
- try
- {
- string text = queue.Dequeue();
- }
- catch (Exception ex)
- {
- Console.WriteLine(ex);
- }
- }
- }
Запускаем. Почти сразу начинают падать в консоль сообщения
- System.InvalidOperationException: Queue empty.
Причина понятна - потоки выбрали все данные из очереди, новые не успели добавиться.
Добавим перед чтением проверку, есть ли в очереди данные, и если нету, то делаем задержку и повторяем попытку
- private static void ReadThread()
- {
- while (true)
- {
- try
- {
- if (queue.Count > 0)
- {
- string text = queue.Dequeue();
- }
- else
- System.Threading.Thread.Sleep(50);
- }
- catch (Exception ex)
- {
- Console.WriteLine(ex.ToString());
- }
- }
- }
Конец немного предсказуем
- System.InvalidOperationException: Queue empty.
Причина уже интереснее, и она состоит в том что после того как поток A проверил, есть ли в очереди элемент, поток B этот элемент из нее забрал. В результате поток A падает при попытке считать данные из пустой очереди.
Чтоб этого избежать, нужно использовать критическую секцию.
private static void ReadThread()
{
while (true)
{
try
{
bool hasData = false;
string text;
lock (((ICollection)queue).SyncRoot)
{
if (queue.Count > 0)
{
hasData = true;
text = queue.Dequeue();
}
}
if (!hasData)
System.Threading.Thread.Sleep(50);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}
Теперь процедура чтения работает полностью корректно и в этом месте код не будет падать.
Но теперь начинается самое интересное.
Исключения начинают падать в потоке записи на строчке
queue.Enqueue("hello"
Нужно заметить что по документации никаких исключений для метода Enqueue не задекларировано. И самое разумное чего можно ожидать - это OutOfMemoryException. Но мы получаем совсем другие исключения:
- System.IndexOutOfRangeException was unhandled
- Message=Index was outside the bounds of the array.
- Source=System
- StackTrace:
- at System.Collections.Generic.Queue`1.Enqueue(T item)
- at ConsoleApplication9.Program.WriteThread() in H:\Work\Projects\Test\ConsoleApplication9\Program.cs:line 34
- at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
- at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
- at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
- at System.Threading.ThreadHelper.ThreadStart()
- InnerException:
- System.ArgumentException was unhandled
- Message=Source array was not long enough. Check srcIndex and length, and the array
- Source=mscorlib
- ParamName=""
- StackTrace:
- at System.Array.Copy(Array sourceArray, Int32 sourceIndex, Array destinationArray, Int32 destinationIndex, Int32 length, Boolean reliable)
- at System.Collections.Generic.Queue`1.SetCapacity(Int32 capacity)
- at System.Collections.Generic.Queue`1.Enqueue(T item)
- at ConsoleApplication9.Program.WriteThread() in H:\Work\Projects\Test\ConsoleApplication9\Program.cs:line 35
- at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
- at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
- at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
- at System.Threading.ThreadHelper.ThreadStart()
- InnerException:
Исключения падают абсолютно от балды, а вернее от того что два потока одновременно выполняют несогласованые модификации (добавление и извлечение элемента - это модификация очереди).
Поэтому критическую секцию нужно добавить и для потока записи.
В результате код будет выглядеть таким образом
- private static void WriteThread()
- {
- while (true)
- {
- lock (((ICollection)queue).SyncRoot)
- {
- queue.Enqueue("hello");
- }
- }
- }
-
- private static void ReadThread()
- {
- while (true)
- {
- try
- {
- bool hasData = false;
- string text;
-
- lock (((ICollection)queue).SyncRoot)
- {
- if (queue.Count > 0)
- {
- hasData = true;
- text = queue.Dequeue();
- }
- }
-
- if (!hasData)
- System.Threading.Thread.Sleep(50);
- }
- catch (Exception ex)
- {
- Console.WriteLine(ex.ToString());
- }
- }
- }
Теперь код работает абсолютно корректно и нигде ничего не падает.
Но возникает резонный вопрос - в каком же контексте этот класс можно назвать потокобезопасным????
Ответить
|
Номер ответа: 13 Автор ответа: Artyom
Разработчик
Вопросов: 130 Ответов: 6602
|
Профиль | | #13
|
Добавлено: 30.08.10 06:04
|
А теперь гордость Microsoft класс BlockingCollection. Несмотря на то, что в классе есть слово Collection, это не должно смущать, так как он по умолчанию работает именно как очередь. Но можно его использовать как стек, а также можно добавить собственный механизм порядка обрабокти элементов.
Особенностью этого класса является то, что если какая-то операция не может быть выполнена (потому что в коллекция пустая, или наоборот, переполнена), то поток подвисает до того как операцию можно будет выполнить.
По сути этот класс является готовой основой для полностью потокобезопасной очереди сообщений - можно абсолютно свободно ложить сообщения в эту очередь из любых потоков, а также запустить любое количество рабочих потоков, которые будут считывать сообщения из очереди и обрабатывать их, и при этом не написав ни одной строчки для синхронизации.
- private static BlockingCollection<string> queue = new BlockingCollection<string>();
-
- private static void WriteThread()
- {
- while (true)
- {
- queue.Add("hello");
- }
- }
-
- private static void ReadThread()
- {
- while (true)
- {
- try
- {
- string text;
-
- if (!queue.TryTake(out text))
- return;
- }
- catch (Exception ex)
- {
- Console.WriteLine(ex.ToString());
- }
- }
- }
Интересный момент - эту очередь можно пометить как завершенную, вызвав метод CompleteAdding. Это значит что добавление данных в очередь завершено и новые данные добавляться не будут, все потоки, которые ждали данных, вызвав TryTake, получат false, и это значит что потоку нужно завершить свою работу - и не нужно варварскими способами убивать потоки, или передавть им флаги для завершения, как это делалось раньше.
Вот такие классы в моем понимании можно назвать потокобезопасными.
Ответить
|
Страница: 1 | 2 | 3 |
Поиск по форуму