Clover icon

sunshower-core

  1. Project Clover database Fri Apr 6 2018 03:27:42 UTC
  2. Package io.sunshower.service.task.exec

File ParallelTaskExecutor.java

 

Coverage histogram

../../../../../img/srcFileCovDistChart8.png
14% of files have more coverage

Code metrics

24
164
36
3
464
398
50
0.3
4.56
12
1.39

Classes

Class Line # Actions
ParallelTaskExecutor 29 84 19
0.9357798 (93.6%)
ParallelTaskExecutor.PSchedule 234 48 13
0.625 (62.5%)
ParallelTaskExecutor.ParallelTask 348 32 18
0.627451 (62.7%)
 

Contributing tests

This file is covered by 14 tests.

Source view

1    package io.sunshower.service.task.exec;
2   
3    import static org.apache.tinkerpop.gremlin.process.traversal.P.eq;
4    import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.*;
5   
6    import io.reactivex.Observer;
7    import io.reactivex.subjects.PublishSubject;
8    import io.reactivex.subjects.Subject;
9    import io.sunshower.common.Identifier;
10    import io.sunshower.persist.Identifiers;
11    import io.sunshower.persist.Sequence;
12    import io.sunshower.service.model.task.*;
13    import io.sunshower.service.task.*;
14    import java.util.*;
15    import java.util.concurrent.CompletableFuture;
16    import java.util.concurrent.CountDownLatch;
17    import java.util.concurrent.ExecutorService;
18    import java.util.logging.Level;
19    import java.util.logging.Logger;
20    import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
21    import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
22    import org.apache.tinkerpop.gremlin.structure.T;
23    import org.apache.tinkerpop.gremlin.structure.Vertex;
24    import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
25    import org.javatuples.Triplet;
26    import org.springframework.context.ApplicationContext;
27   
28    /** Created by haswell on 2/4/17. */
 
29    public class ParallelTaskExecutor implements Thread.UncaughtExceptionHandler {
30   
31    public static final Logger log = Logger.getLogger(ParallelTaskExecutor.class.getName());
32   
33    static final Sequence<Identifier> sequence = Identifiers.randomSequence();
34   
35    public static final String TASK_STARTING = "task-starting";
36    public static final String TASK_COMPLETE = "task-complete";
37    public static final String TASK_ERROR = "task-error";
38    public static final String EXECUTION_STARTING = "exec-starting";
39    public static final String EXECUTION_FINISHED = "exec-finished";
40   
41    private final ElementContext elementContext;
42    private final ExecutorService executorService;
43    private final ApplicationContext applicationContext;
44   
 
/**
 * Creates an executor backed by the supplied collaborators.
 *
 * <p>Construction also registers this instance as the JVM-wide default uncaught exception handler
 * through {@link Thread#setDefaultUncaughtExceptionHandler}.
 *
 * @param elementContext used to resolve task elements and relationships and to transform graphs
 * @param executorService executor that tasks and their schedules are submitted to
 * @param applicationContext handed to every level set this executor creates
 */
public ParallelTaskExecutor(
    ElementContext elementContext,
    ExecutorService executorService,
    ApplicationContext applicationContext) {
  this.applicationContext = applicationContext;
  this.executorService = executorService;
  this.elementContext = elementContext;

  // Registered last so every field is assigned before `this` becomes globally reachable.
  Thread.setDefaultUncaughtExceptionHandler(this);
}
56   
 
57  1 toggle public ExecutionMonitor run(ExecutionPlan plan) {
58  1 ParallelTask task = (ParallelTask) plan.getPayload();
59  1 executorService.submit(task);
60  1 return task;
61    }
62   
 
63  1 toggle public ExecutionMonitor run(TaskGraph graph, TaskContext context) {
64   
65  1 final ParallelTask task = new ParallelTask(graph, context);
66  1 executorService.submit(task);
67  1 return task;
68    }
69   
 
70  19 toggle public ExecutionMonitor run(TaskGraph taskGraph) {
71  19 final ParallelTask task = new ParallelTask(taskGraph, new TaskContext());
72  19 executorService.submit(task);
73  19 return task;
74    }
75   
 
76  3 toggle public ExecutionMonitor createPlan(TaskGraph graph) {
77  3 return new ParallelTask(graph, new TaskContext());
78    }
79   
 
80  23 toggle private PSchedule createParallelSchedule(
81    TaskContext context,
82    TinkerGraph graph,
83    TinkerGraph shadowGraph,
84    TaskGraph taskGraph,
85    Subject<TaskEvent> subject,
86    Map<Identifier, Vertex> vertices,
87    Identifier taskId,
88    ParallelTask task) {
89  23 log.log(Level.INFO, "Computing schedule for request {0}", taskId);
90  23 final GraphTraversalSource source = shadowGraph.traversal();
91   
92  23 GraphTraversal<Vertex, Vertex> group = source.V().where(inE().count().is(eq(0)));
93   
94  23 final PSchedule schedule = new PSchedule(task, taskId, subject, graph, taskGraph);
95  23 Set<Identifier> ls = new LinkedHashSet<>();
96  23 boolean running = true;
97  96 while (group.hasNext() && running) {
98  73 LevelSet<TaskElement> set =
99    createLevelSet(
100    group, task.taskLogger, context, ls, vertices, graph, schedule.id, subject);
101  73 schedule.add(set);
102  73 Iterator<Identifier> ns = ls.iterator();
103  73 if (!ns.hasNext()) {
104  0 running = false;
105    }
106  73 Identifier fst = ns.next();
107  73 Identifier[] remaining = new Identifier[ls.size() - 1];
108  73 int count = 0;
109  161 while (ns.hasNext()) {
110  88 remaining[count++] = ns.next();
111    }
112   
113  73 source
114    .V()
115    .where(inE().count().is(eq(0)))
116    .and(hasId(fst, (Object[]) remaining))
117    .drop()
118    .iterate();
119  73 group = source.V().where(inE().count().is(eq(0)));
120  73 ls.clear();
121    }
122  23 shadowGraph.clear();
123  23 shadowGraph.close();
124  23 log.info("Schedule computed");
125  23 if (log.getLevel() == Level.INFO) {
126  0 log.info(schedule.toString());
127    }
128   
129  23 return schedule;
130    }
131   
 
132  73 toggle private LevelSet<TaskElement> createLevelSet(
133    GraphTraversal<Vertex, Vertex> group,
134    TaskLogger logger,
135    TaskContext context,
136    Set<Identifier> ls,
137    Map<Identifier, Vertex> vertices,
138    TinkerGraph graph,
139    Identifier scheduleId,
140    final Subject<TaskEvent> subject) {
141  73 final LevelSet<TaskElement> set =
142    new ParallelLevelSet(
143    logger, context, scheduleId, graph, subject, executorService, applicationContext, this);
144  234 while (group.hasNext()) {
145  161 Vertex t = group.next();
146  161 Identifier id = (Identifier) t.id();
147  161 Vertex vertex = vertices.get(id);
148  161 set.add(new TaskElement(vertex));
149  161 ls.add((Identifier) t.id());
150    }
151  73 return set;
152    }
153   
 
154  23 toggle private Triplet<TinkerGraph, TinkerGraph, Map<Identifier, Vertex>> buildGraph(
155    TaskGraph taskGraph) {
156  23 log.info("building internal graph from taskGraph");
157  23 final TinkerGraph graph = TinkerGraph.open();
158  23 final TinkerGraph shadowGraph = TinkerGraph.open();
159  23 final Map<Identifier, Vertex> vertices = new LinkedHashMap<>();
160   
161  23 log.info("Computing vertex structure...");
162  23 final Map<Identifier, Vertex> shadowVertices = new LinkedHashMap<>();
163  23 final ContextResolver resolver = new ExpressionContextResolver(graph);
164  23 computeVertices(taskGraph, graph, shadowGraph, taskGraph, resolver, vertices, shadowVertices);
165   
166  23 log.info("Vertex structure complete");
167  23 log.info("Computing edge structure");
168  23 computeEdges(graph, shadowGraph, taskGraph, vertices, shadowVertices);
169  23 log.info("Edge structure complete");
170  23 log.info("Transforming graph...");
171  23 elementContext.transform(graph, TinkerGraph.class);
172  23 elementContext.transform(shadowGraph, TinkerGraph.class);
173  23 log.info("Successfully transformed graph");
174  23 return Triplet.with(graph, shadowGraph, vertices);
175    }
176   
 
177  23 toggle private void computeVertices(
178    TaskGraph taskGraph,
179    TinkerGraph graph,
180    TinkerGraph shadowGraph,
181    TaskGraph result,
182    ContextResolver resolver,
183    Map<Identifier, Vertex> vertices,
184    Map<Identifier, Vertex> shadowVertices) {
185  23 for (Node v : result.getNodes()) {
186  161 if (!vertices.containsKey(v.getId())) {
187  161 final ElementDescriptor descriptor = elementContext.resolve(v, taskGraph, resolver);
188  161 final String name = v.getName() != null ? v.getName() : v.getId().toString();
189  161 final Vertex vertex = graph.addVertex(T.id, v.getId(), T.label, name);
190  161 Vertex shadowVertex = shadowGraph.addVertex(T.id, v.getId(), T.label, name);
191  161 shadowVertices.put(v.getId(), shadowVertex);
192  161 vertex.property("descriptor", descriptor);
193  161 vertices.put(v.getId(), vertex);
194  161 log.log(Level.INFO, "Vertex {0} computed: ", v.getId());
195    }
196    }
197    }
198   
 
199  23 toggle private void computeEdges(
200    TinkerGraph graph,
201    TinkerGraph shadowGraph,
202    TaskGraph result,
203    Map<Identifier, Vertex> vertices,
204    Map<Identifier, Vertex> shadowVertices) {
205   
206  23 for (Edge edge : result.getEdges()) {
207   
208  165 Vertex shadowSource =
209    shadowVertices.computeIfAbsent(
210    edge.getSource(), k -> shadowGraph.addVertex(edge.getSource()));
211   
212  165 Vertex shadowTarget =
213    shadowVertices.computeIfAbsent(
214    edge.getTarget(), k -> shadowGraph.addVertex(edge.getTarget()));
215   
216  165 Vertex source =
217    vertices.computeIfAbsent(edge.getSource(), k -> graph.addVertex(edge.getSource()));
218   
219  165 Vertex target =
220    vertices.computeIfAbsent(edge.getTarget(), k -> graph.addVertex(edge.getTarget()));
221   
222  165 final String relationshipKey = elementContext.resolveRelationship(edge, result).getKey();
223   
224  165 source.addEdge(relationshipKey, target);
225  165 shadowSource.addEdge(relationshipKey, shadowTarget);
226    }
227    }
228   
 
229  0 toggle @Override
230    public void uncaughtException(Thread t, Throwable e) {
231  0 e.printStackTrace();
232    }
233   
 
234    private class PSchedule implements ParallelSchedule<TaskElement>, Runnable {
235    final Map<Integer, LevelSet<TaskElement>> levels;
236    private final TinkerGraph graph;
237    private int current = 0;
238    private final Identifier id;
239    private volatile boolean running;
240    private final ExecutionPlan tree;
241    private CountDownLatch latch;
242    private final Subject<TaskEvent> subject;
243    private final TaskGraph taskGraph;
244   
 
245  23 toggle private PSchedule(
246    ParallelTask task,
247    Identifier taskId,
248    Subject<TaskEvent> subject,
249    TinkerGraph graph,
250    TaskGraph taskGraph) {
251  23 this.id = taskId;
252  23 this.graph = graph;
253  23 this.subject = subject;
254  23 this.taskGraph = taskGraph;
255  23 this.levels = new TreeMap<>();
256  23 this.tree = new ExecutionPlan(id, task);
257    }
258   
 
259  73 toggle @Override
260    public void add(LevelSet<TaskElement> set) {
261  73 ParallelLevelSet pset = (ParallelLevelSet) set;
262  73 pset.order = current;
263  73 levels.put(current, pset);
264  73 tree.addLevel(createLevel(current, set));
265  73 current++;
266    }
267   
 
268  73 toggle private ExecutionLevel createLevel(int current, LevelSet<TaskElement> set) {
269  73 final ExecutionLevel result = new ExecutionLevel();
270  73 result.setLevel(current);
271  73 int count = 0;
272  73 for (TaskElement e : set) {
273  161 Object id = e.vertex.id();
274  161 String r = id == null ? "" : id.toString();
275  161 result.addTask(new ExecutionTask(count++, r, e.vertex.label()));
276    }
277  73 return result;
278    }
279   
 
280  0 toggle public ExecutionPlan getPlan() {
281  0 return tree;
282    }
283   
 
284  21 toggle public void run() {
285  21 try {
286  21 this.running = true;
287  21 this.latch = new CountDownLatch(levels.size());
288  21 ParallelLevelSet last = null;
289  21 for (LevelSet<TaskElement> set : this) {
290  68 final ParallelLevelSet pset = (ParallelLevelSet) set;
291  68 last = pset;
292  68 pset.latch = latch;
293  68 ((ParallelLevelSet) set).run();
294    }
295  21 if (last != null) {
296  21 last.last = true;
297    }
298  21 this.running = false;
299    } finally {
300  21 subject.onComplete();
301    }
302    }
303   
 
304  0 toggle @Override
305    public ParallelLevelSet get(int level) {
306  0 return (ParallelLevelSet) levels.get(level);
307    }
308   
 
309  0 toggle @Override
310    @SuppressWarnings("unchecked")
311    public <T> T unwrap(Class<T> clazz) {
312  0 return (T) graph;
313    }
314   
 
315  0 toggle public String toString() {
316  0 return "ParallelSchedule{"
317    + "\n\texecution plan:\n"
318    + showExecutionPlan()
319    + "\trunning: "
320    + running
321    + "\n"
322    + "}";
323    }
324   
 
325  0 toggle private String showExecutionPlan() {
326  0 final StringBuilder b = new StringBuilder();
327  0 for (LevelSet<TaskElement> t : this) {
328  0 ParallelLevelSet pt = (ParallelLevelSet) t;
329  0 b.append("\tlevel" + pt.order + ":").append("\n\t");
330  0 int count = 0;
331  0 for (TaskElement te : t) {
332  0 b.append("\t").append(te.vertex.label()).append(" ");
333  0 if (count++ < t.size() - 1) {
334  0 b.append("->").append(" ");
335    }
336    }
337  0 b.append("\n");
338    }
339  0 return b.toString();
340    }
341   
 
342  21 toggle @Override
343    public Iterator<LevelSet<TaskElement>> iterator() {
344  21 return levels.values().iterator();
345    }
346    }
347   
 
348    class ParallelTask implements ExecutionMonitor, Runnable {
349   
350    final Identifier taskId;
351   
352    final TaskContext context;
353    final TaskGraph taskGraph;
354    final Subject<TaskEvent> subject;
355    final TaskLogger taskLogger;
356   
357    final PSchedule schedule;
358    volatile TinkerGraph graph;
359   
 
360  23 toggle ParallelTask(TaskGraph taskGraph, TaskContext context) {
361  23 this.context = context;
362  23 this.taskLogger = new RxTaskLogger(this);
363  23 this.taskId = sequence.next();
364  23 this.taskGraph = taskGraph;
365  23 this.subject = PublishSubject.create();
366  23 this.schedule = createSchedule();
367    }
368   
 
369  21 toggle @Override
370    public void run() {
371  21 executorService.submit(schedule);
372    }
373   
 
374  6 toggle private PSchedule get() {
375  6 return schedule;
376    }
377   
 
378  23 toggle private PSchedule createSchedule() {
379  23 log.info("Creating schedule");
380  23 Triplet<TinkerGraph, TinkerGraph, Map<Identifier, Vertex>> result = buildGraph(taskGraph);
381  23 this.graph = result.getValue0();
382   
383  23 return createParallelSchedule(
384    context,
385    result.getValue0(),
386    result.getValue1(),
387    taskGraph,
388    subject,
389    result.getValue2(),
390    taskId,
391    this);
392    }
393   
 
394  0 toggle @Override
395    public Identifier getId() {
396  0 return taskId;
397    }
398   
 
399  0 toggle @Override
400    public void start() {
401  0 run();
402    }
403   
 
404  10 toggle @Override
405    public TaskLogger getLogger() {
406  10 return taskLogger;
407    }
408   
 
409  20 toggle public Iterable<TaskEvent> join() throws InterruptedException {
410  20 return subject.toList().blockingGet();
411    }
412   
 
413  1 toggle @Override
414    @SuppressWarnings("unchecked")
415    public CompletableFuture<Void> toFuture() {
416  1 return CompletableFuture.supplyAsync(
417    () -> {
418  1 this.joinWithoutException();
419  1 return null;
420    },
421    executorService);
422    }
423   
 
424  1 toggle void joinWithoutException() {
425  1 try {
426  1 join();
427    } catch (InterruptedException e) {
428  0 throw new RuntimeException(e);
429    }
430    }
431   
 
432  0 toggle @SuppressWarnings("unchecked")
433    public <T> T unwrap(Class<T> type) {
434  0 if (type == Subject.class) {
435  0 return (T) subject;
436  0 } else if (type == TinkerGraph.class) {
437  0 return (T) get().graph;
438    }
439  0 return null;
440    }
441   
 
442  6 toggle @Override
443    public ExecutionPlan getExecutionPlan() {
444  6 return get().tree;
445    }
446   
 
447  0 toggle @Override
448    public TaskDefinition resolve(String name) {
449  0 return null;
450    }
451   
 
452  9 toggle @Override
453    public TaskDefinition resolve(Identifier id) {
454  9 ElementDescriptor<?> descriptor =
455    (ElementDescriptor<?>) graph.vertices(id).next().property("descriptor").value();
456  9 return new TaskDefinition(descriptor);
457    }
458   
 
459  0 toggle @Override
460    public void subscribe(Observer<? super TaskEvent> observer) {
461  0 subject.subscribe(observer);
462    }
463    }
464    }