1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2014 by Francesco Corcoglioniti with support by Marco Amadori, Michele Mostarda,
5    * Alessio Palmero Aprosio and Marco Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro.util;
15  
16  import java.util.Map;
17  import java.util.Objects;
18  import java.util.TreeMap;
19  import java.util.concurrent.atomic.AtomicInteger;
20  import java.util.concurrent.atomic.AtomicLong;
21  
22  import javax.annotation.Nullable;
23  
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  
27  public final class Tracker {
28  
29      private static final Logger STATUS_LOGGER = LoggerFactory.getLogger("status."
30              + Tracker.class.getName());
31  
32      private static final Map<Integer, String> STATUS_DATA = new TreeMap<Integer, String>();
33  
34      private static final AtomicInteger STATUS_KEY_COUNTER = new AtomicInteger(0);
35  
36      private final Logger logger;
37  
38      @Nullable
39      private final String startMessage;
40  
41      @Nullable
42      private final String endMessage;
43  
44      @Nullable
45      private final String statusMessage;
46  
47      @Nullable
48      private Integer statusKey;
49  
50      private final AtomicLong counter;
51  
52      private long counterAtTs = 0;
53  
54      private long ts0;
55  
56      private long ts1;
57  
58      private long ts;
59  
60      private long chunkSize;
61  
62      public Tracker(final Logger logger, @Nullable final String startMessage,
63              @Nullable final String endMessage, @Nullable final String statusMessage) {
64          this.logger = Objects.requireNonNull(logger);
65          this.startMessage = startMessage;
66          this.endMessage = endMessage;
67          this.statusMessage = statusMessage;
68          this.statusKey = null;
69          this.counter = new AtomicLong(0L);
70      }
71  
72      public void start() {
73          this.counter.set(0L);
74          this.counterAtTs = 0;
75          this.ts0 = 0;
76          this.ts1 = 0;
77          this.ts = 0;
78          this.chunkSize = 1L;
79          if (this.startMessage != null) {
80              this.logger.info(this.startMessage);
81          }
82      }
83  
84      public void increment() {
85          final long counter = this.counter.getAndIncrement();
86          if (counter % this.chunkSize == 0 && this.statusMessage != null) {
87              updateStatus(counter + 1);
88          }
89      }
90  
91      public void add(final long delta) {
92          final long counter = this.counter.addAndGet(delta);
93          if (this.statusMessage != null
94                  && (counter - delta) / this.chunkSize < counter / this.chunkSize) {
95              updateStatus(counter);
96          }
97      }
98  
99      public void end() {
100         if (this.statusMessage != null && this.statusKey != null) {
101             registerStatus(this.statusKey, null);
102         }
103         if (this.endMessage != null) {
104             final long ts = this.ts1;
105             final long avgThroughput = this.counter.get() * 1000 / (ts - this.ts0 + 1);
106             if (this.logger.isInfoEnabled()) {
107                 this.logger
108                         .info(String.format(this.endMessage, this.counter.get(), avgThroughput));
109             }
110         }
111     }
112 
113     private synchronized void updateStatus(final long counter) {
114         synchronized (this) {
115             final long ts = System.currentTimeMillis();
116             this.ts1 = ts;
117             if (this.ts0 == 0) {
118                 this.ts0 = ts;
119                 this.ts = ts;
120                 this.statusKey = STATUS_KEY_COUNTER.getAndIncrement();
121             }
122             final long delta = ts - this.ts0;
123             if (delta > 0) {
124                 final long avgThroughput = counter * 1000 / delta;
125                 this.chunkSize = avgThroughput < 10 ? 1
126                         : avgThroughput < 10000 ? avgThroughput / 10 : 1000;
127                 if (ts / 1000 - this.ts / 1000 >= 1 || this.chunkSize == 1) {
128                     final long throughput = ts == this.ts ? 0L : (counter - this.counterAtTs)
129                             * 1000 / (ts - this.ts);
130                     this.ts = ts;
131                     this.counterAtTs = counter;
132                     registerStatus(this.statusKey, String.format(this.statusMessage, counter - 1,
133                             throughput, avgThroughput));
134                 }
135             }
136         }
137     }
138 
139     private static void registerStatus(final Integer key, @Nullable final String message) {
140         synchronized (STATUS_DATA) {
141             if (message == null) {
142                 STATUS_DATA.remove(key);
143             } else {
144                 STATUS_DATA.put(key, message);
145             }
146             if (STATUS_LOGGER.isInfoEnabled()) {
147                 final StringBuilder builder = new StringBuilder();
148                 int count = 0;
149                 for (final String value : STATUS_DATA.values()) {
150                     if (count == 4) {
151                         builder.append(" ..."); // max 4 elements printed
152                         break;
153                     } else if (count > 0) {
154                         builder.append(" | ");
155                     }
156                     builder.append(value);
157                     ++count;
158                 }
159                 builder.append((char) 0);
160                 STATUS_LOGGER.info(builder.toString());
161             }
162         }
163     }
164 
165 }