1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package com.ontotext.ordi.mapper.processor;
19
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingQueue;
24
25 import com.ontotext.ordi.exception.ORDIRuntimeException;
26 import com.ontotext.ordi.mapper.model.RDFResultSet;
27
28 public class RDFResultManager {
29
30 public static final int MAX_CONCURRENT_THREADS = 5;
31 public static volatile int threadCount;
32 private Set<RDFResultSet> registered = new HashSet<RDFResultSet>();
33 private BlockingQueue<RDFResultSet> queue = new LinkedBlockingQueue<RDFResultSet>();
34
35
36 public synchronized void register(RDFResultSet resultSet) {
37 if (resultSet == null) {
38 throw new IllegalArgumentException();
39 }
40 registered.add(resultSet);
41 }
42
43 public void registerComplete(RDFResultSet resultSet) {
44 if (resultSet == null) {
45 throw new IllegalArgumentException();
46 }
47 boolean isInserted = queue.offer(resultSet);
48 if (isInserted == false) {
49 throw new ORDIRuntimeException("RDFResult set was not queued!");
50 }
51 }
52
53 public synchronized RDFResultSet getNextCompleted() {
54 try {
55 if (registered.size() == 0) {
56 return null;
57 }
58 RDFResultSet next = queue.take();
59 registered.remove(next);
60 return next;
61 } catch (InterruptedException e) {
62 throw new ORDIRuntimeException(
63 "Error while waitting for the next RDFResultSet!", e);
64 }
65 }
66 }