Для своей программы мы создали такую модель: несколько источников (заказчиков), несколько исполнителей и одна очередь, которая является главным организатором работы. Дело в том, что мало создать многопоточный процесс, необходимо его ещё и правильно организовать, при параллельной работе нескольких потоков может возникать множество разнообразных проблем: исполнители могут одновременно пытаться взять одну и ту же работу, или наоборот — первый ждёт, пока второй выполнит работу, а второй ждёт, пока первый выполнит свою, в результате — все стоят. В общем смысле, все проблемы упираются в то, что в программе есть неделимые отрезки, во время выполнения которых нельзя внедряться в процесс и начинать даже пытаться делать что-нибудь другое. В жизни это может выглядеть примерно так: на первом курсе студентам читают динамику точки в курсе лекций по теоретической механике, где необходимо интегрировать дифференциальные уравнения, однако в их курсе математики до темы дифференциальные уравнения ещё очень далеко, то есть нарушена последовательность получения знаний. В связи с этим мы создали отдельный управляющий и организовывающий поток — очередь, которая следит за правильностью последовательности действий всех участников программы.
В главе всего стоит очередь. Она объявляет заказчикам, что готова получать работу (в нашем случае работа заключается просто в ожидании), заказчики отдают её всю имеющуюся у них работу, затем очередь объявляет исполнителям, что у неё есть для них работа, и последовательно раздаёт её. Затем очередь проверят, всю ли работу раздали заказчики, всю ли получили исполнители, и если это условие выполнено, она закрывается.
Цель задачи:
Моделирование параллельной/многопоточной обработки данных.
Развернутое описание задачи:
Задаётся начальный класс (WorkInfo), содержащий информацию необходимую для выполнения работы, в моем случае это время выполнения работы. Необходимо написать классы WorkSource (источник) и WorkPerformer (исполнитель), которые будут записывать и выполнять эти задания путем добавления их в очередь (WorkQueue). Экземпляры классов WorkSource и WorkPerformer должны работать каждый в своем потоке.
Алгоритм работы программы:
WorkQueue:
Информируем о начале работы очереди. Проверяем на присутствие источников, исполнителей, очереди заданий.
Информируем о запуске всех источников. Запускаем источники.
Информируем о запуске всех исполнителей. Запускаем исполнителей.
Дожидаемся конца работы исполнителей, конца работы, конца работы исполнителей.
Оповещаем о конце всей работы.
WorkPerformer:
Информируем что начал работу поток-исполнитель
Берем задание из очереди
Смотрим, есть ли на данный момент в очереди работа.
Если есть – берем.
Проверяем выполнена ли вся работа
Если выполнена – оповещаем всех исполнителей
Переходим в п.3.
Если нет – смотрим, есть ли на данный момент источники.
Если есть – ждем добавления работы и переходим в п.2a.
Если нет – заканчиваем работу всех потоков-исполнителей.
Выполняем задание
Информируем о начале выполнения задания в соответствующем потоке
Выполняем задание
Информируем о конце выполнения задания в соответствующем потоке
Проверяем флаг, показывающий выполнена ли вся работа
Если не выполнена – п.2
Если выполнена – п.5
Информируем о конце работы соответствующего потока-исполнителя. Заканчиваем работу.
WorkSource
Информируем о начале работы потока-источника
Для всех работ в списке
Добавляем в очередь
Засыпаем
Информируем о конце работы потока-источника
Текст программы:
Класс Main:
package threadkurs;
import java.util.ArrayList;
public class Main {
public static void main(String[] args) {
WorkQueue WQ = new WorkQueue(10);
ArrayList listWI = new ArrayList();
for(int i = 0; i= listW.size() && cntActivWS == 0);
}
finally{
lockW.unlock();
}
return res;
}
void startPerform(long ID){
System.out.printf("НАЧАЛ РАБОТУ ВЫПОЛНЯТОР (ID = %d)n",ID);
}
void endPerform(long ID){
lockWP.lock();
try{
--cntActivWP;//говорим что закончили работу
System.out.printf("ЗАКОНЧИЛ РАБОТУ ВЫПОЛНЯТОР (ID = %d)n",ID);
}finally{
condWP.signalAll();
lockWP.unlock();
}
}
void startSource(long ID){
System.out.printf("НАЧАЛ РАБОТУ ДОБАВЛЯТОР (ID = %d)n",ID);
}
void endSource(long ID){
lockW.lock();
try{
--cntActivWS;
}finally{
condW.signalAll();
lockW.unlock();
}
System.out.printf("ЗАКОНЧИЛ РАБОТУ ДОБАВЛЯТОР (ID = %d)n",ID);
}
void addWork(WorkInfo WI, long ID){
lockW.lock();
try{
System.out.printf("++Добавление работы (ID = %d)n",ID);
listW.add(WI);
}finally{
condW.signalAll();
lockW.unlock();
}
}
//оповещение всех исполнителей о конце всей работы
void callAllWP(){
lockWP.lock();
try{
for(WorkPerformer WP : listWP){
WP.endMyPerform();
}
}
finally{
lockWP.unlock();
}
}
}
Класс WorkSource:
package threadkurs;
import java.util.ArrayList;
public class WorkSource extends Thread{
private long threadID;//идентификатор данного потока
WorkQueue WQ;//очередь
private Integer time; //время сна между добавления заданий
private ArrayList listW;//список работы
WorkSource(Integer time, ArrayList listW){
this.time = time;
this.listW = listW;
}
@Override
public void run(){
threadID = Thread.currentThread().getId();
WQ.startSource(threadID);
for(WorkInfo WI : listW){
addWork(WI);
try{
Thread.sleep(time);
}catch(Exception Ex){System.out.printf("Игнорирована попытка прерывания работы источника работ - добавляем дальше");}
}
WQ.endSource(threadID);
}
void addWork(WorkInfo WI){
WQ.addWork(WI, threadID);
}
}
Класс WorkPerformer:
package threadkurs;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class WorkPerformer extends Thread{
ReentrantLock lockEnd = new ReentrantLock();//блокировка на флаг о выполнении всей работы
Condition condEnd = lockEnd.newCondition();
long threadID;//идентификатор данного потока
WorkInfo WI;//текущая работа
WorkQueue WQ;//очередь
boolean workIsDone = false;//флаг о выполнении всей работы
@Override
public void run(){
threadID = Thread.currentThread().getId();
WQ.startPerform(threadID);
executeAll();
}
void executeAll(){
getWI();
while(!isDone()){
execute(WI);
getWI();
}
}
void execute (WorkInfo WI){
System.out.printf("->Начало выполнения работы(ID = %d)n",threadID);
try{
Thread.sleep(WI.time);
}catch(Exception Ex){
System.out.printf(">