Clover icon

sunshower-core

  1. Project Clover database Fri Apr 6 2018 03:27:42 UTC
  2. Package io.sunshower.service.task.exec

File TaskStep.java

 

Coverage histogram

../../../../../img/srcFileCovDistChart7.png
24% of files have more coverage

Code metrics

26
100
19
1
267
221
38
0.38
5.26
19
2

Classes

Class Line # Actions
TaskStep 23 100 38
0.6413793 64.1%
 

Contributing tests

This file is covered by 12 tests.

Source view

1    package io.sunshower.service.task.exec;
2   
3    import io.reactivex.subjects.Subject;
4    import io.sunshower.common.Identifier;
5    import io.sunshower.service.model.task.TaskEvent;
6    import io.sunshower.service.model.task.TaskLogger;
7    import io.sunshower.service.task.*;
8    import java.lang.annotation.Annotation;
9    import java.lang.reflect.Field;
10    import java.lang.reflect.Method;
11    import java.util.*;
12    import java.util.concurrent.Callable;
13    import java.util.concurrent.CountDownLatch;
14    import java.util.logging.Level;
15    import javax.annotation.PostConstruct;
16    import org.apache.tinkerpop.gremlin.structure.VertexProperty;
17    import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
18    import org.javatuples.Pair;
19    import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
20    import org.springframework.context.ApplicationContext;
21   
22    /** Created by haswell on 2/15/17. */
 
23    class TaskStep implements Callable<ExecutionResult> {
24   
25    private final TaskContext context;
26    private Identifier taskId;
27    private final Identifier scheduleId;
28    private final TinkerGraph graph;
29    private final CountDownLatch latch;
30    private final TaskElement taskElement;
31    private final Subject<TaskEvent> subject;
32    private ParallelTaskExecutor parallelTaskExecutor;
33    private final ApplicationContext applicationContext;
34    private final TaskLogger taskLogger;
35   
 
36  153 toggle public TaskStep(
37    TaskLogger logger,
38    TaskContext context,
39    TinkerGraph graph,
40    TaskElement e,
41    Identifier scheduleId,
42    CountDownLatch latch,
43    Subject<TaskEvent> subject,
44    ApplicationContext applicationContext,
45    ParallelTaskExecutor parallelTaskExecutor) {
46  153 this.taskLogger = logger;
47  153 this.graph = graph;
48  153 this.latch = latch;
49  153 this.taskElement = e;
50  153 this.subject = subject;
51  153 this.context = context;
52  153 this.scheduleId = scheduleId;
53  153 this.applicationContext = applicationContext;
54  153 this.parallelTaskExecutor = parallelTaskExecutor;
55  153 ParallelTaskExecutor.log.log(Level.INFO, "Initializing task step for task ", scheduleId);
56    }
57   
 
58  144 toggle final Map<Class<?>, Object> createInjectionContext(Class<?> type, Object value) {
59  145 final Map<Class<?>, Object> result = new HashMap<>();
60  149 result.put(TaskLogger.class, taskLogger);
61  145 fromPreconditions(type, value, result);
62  153 return result;
63    }
64   
 
65  144 toggle private void fromPreconditions(Class<?> type, Object value, Map<Class<?>, Object> result) {
66  148 ParallelTaskExecutor.log.info("Computing preconditions...");
67  151 if (type.isAnnotationPresent(Preconditions.class)) {
68  0 final Pair<Class<?>, Object>[] preconditions =
69    compute(type.getAnnotation(Preconditions.class));
70  0 for (Pair<Class<?>, Object> precondition : preconditions) {
71  0 result.put(precondition.getValue0(), precondition.getValue1());
72  0 runPrecondition(precondition.getValue0(), precondition.getValue1(), result);
73    }
74    }
75  149 ParallelTaskExecutor.log.info("Preconditions computed");
76    }
77   
 
78  0 toggle private void runPrecondition(Class<?> value0, Object value1, Map<Class<?>, Object> result) {
79  0 Object r = runRuns(value0, value1);
80  0 if (r != null) {
81  0 result.put(r.getClass(), r);
82    }
83    }
84   
 
85  0 toggle private Pair<Class<?>, Object>[] compute(Preconditions annotation) {
86  0 Precondition[] value = annotation.value();
87  0 @SuppressWarnings("unchecked")
88    final Pair<Class<?>, Object>[] results = new Pair[value.length];
89  0 Arrays.sort(value, (lhs, rhs) -> lhs.order() < rhs.order() ? -1 : 1);
90  0 for (int i = 0; i < value.length; i++) {
91  0 results[i] = Pair.with(value[i].condition(), createPrecondition(value[i].condition()));
92    }
93  0 return results;
94    }
95   
 
96  0 toggle private Object createPrecondition(Class<?> condition) {
97  0 return applicationContext
98    .getAutowireCapableBeanFactory()
99    .createBean(condition, AutowireCapableBeanFactory.AUTOWIRE_NO, true);
100    }
101   
 
102  0 toggle private Identifier resolveId() {
103  0 if (taskId != null) {
104  0 return taskId;
105    }
106   
107  0 final Object id = taskElement.vertex.id();
108  0 try {
109  0 return (taskId = Identifier.decode((String) id));
110    } catch (Exception ex) {
111  0 return (taskId = Identifier.random());
112    }
113    }
114   
 
115  152 toggle public ExecutionResult<?> call() {
116  152 ParallelTaskExecutor.log.info("Beginning task execution");
117  153 subject.onNext(new TaskEvent(TaskEvent.Type.TaskBeginning, scheduleId, taskId));
118  151 try {
119  153 Object result = doRun();
120  152 subject.onNext(new TaskEvent(TaskEvent.Type.TaskBeginning, scheduleId, taskId));
121  152 return new ExecResult(result);
122    } catch (Exception e) {
123  0 ParallelTaskExecutor.log.log(
124    Level.WARNING,
125    "Failed to execute task {0} in plan {1}. " + "Reason: {2}. Full trace at debug",
126    new Object[] {resolveId(), scheduleId, e.getMessage()});
127  0 e.printStackTrace();
128  0 subject.onError(new TaskException(e, scheduleId, taskId));
129  0 return new ExecResult(e);
130    } finally {
131  153 latch.countDown();
132    }
133    }
134   
 
135  151 toggle private Object doRun() {
136  151 final VertexProperty<Object> property = taskElement.vertex.property("descriptor");
137  150 final ElementDescriptor descriptor = (ElementDescriptor) property.value();
138  153 prepare(descriptor);
139  148 final Class<?> type = descriptor.getType();
140  148 final Object value = descriptor.getInstance();
141  145 inject(type, value);
142  146 runPostConstructs(type, value);
143  150 runBefores(type, value);
144  153 Object result = runRuns(type, value);
145  152 if (result != null) {
146  152 taskElement.vertex.property("result", result);
147    }
148  152 runAfters(type, value);
149  153 return result;
150    }
151   
 
152  150 toggle @SuppressWarnings("all")
153    private void prepare(ElementDescriptor descriptor) {
154  150 final InjectionContext injectionContext =
155    new InjectionContext(context, descriptor, graph, applicationContext);
156  149 descriptor.setInstance(injectionContext.invoke());
157    }
158   
 
159  146 toggle private void runPostConstructs(Class<?> type, Object value) {
160  148 runWithAnnotation(type, value, PostConstruct.class);
161    }
162   
 
163  604 toggle private Object runWithAnnotation(
164    Class<?> type, Object value, Class<? extends Annotation> annotation) {
165  607 Object result = null;
166   
167  610 Set<Method> methods = new HashSet<>();
168  606 Class<?> proxyType = value.getClass();
169  607 for (Method m : type.getMethods()) {
170  7339 if (m.isAnnotationPresent(annotation) && !methods.contains(m)) {
171  150 try {
172  151 final Method proxyMethod = proxyType.getMethod(m.getName());
173  152 proxyMethod.setAccessible(true);
174  152 Object r = proxyMethod.invoke(value);
175  152 if (r != null) {
176  151 result = r;
177    }
178  153 methods.add(m);
179    } catch (Exception e) {
180  0 e.printStackTrace();
181    }
182    }
183    }
184  612 return result;
185    }
186   
 
187  150 toggle private void runBefores(Class<?> type, Object value) {
188  150 runWithAnnotation(type, value, Before.class);
189    }
190   
 
191  148 toggle private Object runRuns(Class<?> type, Object value) {
192  149 return runWithAnnotation(type, value, Run.class);
193    }
194   
 
195  152 toggle private Object runAfters(Class<?> type, Object value) {
196  152 return runWithAnnotation(type, value, After.class);
197    }
198   
 
199  145 toggle private void inject(Class<?> objectype, Object element) {
200   
201  147 runAndInjectPreconditions(objectype, element);
202   
203  150 findAndInjectProperties(objectype, element);
204    }
205   
 
206  146 toggle private void findAndInjectProperties(Class<?> objectype, Object element) {
207    // final Identifier id = resolveId();
208    // GraphTraversal<
209    // Vertex,
210    // Vertex
211    // > next = this.graph.traversal().V(this.taskElement.vertex.id()).out();
212    // if (next.hasNext()) {
213    // Vertex v = next.next();
214    // try {
215    // VertexProperty<Object> result = v.property("result");
216    // if (result != null && result.value() != null) {
217    // final Object value = result.value();
218    // injectProperties(objectype, element, value);
219    // }
220    // }
221    // catch (IllegalStateException ex) {
222    // ParallelTaskExecutor.log.log(Level.WARNING, "Caught unexpected exception while
223    // processing result", ex);
224    // }
225    // }
226    }
227   
 
228  144 toggle private void runAndInjectPreconditions(Class<?> objectype, Object element) {
229  146 Map<Class<?>, Object> injectionContext = createInjectionContext(objectype, element);
230  453 for (Class<?> type = objectype; type != null; type = type.getSuperclass()) {
231  304 for (Field f : type.getDeclaredFields()) {
232  600 if (f.isAnnotationPresent(Context.class)) {
233  600 f.setAccessible(true);
234  598 Object value = injectionContext.get(f.getType());
235  597 if (value != null) {
236  147 try {
237  147 f.set(element, value);
238    } catch (IllegalAccessException e) {
239  0 ParallelTaskExecutor.log.log(
240    Level.WARNING,
241    "Failed to set field {0} on type {1}." + " Did you declare it final?",
242    new Object[] {f.getName(), objectype.getName()});
243    }
244    }
245    }
246    }
247    }
248    }
249   
 
250  0 toggle private void injectProperties(Class<?> objectype, Object element, Object value) {
251  0 for (Class<?> type = objectype; type != null; type = type.getSuperclass()) {
252  0 for (Field f : type.getDeclaredFields()) {
253  0 if (f.isAnnotationPresent(Property.class)) {
254  0 f.setAccessible(true);
255  0 try {
256  0 f.set(element, value);
257    } catch (IllegalAccessException e) {
258  0 ParallelTaskExecutor.log.log(
259    Level.WARNING,
260    "Failed to set field {0} on type {1}." + " Did you declare it final?",
261    new Object[] {f.getName(), objectype.getName()});
262    }
263    }
264    }
265    }
266    }
267    }