1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package com.rapiddweller.task;
28
29 import com.rapiddweller.common.Context;
30 import com.rapiddweller.common.ErrorHandler;
31 import com.rapiddweller.contiperf.PerformanceTracker;
32 import com.rapiddweller.platform.contiperf.PerfTrackingTaskProxy;
33 import org.apache.logging.log4j.LogManager;
34 import org.apache.logging.log4j.Logger;
35
36 import java.io.PrintWriter;
37 import java.util.List;
38
39
40
41
42
43
44
45
46 public class TaskExecutor {
47
48 private static final Logger LOGGER =
49 LogManager.getLogger(TaskExecutor.class);
50
51 private final Task target;
52 private final Context context;
53 private final ErrorHandler errorHandler;
54 private final List<PageListener> pageListeners;
55 private final long pageSize;
56 private final boolean infoLog;
57 private PerformanceTracker tracker;
58
59 private TaskExecutor(Task target, List<PageListener> pageListeners,
60 long pageSize,
61 boolean stats, Context context,
62 ErrorHandler errorHandler, boolean infoLog) {
63 this.context = context;
64 this.errorHandler = errorHandler;
65 if (stats) {
66 target = new PerfTrackingTaskProxy(target);
67 this.tracker =
68 ((PerfTrackingTaskProxy) target).getOrCreateTracker();
69 }
70 this.target = new StateTrackingTaskProxy<>(target);
71 this.pageListeners = pageListeners;
72 this.pageSize = pageSize;
73 this.infoLog = infoLog;
74 }
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 public static void execute(Task task, Context context,
90 Long requestedInvocations, Long minInvocations,
91 List<PageListener> pageListeners, long pageSize,
92 boolean stats,
93 ErrorHandler errorHandler, boolean infoLog) {
94 TaskExecutorml#TaskExecutor">TaskExecutor runner = new TaskExecutor(task, pageListeners,
95 pageSize, stats, context, errorHandler, infoLog);
96 runner.run(requestedInvocations, minInvocations);
97 }
98
99 private static long runWithoutPage(Task target, Long invocationCount,
100 Context context,
101 ErrorHandler errorHandler) {
102 long actualCount = 0;
103 for (int i = 0; invocationCount == null || i < invocationCount; i++) {
104 TaskResult stepResult = target.execute(context, errorHandler);
105 if (stepResult != TaskResult.UNAVAILABLE) {
106 actualCount++;
107 }
108 if (stepResult != TaskResult.EXECUTING) {
109 break;
110 }
111 }
112 return actualCount;
113 }
114
115 private static void logExecutionInfo(Task task, Long minInvocations,
116 Long maxInvocations, long pageSize,
117 boolean infoLog) {
118 if (infoLog) {
119 if (LOGGER.isInfoEnabled()) {
120 LOGGER.info(executionInfo(task, minInvocations, maxInvocations,
121 pageSize));
122 }
123 } else if (LOGGER.isDebugEnabled()) {
124 LOGGER.debug(executionInfo(task, minInvocations, maxInvocations,
125 pageSize));
126 }
127 }
128
129 private static String executionInfo(Task task, Long minInvocations,
130 Long maxInvocations, long pageSize) {
131 String invocationInfo =
132 (maxInvocations == null ? "as long as available" :
133 (maxInvocations > 1 ? maxInvocations + " times" : ""));
134 if (minInvocations != null && minInvocations > 0 &&
135 (maxInvocations == null || maxInvocations > minInvocations)) {
136 invocationInfo +=
137 " requiring at least " + minInvocations + " generations";
138 }
139 if (invocationInfo.length() > 0) {
140 invocationInfo +=
141 " with page size " + pageSize + " in a single thread";
142 }
143 return "Running task " + task + " " + invocationInfo;
144 }
145
146 private void run(Long requestedInvocations, Long minInvocations) {
147 logExecutionInfo(target, requestedInvocations, minInvocations, pageSize,
148 infoLog);
149
150 long countValue = run(requestedInvocations);
151
152 if (minInvocations != null && countValue < minInvocations) {
153 throw new TaskUnavailableException(target, minInvocations,
154 countValue);
155 }
156 if (tracker != null) {
157 tracker.getCounters()[0]
158 .printSummary(new PrintWriter(System.out), 90, 95);
159 }
160 }
161
162 private long run(Long requestedInvocations) {
163 if (requestedInvocations != null && requestedInvocations == 0) {
164 return 0;
165 }
166 long queuedInvocations = 0;
167 long actualCount = 0;
168 if (requestedInvocations != null) {
169 queuedInvocations = requestedInvocations;
170 }
171 LOGGER.debug("Starting task {}", getTaskName());
172 int currentPageNo = 0;
173 do {
174 try {
175 if (pageSize > 0) {
176 pageStarting(currentPageNo);
177 }
178 long currentPageSize = currentPageSize(requestedInvocations,
179 queuedInvocations);
180 queuedInvocations -= currentPageSize;
181 actualCount += runPage(currentPageSize, (pageSize > 0));
182 if (pageSize > 0) {
183 pageFinished(currentPageNo, context);
184 }
185 currentPageNo++;
186 } catch (Exception e) {
187 errorHandler.handleError(
188 "Error in execution of task " + getTaskName(), e);
189 }
190 } while (workPending(requestedInvocations, queuedInvocations));
191 LOGGER.debug("Finished task {}", getTaskName());
192 return actualCount;
193 }
194
195
196
197
198
199
200
201
202 protected long currentPageSize(Long requestedInvocations,
203 long queuedInvocations) {
204 if (pageSize > 0) {
205 return (requestedInvocations == null ? pageSize :
206 Math.min(pageSize, queuedInvocations));
207 } else {
208 return (requestedInvocations == null ? 1 :
209 Math.min(requestedInvocations, queuedInvocations));
210 }
211 }
212
213 private String getTaskName() {
214 return target.getTaskName();
215 }
216
217 private long runPage(Long invocationCount, boolean finishPage) {
218 try {
219 return runWithoutPage(target, invocationCount, context,
220 errorHandler);
221 } finally {
222 if (finishPage) {
223 target.pageFinished();
224 }
225 }
226 }
227
228 private boolean workPending(Long maxInvocationCount,
229 long queuedInvocations) {
230 if (((StateTrackingTaskProxy<? extends Task>) target).isAvailable()) {
231 return false;
232 }
233 if (maxInvocationCount == null) {
234 return true;
235 }
236 return (queuedInvocations > 0);
237 }
238
239 private void pageStarting(int currentPageNo) {
240 if (LOGGER.isDebugEnabled()) {
241 LOGGER.debug("Starting page " + (currentPageNo + 1) + " of " +
242 getTaskName() + " with pageSize=" + pageSize);
243 }
244 if (pageListeners != null) {
245 for (PageListener listener : pageListeners) {
246 listener.pageStarting();
247 }
248 }
249 }
250
251 private void pageFinished(int currentPageNo, Context context) {
252 LOGGER.debug("Page {} of {} finished", currentPageNo + 1,
253 getTaskName());
254 if (pageListeners != null) {
255 for (PageListener listener : pageListeners) {
256 listener.pageFinished();
257 }
258 }
259 }
260
261
262
263
264 @Override
265 public String toString() {
266 return getClass().getSimpleName();
267 }
268
269 }