차단 대기열 및 다중 스레드 소비자, 중지시기를 아는 방법
나는 ArrayBlockingQueue(고정 크기)에 추가되는 일부 작업 객체를 생성하는 단일 스레드 생산자를 가지고 있습니다 .
또한 다중 스레드 소비자를 시작합니다. 이것은 고정 스레드 풀 ( Executors.newFixedThreadPool(threadCount);) 로 빌드됩니다 . 그런 다음이 threadPool에 일부 ConsumerWorker 인텐스를 제출합니다. 각 ConsumerWorker는 위에서 언급 한 ArrayBlockingQueue 인스턴스를 참조합니다.
각 작업자는 take()대기열에서 작업을 수행하고 작업을 처리합니다.
내 문제는 더 이상 수행 할 작업이 없을 때 작업자에게 알릴 수있는 가장 좋은 방법입니다. 즉, 작업자에게 생산자가 대기열에 추가를 완료했음을 알리는 방법은 무엇이며이 시점부터 각 작업자는 대기열이 비어 있음을 알게되면 중지해야합니다.
내가 지금 가지고있는 것은 내 생산자가 작업을 마칠 때 트리거되는 콜백으로 초기화되는 설정입니다 (대기열에 물건을 추가하는 것). 또한 내가 생성하여 ThreadPool에 제출 한 모든 ConsumerWorkers 목록을 유지합니다. 생산자 콜백에서 생산자가 끝났다고 알려 주면 각 작업자에게이를 알릴 수 있습니다. 이 시점에서 그들은 단순히 큐가 비어 있지 않은지 계속 확인해야하며, 비어있게되면 중지해야합니다. 따라서 ExecutorService 스레드 풀을 정상적으로 종료 할 수 있습니다. 이런 거예요
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
여기서 문제는 때때로 생산자가 완료하고 신호를 보내고 ConsumerWorkers가 대기열의 모든 것을 소비하기 전에 중지되는 명백한 경쟁 조건이 있다는 것입니다.
내 질문은 모두 정상적으로 작동하도록 동기화하는 가장 좋은 방법은 무엇입니까? 생산자가 실행 중인지 확인하는 전체 부분을 동기화하고 큐가 비어 있는지 확인하고 한 블록의 큐에서 무언가를 가져와야합니까 (큐 개체에서)? isRunningConsumerWorker 인스턴스 에서 부울 업데이트 만 동기화해야합니까 ? 다른 제안이 있습니까?
업데이트, 내가 사용한 작업 구현은 다음과 같습니다.
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
이것은 약간의 수정만으로 JohnVint의 아래 답변에서 크게 영감을 받았습니다.
=== @vendhan의 의견으로 인해 업데이트되었습니다.
당신의 순종에 감사드립니다. 당신이 맞습니다.이 질문의 첫 번째 코드 스 니펫에는 (다른 문제 중에서) while(isRunning || !inputQueue.isEmpty())실제로 의미가없는 부분이 있습니다.
In my actual final implementation of this, I do something which is closer to your suggestion of replacing "||" (or) with "&&" (and), in the sense that each worker (consumer) now only checks if the element he's got from the list is a poison pill, and if so stops (so theoretically we can say that the worker has to be running AND the queue must not be empty).
You should continue to take() from the queue. You can use a poison pill to tell the worker to stop. For example:
private final Object POISON_PILL = new Object();
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
if(queueElement == POISON_PILL) {
inputQueue.add(POISON_PILL);//notify other threads to stop
return;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
//you can also clear here if you wanted
isRunning = false;
inputQueue.add(POISON_PILL);
}
I'd send the workers a special work packet to signal that they should shut down:
public class ConsumerWorker implements Runnable{
private static final Produced DONE = new Produced();
private BlockingQueue<Produced> inputQueue;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
for (;;) {
try {
Produced item = inputQueue.take();
if (item == DONE) {
inputQueue.add(item); // keep in the queue so all workers stop
break;
}
// process `item`
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
To stop the workers, simply add ConsumerWorker.DONE to the queue.
In your code-block where you attempt to retrive element from the queue , use poll(time,unit) instead of the take().
try {
Object queueElement = inputQueue.poll(timeout,unit);
//process queueElement
} catch (InterruptedException e) {
if(!isRunning && queue.isEmpty())
return ;
}
By specifying appropriate values of timeout , you ensure that threads wont keep blocking in case there is a unfortunate sequence of
isRunningis true- Queue becomes empty , so threads enter blocked wait ( if using
take() isRunningis set to false
Can not we do it using a CountDownLatch, where the size is the number of records in the producer. And every consumer will countDown after process a record. And its crosses the awaits() method when all tasks finished. Then stop all ur consumers. As all records are processed.
There are a number of strategies you could use, but one simple one is to have a subclass of task that signals the end of the job. The producer doesn't send this signal directly. Instead, it enqueues an instance of this task subclass. When one of your consumers pulls off this task and executes it, that causes the signal to be sent.
I had to use a multi-threaded producer and a multi-threaded consumer. I ended up with a Scheduler -- N Producers -- M Consumers scheme, each two communicate via a queue (two queues total). The Scheduler fills the first queue with requests to produce data, and then fills it with N "poison pills". There is a counter of active producers (atomic int), and the last producer that receives the last poison pill sends M poison pills to the consumer queue.
'program story' 카테고리의 다른 글
| CSS 공급 업체 접두사 목록? (0) | 2020.11.15 |
|---|---|
| indexedDB는 HTML5 로컬 저장소와 개념적으로 어떻게 다른가요? (0) | 2020.11.15 |
| QR 코드에 얼마나 많은 데이터 / 정보를 저장 / 저장할 수 있습니까? (0) | 2020.11.15 |
| JMeter : 콘텐츠 유형 헤더로 요청을 보내는 방법은 무엇입니까? (0) | 2020.11.15 |
| R은 변수 열 인덱스를 ggplot2에 전달합니다. (0) | 2020.11.15 |