1
2
3
4
5
6
7
8
9
10
11
12
13
14 package eu.fbk.rdfpro;
15
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.function.Consumer;
28
29 import javax.annotation.Nullable;
30
31 import org.openrdf.model.Resource;
32 import org.openrdf.model.Statement;
33 import org.openrdf.model.URI;
34 import org.openrdf.model.Value;
35 import org.openrdf.rio.RDFHandler;
36 import org.openrdf.rio.RDFHandlerException;
37
38 import eu.fbk.rdfpro.util.Hash;
39 import eu.fbk.rdfpro.util.IO;
40 import eu.fbk.rdfpro.util.Sorter;
41 import eu.fbk.rdfpro.util.Statements;
42
43 final class ProcessorUnique implements RDFProcessor {
44
/**
 * Selects the handler built by {@code wrap()}: when true a MergeHandler is used, otherwise a plain
 * sorting Handler.
 */
private final boolean mergeContexts;

/**
 * Creates a new processor.
 *
 * @param mergeContexts true to collapse a triple found in several named contexts into a single
 *            statement in a synthetic merged context; false to just sort the input (presumably
 *            dropping duplicates) and forward every statement unchanged
 */
ProcessorUnique(final boolean mergeContexts) {
    this.mergeContexts = mergeContexts;
}
50
51 @SuppressWarnings("resource")
52 @Override
53 public RDFHandler wrap(final RDFHandler handler) {
54 Objects.requireNonNull(handler);
55 return this.mergeContexts ? new MergeHandler(RDFHandlers.decouple(handler))
56 : new Handler(handler, true);
57 }
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397 private static class Handler extends AbstractRDFHandlerWrapper {
398
399 private final boolean parallelize;
400
401 private Sorter<Statement> sorter;
402
403 Handler(final RDFHandler handler, final boolean parallelize) {
404 super(handler);
405 this.parallelize = parallelize;
406 this.sorter = null;
407 }
408
409 @Override
410 public void startRDF() throws RDFHandlerException {
411 super.startRDF();
412 this.sorter = Sorter.newStatementSorter(true);
413 try {
414 this.sorter.start(true);
415 } catch (final IOException ex) {
416 throw new RDFHandlerException(ex);
417 }
418 }
419
420 @Override
421 public void handleStatement(final Statement statement) throws RDFHandlerException {
422 try {
423 this.sorter.emit(statement);
424 } catch (final Throwable ex) {
425 throw new RDFHandlerException(ex);
426 }
427 }
428
429 @Override
430 public void endRDF() throws RDFHandlerException {
431 try {
432 this.sorter.end(this.parallelize, new Consumer<Statement>() {
433
434 @Override
435 public void accept(final Statement statement) {
436 try {
437 handleStatementSorted(statement);
438 } catch (final RDFHandlerException ex) {
439 throw new RuntimeException(ex);
440 }
441 }
442
443 });
444 this.sorter.close();
445 this.sorter = null;
446 handleEndRDF();
447 } catch (final IOException ex) {
448 throw new RDFHandlerException(ex);
449 }
450 super.endRDF();
451 }
452
453 @Override
454 public final void close() {
455 IO.closeQuietly(this.sorter);
456 super.close();
457 }
458
459 void handleStatementSorted(final Statement statement) throws RDFHandlerException {
460 this.handler.handleStatement(statement);
461 }
462
463 void handleEndRDF() throws RDFHandlerException {
464 }
465
466 }
467
468 private static final class MergeHandler extends Handler {
469
470 private final Map<Resource, List<Statement>> contextsStatements;
471
472 private final Map<ContextSet, Resource> mergedContexts;
473
474 @Nullable
475 private Statement statement;
476
477 @Nullable
478 private Resource statementSubj;
479
480 @Nullable
481 private URI statementPred;
482
483 @Nullable
484 private Value statementObj;
485
486 @Nullable
487 private Resource statementCtx;
488
489 private final Set<Resource> statementContexts;
490
491 public MergeHandler(final RDFHandler handler) {
492 super(handler, false);
493 this.contextsStatements = new ConcurrentHashMap<>();
494 this.mergedContexts = new HashMap<>();
495 this.statementSubj = null;
496 this.statementPred = null;
497 this.statementObj = null;
498 this.statementCtx = null;
499 this.statementContexts = new HashSet<>();
500 }
501
502 @Override
503 public void handleStatement(final Statement statement) throws RDFHandlerException {
504 super.handleStatement(statement);
505 final Resource context = statement.getContext();
506 if (context != null) {
507 this.contextsStatements.putIfAbsent(context, Collections.<Statement>emptyList());
508 }
509 }
510
511 @Override
512 void handleStatementSorted(final Statement statement) throws RDFHandlerException {
513
514 final Resource subj = statement.getSubject();
515 final URI pred = statement.getPredicate();
516 final Value obj = statement.getObject();
517 final Resource ctx = statement.getContext();
518
519 List<Statement> contextStatements = this.contextsStatements.get(subj);
520 if (contextStatements != null) {
521 if (contextStatements.isEmpty()) {
522 contextStatements = new ArrayList<>();
523 this.contextsStatements.put(subj, contextStatements);
524 }
525 contextStatements.add(statement);
526
527 } else if (subj.equals(this.statementSubj) && pred.equals(this.statementPred)
528 && obj.equals(this.statementObj)) {
529 if (this.statementCtx != null) {
530 if (ctx == null) {
531 this.statementCtx = null;
532 this.statement = statement;
533 } else {
534 if (this.statementContexts.isEmpty()) {
535
536
537 this.statementContexts.add(this.statementCtx);
538 }
539 this.statementContexts.add(ctx);
540 }
541 }
542
543 } else {
544 flush();
545 this.statement = statement;
546 this.statementSubj = subj;
547 this.statementPred = pred;
548 this.statementObj = obj;
549 this.statementCtx = ctx;
550 this.statementContexts.clear();
551 }
552 }
553
554 @Override
555 void handleEndRDF() throws RDFHandlerException {
556 flush();
557 for (final List<Statement> statements : this.contextsStatements.values()) {
558 for (final Statement statement : statements) {
559 this.handler.handleStatement(statement);
560 }
561 }
562 for (final Map.Entry<ContextSet, Resource> entry : this.mergedContexts.entrySet()) {
563 final ContextSet set = entry.getKey();
564 final Resource context = entry.getValue();
565 final Set<Statement> statements = new HashSet<Statement>();
566 for (final Resource source : set.contexts) {
567 for (final Statement statement : this.contextsStatements.get(source)) {
568 statements.add(Statements.VALUE_FACTORY.createStatement(context,
569 statement.getPredicate(), statement.getObject(),
570 statement.getContext()));
571 }
572 }
573 for (final Statement statement : statements) {
574 this.handler.handleStatement(statement);
575 }
576 }
577 }
578
579 private void flush() throws RDFHandlerException {
580 if (this.statement != null) {
581 Statement statement;
582 if (this.statementCtx == null || this.statementContexts.size() <= 1) {
583 statement = this.statement;
584 } else {
585 final Resource mergedContext = mergeContexts(this.statementContexts);
586 statement = mergedContext.equals(this.statement.getContext()) ? this.statement
587 : Statements.VALUE_FACTORY.createStatement(this.statementSubj,
588 this.statementPred, this.statementObj, mergedContext);
589 }
590 this.handler.handleStatement(statement);
591 }
592 }
593
594 @Nullable
595 private Resource mergeContexts(final Set<Resource> contexts) {
596
597 final ContextSet set = new ContextSet(contexts.toArray(new Resource[contexts.size()]));
598
599 Resource context = this.mergedContexts.get(set);
600 if (context == null) {
601 final String[] args = new String[contexts.size()];
602 String namespace = null;
603 int index = 0;
604 for (final Resource source : contexts) {
605 args[index++] = source.stringValue();
606 if (source instanceof URI) {
607 final String ns = ((URI) source).getNamespace();
608 if (namespace == null) {
609 namespace = ns;
610 } else {
611 final int length = Math.min(ns.length(), namespace.length());
612 for (int i = 0; i < length; ++i) {
613 if (ns.charAt(i) != namespace.charAt(i)) {
614 namespace = ns.substring(0, i);
615 break;
616 }
617 }
618 }
619 }
620 }
621 Arrays.sort(args);
622 if (namespace == null || "".equals(namespace)) {
623 namespace = "urn:graph:";
624 } else if (!namespace.endsWith("/") && !namespace.endsWith("#")
625 && !namespace.endsWith(":")) {
626 namespace = namespace + "/";
627 }
628 final String localName = Hash.murmur3(args).toString();
629 context = Statements.VALUE_FACTORY.createURI(namespace, localName);
630 this.mergedContexts.put(set, context);
631 }
632 return context;
633 }
634
635 private static final class ContextSet {
636
637 Resource[] contexts;
638
639 int hash;
640
641 ContextSet(final Resource[] contexts) {
642
643 int hash = 0;
644 for (final Resource context : contexts) {
645 hash += context.hashCode();
646 }
647
648 this.contexts = contexts;
649 this.hash = hash;
650 }
651
652 @Override
653 public boolean equals(final Object object) {
654 if (object == this) {
655 return true;
656 }
657 if (!(object instanceof ContextSet)) {
658 return false;
659 }
660 final ContextSet other = (ContextSet) object;
661 if (this.hash != other.hash || this.contexts.length != other.contexts.length) {
662 return false;
663 }
664 final boolean[] matched = new boolean[this.contexts.length];
665 outer: for (int i = 0; i < this.contexts.length; ++i) {
666 final Resource thisContext = this.contexts[i];
667 for (int j = 0; j < this.contexts.length; ++j) {
668 if (!matched[j]) {
669 final Resource otherContext = other.contexts[j];
670 if (thisContext.equals(otherContext)) {
671 matched[j] = true;
672 continue outer;
673 }
674 }
675 }
676 return false;
677 }
678 return true;
679 }
680
681 @Override
682 public int hashCode() {
683 return this.hash;
684 }
685
686 }
687
688 }
689
690 }