View Javadoc
1   /*
2    * (c) Copyright 2006-2020 by rapiddweller GmbH & Volker Bergmann. All rights reserved.
3    *
4    * Redistribution and use in source and binary forms, with or without
5    * modification, is permitted under the terms of the
6    * GNU General Public License.
7    *
8    * For redistributing this software or a derivative work under a license other
9    * than the GPL-compatible Free Software License as defined by the Free
10   * Software Foundation or approved by OSI, you must first obtain a commercial
11   * license to this software product from rapiddweller GmbH & Volker Bergmann.
12   *
13   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
14   * WITHOUT A WARRANTY OF ANY KIND. ALL EXPRESS OR IMPLIED CONDITIONS,
15   * REPRESENTATIONS AND WARRANTIES, INCLUDING ANY IMPLIED WARRANTY OF
16   * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT, ARE
17   * HEREBY EXCLUDED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
18   * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
19   * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
20   * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
21   * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
22   * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
23   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24   * POSSIBILITY OF SUCH DAMAGE.
25   */
26  
27  package com.rapiddweller.task;
28  
29  import com.rapiddweller.common.Context;
30  import com.rapiddweller.common.ErrorHandler;
31  import com.rapiddweller.contiperf.PerformanceTracker;
32  import com.rapiddweller.platform.contiperf.PerfTrackingTaskProxy;
33  import org.apache.logging.log4j.LogManager;
34  import org.apache.logging.log4j.Logger;
35  
36  import java.io.PrintWriter;
37  import java.util.List;
38  
39  /**
40   * Single-threaded non-locking {@link Task} executor.<br/><br/>
41   * Created: 19.12.2012 09:54:56
42   *
43   * @author Volker Bergmann
44   * @since 0.8.0
45   */
46  public class TaskExecutor {
47  
48    private static final Logger LOGGER =
49        LogManager.getLogger(TaskExecutor.class);
50  
51    private final Task target;
52    private final Context context;
53    private final ErrorHandler errorHandler;
54    private final List<PageListener> pageListeners;
55    private final long pageSize;
56    private final boolean infoLog;
57    private PerformanceTracker tracker;
58  
59    private TaskExecutor(Task target, List<PageListener> pageListeners,
60                         long pageSize,
61                         boolean stats, Context context,
62                         ErrorHandler errorHandler, boolean infoLog) {
63      this.context = context;
64      this.errorHandler = errorHandler;
65      if (stats) {
66        target = new PerfTrackingTaskProxy(target);
67        this.tracker =
68            ((PerfTrackingTaskProxy) target).getOrCreateTracker();
69      }
70      this.target = new StateTrackingTaskProxy<>(target);
71      this.pageListeners = pageListeners;
72      this.pageSize = pageSize;
73      this.infoLog = infoLog;
74    }
75  
76    /**
77     * Execute.
78     *
79     * @param task                 the task
80     * @param context              the context
81     * @param requestedInvocations the requested invocations
82     * @param minInvocations       the min invocations
83     * @param pageListeners        the page listeners
84     * @param pageSize             the page size
85     * @param stats                the stats
86     * @param errorHandler         the error handler
87     * @param infoLog              the info log
88     */
89    public static void execute(Task task, Context context,
90                               Long requestedInvocations, Long minInvocations,
91                               List<PageListener> pageListeners, long pageSize,
92                               boolean stats,
93                               ErrorHandler errorHandler, boolean infoLog) {
94      TaskExecutorml#TaskExecutor">TaskExecutor runner = new TaskExecutor(task, pageListeners,
95          pageSize, stats, context, errorHandler, infoLog);
96      runner.run(requestedInvocations, minInvocations);
97    }
98  
99    private static long runWithoutPage(Task target, Long invocationCount,
100                                      Context context,
101                                      ErrorHandler errorHandler) {
102     long actualCount = 0;
103     for (int i = 0; invocationCount == null || i < invocationCount; i++) {
104       TaskResult stepResult = target.execute(context, errorHandler);
105       if (stepResult != TaskResult.UNAVAILABLE) {
106         actualCount++;
107       }
108       if (stepResult != TaskResult.EXECUTING) {
109         break;
110       }
111     }
112     return actualCount;
113   }
114 
115   private static void logExecutionInfo(Task task, Long minInvocations,
116                                        Long maxInvocations, long pageSize,
117                                        boolean infoLog) {
118     if (infoLog) {
119       if (LOGGER.isInfoEnabled()) {
120         LOGGER.info(executionInfo(task, minInvocations, maxInvocations,
121             pageSize));
122       }
123     } else if (LOGGER.isDebugEnabled()) {
124       LOGGER.debug(executionInfo(task, minInvocations, maxInvocations,
125           pageSize));
126     }
127   }
128 
129   private static String executionInfo(Task task, Long minInvocations,
130                                       Long maxInvocations, long pageSize) {
131     String invocationInfo =
132         (maxInvocations == null ? "as long as available" :
133             (maxInvocations > 1 ? maxInvocations + " times" : ""));
134     if (minInvocations != null && minInvocations > 0 &&
135         (maxInvocations == null || maxInvocations > minInvocations)) {
136       invocationInfo +=
137           " requiring at least " + minInvocations + " generations";
138     }
139     if (invocationInfo.length() > 0) {
140       invocationInfo +=
141           " with page size " + pageSize + " in a single thread";
142     }
143     return "Running task " + task + " " + invocationInfo;
144   }
145 
146   private void run(Long requestedInvocations, Long minInvocations) {
147     logExecutionInfo(target, requestedInvocations, minInvocations, pageSize,
148         infoLog);
149     // first run without verification
150     long countValue = run(requestedInvocations);
151     // afterwards verify execution count
152     if (minInvocations != null && countValue < minInvocations) {
153       throw new TaskUnavailableException(target, minInvocations,
154           countValue);
155     }
156     if (tracker != null) {
157       tracker.getCounters()[0]
158           .printSummary(new PrintWriter(System.out), 90, 95);
159     }
160   }
161 
162   private long run(Long requestedInvocations) {
163     if (requestedInvocations != null && requestedInvocations == 0) {
164       return 0;
165     }
166     long queuedInvocations = 0;
167     long actualCount = 0;
168     if (requestedInvocations != null) {
169       queuedInvocations = requestedInvocations;
170     }
171     LOGGER.debug("Starting task {}", getTaskName());
172     int currentPageNo = 0;
173     do {
174       try {
175         if (pageSize > 0) {
176           pageStarting(currentPageNo);
177         }
178         long currentPageSize = currentPageSize(requestedInvocations,
179             queuedInvocations);
180         queuedInvocations -= currentPageSize;
181         actualCount += runPage(currentPageSize, (pageSize > 0));
182         if (pageSize > 0) {
183           pageFinished(currentPageNo, context);
184         }
185         currentPageNo++;
186       } catch (Exception e) {
187         errorHandler.handleError(
188             "Error in execution of task " + getTaskName(), e);
189       }
190     } while (workPending(requestedInvocations, queuedInvocations));
191     LOGGER.debug("Finished task {}", getTaskName());
192     return actualCount;
193   }
194 
195   /**
196    * Current page size long.
197    *
198    * @param requestedInvocations the requested invocations
199    * @param queuedInvocations    the queued invocations
200    * @return the long
201    */
202   protected long currentPageSize(Long requestedInvocations,
203                                  long queuedInvocations) {
204     if (pageSize > 0) {
205       return (requestedInvocations == null ? pageSize :
206           Math.min(pageSize, queuedInvocations));
207     } else {
208       return (requestedInvocations == null ? 1 :
209           Math.min(requestedInvocations, queuedInvocations));
210     }
211   }
212 
213   private String getTaskName() {
214     return target.getTaskName();
215   }
216 
217   private long runPage(Long invocationCount, boolean finishPage) {
218     try {
219       return runWithoutPage(target, invocationCount, context,
220           errorHandler);
221     } finally {
222       if (finishPage) {
223         target.pageFinished();
224       }
225     }
226   }
227 
228   private boolean workPending(Long maxInvocationCount,
229                               long queuedInvocations) {
230     if (((StateTrackingTaskProxy<? extends Task>) target).isAvailable()) {
231       return false;
232     }
233     if (maxInvocationCount == null) {
234       return true;
235     }
236     return (queuedInvocations > 0);
237   }
238 
239   private void pageStarting(int currentPageNo) {
240     if (LOGGER.isDebugEnabled()) {
241       LOGGER.debug("Starting page " + (currentPageNo + 1) + " of " +
242           getTaskName() + " with pageSize=" + pageSize);
243     }
244     if (pageListeners != null) {
245       for (PageListener listener : pageListeners) {
246         listener.pageStarting();
247       }
248     }
249   }
250 
251   private void pageFinished(int currentPageNo, Context context) {
252     LOGGER.debug("Page {} of {} finished", currentPageNo + 1,
253         getTaskName());
254     if (pageListeners != null) {
255       for (PageListener listener : pageListeners) {
256         listener.pageFinished();
257       }
258     }
259   }
260 
261 
262   // java.lang.Object overrides --------------------------------------------------------------------------------------
263 
264   @Override
265   public String toString() {
266     return getClass().getSimpleName();
267   }
268 
269 }