1   /*
2    * RDFpro - An extensible tool for building stream-oriented RDF processing libraries.
3    * 
4    * Written in 2015 by Francesco Corcoglioniti with support by Alessio Palmero Aprosio and Marco
5    * Rospocher. Contact info on http://rdfpro.fbk.eu/
6    * 
7    * To the extent possible under law, the authors have dedicated all copyright and related and
8    * neighboring rights to this software to the public domain worldwide. This software is
9    * distributed without any warranty.
10   * 
11   * You should have received a copy of the CC0 Public Domain Dedication along with this software.
12   * If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13   */
14  package eu.fbk.rdfpro.util;
15  
16  import java.util.HashSet;
17  import java.util.Iterator;
18  import java.util.Objects;
19  import java.util.Set;
20  
21  import javax.annotation.Nullable;
22  
23  import org.openrdf.model.Namespace;
24  import org.openrdf.model.Resource;
25  import org.openrdf.model.Statement;
26  import org.openrdf.model.URI;
27  import org.openrdf.model.Value;
28  import org.openrdf.model.impl.NamespaceImpl;
29  import org.openrdf.model.util.ModelException;
30  import org.openrdf.query.BindingSet;
31  import org.openrdf.query.Dataset;
32  import org.openrdf.query.algebra.TupleExpr;
33  import org.openrdf.query.impl.EmptyBindingSet;
34  import org.openrdf.sail.NotifyingSailConnection;
35  import org.openrdf.sail.SailConnection;
36  import org.openrdf.sail.SailConnectionListener;
37  import org.openrdf.sail.SailException;
38  
39  import info.aduna.iteration.CloseableIteration;
40  
41  final class QuadModelSailAdapter extends QuadModel implements AutoCloseable {
42  
43      private static final long serialVersionUID = 1L;
44  
45      private final SailConnection connection;
46  
47      private final boolean trackChanges;
48  
49      private long addCounter;
50  
51      private long removeCounter;
52  
53      QuadModelSailAdapter(final SailConnection connection, final boolean trackChanges) {
54          this.connection = Objects.requireNonNull(connection);
55          this.trackChanges = trackChanges;
56          if (trackChanges && connection instanceof NotifyingSailConnection) {
57              this.addCounter = 0;
58              this.removeCounter = 0;
59              ((NotifyingSailConnection) connection)
60                      .addConnectionListener(new SailConnectionListener() {
61  
62                          @Override
63                          public void statementAdded(final Statement stmt) {
64                              ++QuadModelSailAdapter.this.addCounter;
65                          }
66  
67                          @Override
68                          public void statementRemoved(final Statement stmt) {
69                              ++QuadModelSailAdapter.this.removeCounter;
70                          }
71  
72                      });
73          } else {
74              this.addCounter = -1;
75              this.removeCounter = -1;
76          }
77      }
78  
79      @Override
80      public void close() {
81          IO.closeQuietly(this.connection);
82      }
83  
84      @Override
85      protected Set<Namespace> doGetNamespaces() {
86          try {
87              final Set<Namespace> namespaces = new HashSet<>();
88              CloseableIteration<? extends Namespace, SailException> iteration;
89              iteration = this.connection.getNamespaces();
90              try {
91                  while (iteration.hasNext()) {
92                      namespaces.add(iteration.next());
93                  }
94              } finally {
95                  iteration.close();
96              }
97              return namespaces;
98          } catch (final SailException ex) {
99              throw new ModelException(ex);
100         }
101     }
102 
103     @Override
104     @Nullable
105     protected Namespace doGetNamespace(final String prefix) {
106         try {
107             final String name = this.connection.getNamespace(prefix);
108             return name == null ? null : new NamespaceImpl(prefix, name);
109         } catch (final SailException ex) {
110             throw new ModelException(ex);
111         }
112     }
113 
114     @Override
115     protected Namespace doSetNamespace(final String prefix, @Nullable final String name) {
116         try {
117             final String oldName = this.connection.getNamespace(prefix);
118             final Namespace oldNamespace = oldName == null ? null : new NamespaceImpl(prefix,
119                     oldName);
120             if (name == null) {
121                 this.connection.removeNamespace(prefix);
122             } else {
123                 this.connection.setNamespace(prefix, name);
124             }
125             return oldNamespace;
126         } catch (final SailException ex) {
127             throw new ModelException(ex);
128         }
129     }
130 
131     @Override
132     protected int doSize(@Nullable final Resource subj, @Nullable final URI pred,
133             @Nullable final Value obj, final Resource[] ctxs) {
134         try {
135             if (subj == null && pred == null && obj == null) {
136                 return (int) this.connection.size(ctxs);
137             } else {
138                 int size = 0;
139                 CloseableIteration<? extends Statement, SailException> iteration;
140                 iteration = this.connection.getStatements(subj, pred, obj, false, ctxs);
141                 try {
142                     while (iteration.hasNext()) {
143                         iteration.next();
144                         ++size;
145                     }
146                 } finally {
147                     iteration.close();
148                 }
149                 return size;
150             }
151         } catch (final SailException ex) {
152             throw new ModelException(ex);
153         }
154     }
155 
156     @Override
157     protected int doSizeEstimate(@Nullable final Resource subj, @Nullable final URI pred,
158             @Nullable final Value obj, @Nullable final Resource ctx) {
159         return Integer.MAX_VALUE; // no way to efficiently estimate cardinality
160     }
161 
162     @Override
163     protected Iterator<Statement> doIterator(@Nullable final Resource subj,
164             @Nullable final URI pred, @Nullable final Value obj, final Resource[] ctxs) {
165         try {
166             return Iterators.forIteration(this.connection.getStatements(subj, pred, obj, false,
167                     ctxs));
168         } catch (final SailException ex) {
169             throw new ModelException(ex);
170         }
171     }
172 
173     @Override
174     protected boolean doAdd(final Resource subj, final URI pred, final Value obj,
175             final Resource[] ctxs) {
176         try {
177             if (!this.trackChanges) {
178                 this.connection.addStatement(subj, pred, obj, ctxs);
179                 return true;
180             } else if (this.addCounter >= 0) {
181                 final long addCounterBefore = this.addCounter;
182                 this.connection.addStatement(subj, pred, obj, ctxs);
183                 return this.addCounter > addCounterBefore;
184             } else {
185                 final Resource[] queryCtxs = ctxs.length > 0 ? ctxs : new Resource[] { null };
186                 int count = 0;
187                 CloseableIteration<? extends Statement, SailException> iteration;
188                 iteration = this.connection.getStatements(subj, pred, obj, false, queryCtxs);
189                 try {
190                     while (iteration.hasNext()) {
191                         iteration.next();
192                         ++count;
193                     }
194                 } finally {
195                     iteration.close();
196                 }
197                 if (count >= ctxs.length) {
198                     return false;
199                 }
200                 this.connection.addStatement(subj, pred, obj, ctxs);
201                 return true;
202             }
203         } catch (final SailException ex) {
204             throw new ModelException(ex);
205         }
206     }
207 
208     @Override
209     protected boolean doRemove(@Nullable final Resource subj, @Nullable final URI pred,
210             @Nullable final Value obj, final Resource[] ctxs) {
211         try {
212             if (!this.trackChanges) {
213                 this.connection.removeStatements(subj, pred, obj, ctxs);
214                 return true;
215             } else if (this.removeCounter >= 0) {
216                 final long removeCounterBefore = this.removeCounter;
217                 this.connection.removeStatements(subj, pred, obj, ctxs);
218                 return this.removeCounter != removeCounterBefore;
219             } else {
220                 CloseableIteration<? extends Statement, SailException> iteration;
221                 iteration = this.connection.getStatements(subj, pred, obj, false, ctxs);
222                 try {
223                     if (!iteration.hasNext()) {
224                         return false;
225                     }
226                 } finally {
227                     iteration.close();
228                 }
229                 this.connection.removeStatements(subj, pred, obj, ctxs);
230                 return true;
231             }
232         } catch (final SailException ex) {
233             throw new ModelException(ex);
234         }
235     }
236 
237     @Override
238     protected Iterator<BindingSet> doEvaluate(final TupleExpr expr,
239             @Nullable final Dataset dataset, @Nullable BindingSet bindings) {
240         try {
241             bindings = bindings != null ? bindings : EmptyBindingSet.getInstance();
242             return Iterators
243                     .forIteration(this.connection.evaluate(expr, dataset, bindings, false));
244         } catch (final SailException ex) {
245             throw new ModelException(ex);
246         }
247     }
248 
249 }