1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro.util;
15  
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;

import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
53  
54  public final class Environment {
55  
56      private static final Logger LOGGER = LoggerFactory.getLogger(Environment.class);
57  
58      @Nullable
59      private static List<String> propertyNames;
60  
61      private static Map<String, String> configuredProperties = new HashMap<>();
62  
63      private static Map<String, String> loadedProperties = new HashMap<>();
64  
65      private static Map<String, Optional<String>> frozenProperties = new ConcurrentHashMap<>();
66  
67      private static ExecutorService configuredPool = null;
68  
69      private static ExecutorService frozenPool = null;
70  
71      private static List<Plugin> frozenPlugins = null;
72  
73      private static int frozenCores = 0;
74  
75      static {
76          final Properties properties = new Properties();
77          properties.setProperty("rdfpro.cores", "" + Runtime.getRuntime().availableProcessors());
78          try {
79              final List<String> envSources = Lists.newArrayList("rdfpro.properties");
80              envSources.addAll(Splitter.on(',').omitEmptyStrings()
81                      .splitToList(System.getProperty("rdfpro.environment.sources", "")));
82              final List<URL> urls = new ArrayList<>();
83              final ClassLoader cl = Environment.class.getClassLoader();
84              for (final String envSource : envSources) {
85                  for (final String p : new String[] { "META-INF/" + envSource, envSource }) {
86                      for (final Enumeration<URL> e = cl.getResources(p); e.hasMoreElements();) {
87                          urls.add(e.nextElement());
88                      }
89                  }
90              }
91              for (final URL url : urls) {
92                  final Reader in = new InputStreamReader(url.openStream(), Charset.forName("UTF-8"));
93                  try {
94                      properties.load(in);
95                      LOGGER.debug("Loaded configuration from '" + url + "'");
96                  } catch (final Throwable ex) {
97                      LOGGER.warn("Could not load configuration from '" + url + "' - ignoring", ex);
98                  } finally {
99                      in.close();
100                 }
101             }
102         } catch (final IOException ex) {
103             LOGGER.warn("Could not complete loading of configuration from classpath resources", ex);
104         }
105         for (final Map.Entry<?, ?> entry : properties.entrySet()) {
106             loadedProperties.put((String) entry.getKey(), (String) entry.getValue());
107         }
108         for (final Map.Entry<?, ?> entry : System.getProperties().entrySet()) {
109             loadedProperties.put((String) entry.getKey(), (String) entry.getValue());
110         }
111         for (final Map.Entry<String, String> entry : System.getenv().entrySet()) {
112             final String key = entry.getKey().toString().toLowerCase().replace('_', '.');
113             loadedProperties.put(key, entry.getValue());
114         }
115     }
116 
117     public static void configurePool(@Nullable final ExecutorService pool) {
118         synchronized (Environment.class) {
119             if (frozenPool != null) {
120                 throw new IllegalStateException("Thread pool already in use");
121             }
122             configuredPool = pool; // to be frozen later
123         }
124     }
125 
126     public static void configureProperty(final String name, @Nullable final String value) {
127         Objects.requireNonNull(name);
128         synchronized (Environment.class) {
129             if (frozenPlugins != null && name.startsWith("plugin,")) {
130                 throw new IllegalStateException("Plugin configuration already loaded");
131             }
132             if (frozenProperties.containsKey(name)) {
133                 throw new IllegalStateException("Property " + name + " already in use (value "
134                         + frozenProperties.get(name) + ")");
135             }
136             propertyNames = null; // invalidate
137             if (value == null) {
138                 configuredProperties.remove(name);
139             } else {
140                 configuredProperties.put(name, value);
141             }
142         }
143     }
144 
145     public static int getCores() {
146         if (frozenCores <= 0) {
147             frozenCores = Integer.parseInt(getProperty("rdfpro.cores"));
148         }
149         return frozenCores;
150     }
151 
152     public static ExecutorService getPool() {
153         if (frozenPool == null) {
154             synchronized (Environment.class) {
155                 if (frozenPool == null) {
156                     frozenPool = configuredPool;
157                     if (frozenPool == null) {
158                         final ThreadFactory factory = new ThreadFactory() {
159 
160                             private final AtomicInteger counter = new AtomicInteger(0);
161 
162                             @Override
163                             public Thread newThread(final Runnable runnable) {
164                                 final int index = this.counter.getAndIncrement();
165                                 final Thread thread = new Thread(runnable);
166                                 thread.setName(String.format("rdfpro-%03d", index));
167                                 thread.setPriority(Thread.NORM_PRIORITY);
168                                 thread.setDaemon(true);
169                                 return thread;
170                             }
171 
172                         };
173                         frozenPool = Executors.newCachedThreadPool(factory);
174                     }
175                     LOGGER.debug("Using pool {}", frozenPool);
176                 }
177             }
178         }
179         return frozenPool;
180     }
181 
182     public static void run(final Iterable<? extends Runnable> runnables) {
183 
184         final List<Runnable> runnableList = ImmutableList.copyOf(runnables);
185         final int parallelism = Math.min(Environment.getCores(), runnableList.size());
186 
187         final CountDownLatch latch = new CountDownLatch(parallelism);
188         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
189         final AtomicInteger index = new AtomicInteger(0);
190 
191         final List<Runnable> threadRunnables = new ArrayList<Runnable>();
192         for (int i = 0; i < parallelism; ++i) {
193             threadRunnables.add(new Runnable() {
194 
195                 @Override
196                 public void run() {
197                     try {
198                         while (true) {
199                             final int i = index.getAndIncrement();
200                             if (i >= runnableList.size() || exception.get() != null) {
201                                 break;
202                             }
203                             runnableList.get(i).run();
204                         }
205                     } catch (final Throwable ex) {
206                         exception.set(ex);
207                     } finally {
208                         latch.countDown();
209                     }
210                 }
211 
212             });
213         }
214 
215         try {
216             for (int i = 1; i < parallelism; ++i) {
217                 Environment.getPool().submit(threadRunnables.get(i));
218             }
219             if (!threadRunnables.isEmpty()) {
220                 threadRunnables.get(0).run();
221             }
222             latch.await();
223             if (exception.get() != null) {
224                 throw exception.get();
225             }
226         } catch (final Throwable ex) {
227             Throwables.propagate(ex);
228         }
229     }
230 
231     @Nullable
232     public static String getProperty(final String name) {
233         Objects.requireNonNull(name);
234         Optional<String> holder = frozenProperties.get(name);
235         if (holder == null) {
236             synchronized (Environment.class) {
237                 holder = frozenProperties.get(name);
238                 if (holder == null) {
239                     String value;
240                     if (configuredProperties.containsKey(name)) {
241                         value = configuredProperties.get(name);
242                     } else {
243                         value = loadedProperties.get(name);
244                     }
245                     holder = Optional.ofNullable(value);
246                     frozenProperties.put(name, holder);
247                     if (value != null) {
248                         LOGGER.debug("Using {} = {}", name, value);
249                     }
250                 }
251             }
252         }
253         return holder.orElse(null);
254     }
255 
256     @Nullable
257     public static String getProperty(final String name, @Nullable final String valueIfNull) {
258         final String value = getProperty(name);
259         return value != null ? value : valueIfNull;
260     }
261 
262     public static List<String> getPropertyNames() {
263         synchronized (Environment.class) {
264             if (propertyNames == null) {
265                 propertyNames = new ArrayList<>();
266                 propertyNames.addAll(loadedProperties.keySet());
267                 for (final String property : configuredProperties.keySet()) {
268                     if (!loadedProperties.containsKey(property)) {
269                         propertyNames.add(property);
270                     }
271                 }
272                 Collections.sort(propertyNames);
273             }
274         }
275         return propertyNames;
276     }
277 
278     public static Map<String, String> getPlugins(final Class<?> baseClass) {
279 
280         Objects.requireNonNull(baseClass);
281 
282         if (frozenPlugins == null) {
283             loadPlugins();
284         }
285 
286         final Map<String, String> map = new HashMap<>();
287         for (final Plugin plugin : frozenPlugins) {
288             if (baseClass.isAssignableFrom(plugin.factory.getReturnType())) {
289                 map.put(plugin.names.get(0), plugin.description);
290             }
291         }
292         return map;
293     }
294 
295     public static <T> T newPlugin(final Class<T> baseClass, final String name,
296             final String... args) {
297 
298         Objects.requireNonNull(baseClass);
299         Objects.requireNonNull(name);
300         if (Arrays.asList(args).contains(null)) {
301             throw new NullPointerException();
302         }
303 
304         if (frozenPlugins == null) {
305             loadPlugins();
306         }
307 
308         for (final Plugin plugin : frozenPlugins) {
309             if (baseClass.isAssignableFrom(plugin.factory.getReturnType())
310                     && plugin.names.contains(name)) {
311                 try {
312                     return baseClass.cast(plugin.factory.invoke(null, name, args));
313                 } catch (final IllegalAccessException ex) {
314                     throw new Error("Unexpected error (!)", ex); // checked when loading plugins
315                 } catch (final InvocationTargetException ex) {
316                     final Throwable cause = ex.getCause();
317                     throw cause instanceof RuntimeException ? (RuntimeException) cause
318                             : new RuntimeException(ex);
319                 }
320             }
321         }
322 
323         throw new IllegalArgumentException("Unknown " + baseClass.getSimpleName() + " plugin '"
324                 + name + "'");
325     }
326 
327     @SuppressWarnings("unchecked")
328     private static void loadPlugins() {
329         synchronized (Environment.class) {
330             if (frozenPlugins != null) {
331                 return;
332             }
333             final Set<String> disabledNames = new HashSet<>();
334             final List<Plugin> plugins = new ArrayList<>();
335             for (final Map<String, String> map : new Map[] { loadedProperties,
336                     configuredProperties }) {
337                 for (final Map.Entry<String, String> entry : map.entrySet()) {
338                     final String name = entry.getKey();
339                     final String value = entry.getValue();
340                     if (name.startsWith("plugin.enable.") || name.startsWith("plugin,enable,")) {
341                         final List<String> names = Arrays.asList(name.substring(
342                                 "plugin.enable.".length()).split("[.,]"));
343                         if (value.equalsIgnoreCase("true")) {
344                             disabledNames.removeAll(names);
345                         } else {
346                             disabledNames.addAll(names);
347                         }
348                     } else if (name.startsWith("plugin,") || name.startsWith("plugin.")) {
349                         try {
350                             final String s = name.substring("plugin.".length());
351                             String[] tokens = s.split(",");
352                             if (tokens.length == 1) {
353                                 final String[] allTokens = s.split("\\.");
354                                 for (int i = 0; i < allTokens.length; ++i) {
355                                     if (Character.isUpperCase(allTokens[i].charAt(0))) {
356                                         tokens = new String[allTokens.length - i];
357                                         tokens[0] = String.join(".",
358                                                 Arrays.copyOfRange(allTokens, 0, i + 1));
359                                         System.arraycopy(allTokens, i + 1, tokens, 1,
360                                                 allTokens.length - i - 1);
361                                     }
362                                 }
363                             }
364                             final String className = tokens[0];
365                             final String methodName = tokens[1];
366                             final List<String> pluginNames = Arrays.asList(Arrays.copyOfRange(
367                                     tokens, 2, tokens.length));
368                             final Class<?> clazz = Class.forName(className);
369                             final Method method = clazz.getDeclaredMethod(methodName,
370                                     String.class, String[].class);
371                             method.setAccessible(true);
372                             plugins.add(new Plugin(pluginNames, value, method));
373                         } catch (final Throwable ex) {
374                             LOGGER.warn("Invalid plugin definition " + name + " - ignoring", ex);
375                         }
376                     }
377                 }
378             }
379             for (final Iterator<Plugin> i = plugins.iterator(); i.hasNext();) {
380                 final List<String> names = i.next().names;
381                 for (final String name : names) {
382                     if (disabledNames.contains(name)) {
383                         i.remove();
384                         break;
385                     }
386                 }
387             }
388             frozenPlugins = plugins;
389         }
390     }
391 
392     private static final class Plugin {
393 
394         public final List<String> names;
395 
396         public final String description;
397 
398         public final Method factory;
399 
400         Plugin(final List<String> names, final String description, final Method factory) {
401             this.names = names;
402             this.description = description;
403             this.factory = factory;
404         }
405 
406     }
407 
408 }