Skip to content

Commit 0f8a131

Browse files
committed
GH2701: Fuseki Mod to list and abort running executions.
1 parent 5028583 commit 0f8a131

File tree

20 files changed

+1802
-1
lines changed

20 files changed

+1802
-1
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.apache.jena.sparql.exec.tracker;
2+
3+
import org.apache.jena.sparql.core.DatasetGraph;
4+
import org.apache.jena.sparql.core.DatasetGraphWrapper;
5+
import org.apache.jena.sparql.core.DatasetGraphWrapperView;
6+
import org.apache.jena.sparql.util.Context;
7+
8+
public class DatasetGraphWithExecTracker
9+
extends DatasetGraphWrapper
10+
implements DatasetGraphWrapperView
11+
{
12+
public static DatasetGraph wrap(DatasetGraph dsg) {
13+
DatasetGraph result;
14+
if (dsg instanceof DatasetGraphWithExecTracker) {
15+
result = dsg;
16+
} else {
17+
// Put an exec tracker into the dataset's context.
18+
Context context = dsg.getContext();
19+
ExecTracker.ensureTracker(context);
20+
result = new DatasetGraphWithExecTracker(dsg);
21+
}
22+
return result;
23+
}
24+
25+
protected DatasetGraphWithExecTracker(DatasetGraph dsg) {
26+
super(dsg);
27+
}
28+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package org.apache.jena.sparql.exec.tracker;
2+
3+
import java.time.Duration;
4+
import java.time.Instant;
5+
import java.util.Collections;
6+
import java.util.IdentityHashMap;
7+
import java.util.Iterator;
8+
import java.util.Map.Entry;
9+
import java.util.Objects;
10+
import java.util.Set;
11+
import java.util.concurrent.ConcurrentNavigableMap;
12+
import java.util.concurrent.ConcurrentSkipListMap;
13+
import java.util.concurrent.atomic.AtomicLong;
14+
15+
import org.apache.jena.sparql.SystemARQ;
16+
import org.apache.jena.sparql.util.Context;
17+
import org.apache.jena.sparql.util.Symbol;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
public class ExecTracker {
22+
private static final Logger logger = LoggerFactory.getLogger(ExecTracker.class);
23+
24+
public record StartRecord(long requestId, Instant timestamp, Object requestObject, Runnable abortAction) {}
25+
26+
public record CompletionRecord(StartRecord start, Instant timestamp, Throwable throwable) {
27+
public Duration duration() {
28+
return Duration.between(start.timestamp, timestamp);
29+
}
30+
31+
public boolean isSuccess() {
32+
return throwable == null;
33+
}
34+
}
35+
36+
private Set<ExecTrackerListener> eventListeners = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
37+
38+
protected AtomicLong nextId = new AtomicLong();
39+
protected ConcurrentNavigableMap<Long, StartRecord> idToStartRecord = new ConcurrentSkipListMap<>();
40+
protected int maxHistorySize = 1000;
41+
protected ConcurrentNavigableMap<Long, CompletionRecord> history = new ConcurrentSkipListMap<>();
42+
43+
public ConcurrentNavigableMap<Long, StartRecord> getActiveTasks() {
44+
return idToStartRecord;
45+
}
46+
47+
public ConcurrentNavigableMap<Long, CompletionRecord> getHistory() {
48+
return history;
49+
}
50+
51+
public void setMaxHistorySize(int maxHistorySize) {
52+
this.maxHistorySize = maxHistorySize;
53+
}
54+
55+
public long put(Object requestObject, Runnable abortAction) {
56+
long result = nextId.getAndIncrement();
57+
StartRecord record = new StartRecord(result, Instant.now(), requestObject, abortAction);
58+
idToStartRecord.put(result, record);
59+
broadcastStartEvent(record);
60+
return result;
61+
}
62+
63+
protected void trimHistory() {
64+
if (history.size() >= maxHistorySize) {
65+
Iterator<Entry<Long, CompletionRecord>> it = history.entrySet().iterator();
66+
while (history.size() >= maxHistorySize && it.hasNext()) {
67+
it.next();
68+
it.remove();
69+
}
70+
}
71+
}
72+
73+
public CompletionRecord remove(long id, Throwable t) {
74+
StartRecord startRecord = idToStartRecord.remove(id);
75+
CompletionRecord result = null;
76+
if (startRecord != null) {
77+
trimHistory();
78+
Instant now = Instant.now();
79+
result = new CompletionRecord(startRecord, now, t);
80+
// long requestId = startRecord.requestId();
81+
// history.put(now, result);
82+
history.put(id, result);
83+
broadcastCompletionEvent(result);
84+
}
85+
return result;
86+
}
87+
88+
protected void broadcastStartEvent(StartRecord startRecord) {
89+
for (ExecTrackerListener listener : eventListeners) {
90+
try {
91+
listener.onStart(startRecord);
92+
} catch (Throwable t) {
93+
if (logger.isWarnEnabled()) {
94+
logger.warn("Failure during event handler.", t);
95+
}
96+
}
97+
}
98+
}
99+
100+
protected void broadcastCompletionEvent(CompletionRecord completionRecord) {
101+
for (ExecTrackerListener listener : eventListeners) {
102+
try {
103+
listener.onComplete(completionRecord);
104+
} catch (Throwable t) {
105+
if (logger.isWarnEnabled()) {
106+
logger.warn("Failure during event handler.", t);
107+
}
108+
}
109+
}
110+
}
111+
112+
public Runnable addListener(ExecTrackerListener listener) {
113+
Objects.requireNonNull(listener);
114+
eventListeners.add(listener);
115+
return () -> eventListeners.remove(listener);
116+
}
117+
118+
public Set<ExecTrackerListener> getEventListeners() {
119+
return eventListeners;
120+
}
121+
122+
@Override
123+
public String toString() {
124+
return "Active: " + idToStartRecord.size() + ", History: " + history.size() + "/" + maxHistorySize;
125+
}
126+
127+
public static final Symbol symTracker = SystemARQ.allocSymbol("execTracker");
128+
129+
public static ExecTracker getTracker(Context context) {
130+
return context.get(symTracker);
131+
}
132+
133+
public static ExecTracker requireTracker(Context context) {
134+
ExecTracker result = getTracker(context);
135+
Objects.requireNonNull("No ExecTracker registered in context");
136+
return result;
137+
}
138+
139+
public static ExecTracker ensureTracker(Context context) {
140+
// FIXME The spatial index PR adds an atomic context.compute method.
141+
ExecTracker result = context.get(symTracker);
142+
if (result == null) {
143+
synchronized (context) {
144+
result = context.get(symTracker);
145+
if (result == null) {
146+
result = new ExecTracker();
147+
context.set(symTracker, result);
148+
}
149+
}
150+
}
151+
return result;
152+
}
153+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.apache.jena.sparql.exec.tracker;
2+
3+
import org.apache.jena.sparql.exec.tracker.ExecTracker.CompletionRecord;
4+
import org.apache.jena.sparql.exec.tracker.ExecTracker.StartRecord;
5+
6+
public interface ExecTrackerListener {
7+
void onStart(StartRecord startRecord);
8+
void onComplete(CompletionRecord endRecord);
9+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.apache.jena.sparql.exec.tracker;
2+
3+
import org.apache.jena.sparql.engine.QueryEngineRegistry;
4+
import org.apache.jena.sparql.modify.UpdateEngineRegistry;
5+
import org.apache.jena.sys.JenaSubsystemLifecycle;
6+
7+
public class InitExecTracker
8+
implements JenaSubsystemLifecycle
9+
{
10+
@Override
11+
public void start() {
12+
QueryEngineRegistry queryReg = QueryEngineRegistry.get();
13+
init(queryReg);
14+
15+
UpdateEngineRegistry updateReg = UpdateEngineRegistry.get();
16+
init(updateReg);
17+
}
18+
19+
@Override
20+
public void stop() {
21+
}
22+
23+
public static void init(QueryEngineRegistry reg) {
24+
reg.add(new QueryEngineFactoryExecTracker());
25+
}
26+
27+
public static void init(UpdateEngineRegistry reg) {
28+
reg.add(new UpdateEngineFactoryExecTracker());
29+
}
30+
31+
@Override
32+
public int level() {
33+
// Register the 'wrapper engine factories' late
34+
// such that upon execution they are consulted early.
35+
return 1_000_000;
36+
}
37+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package org.apache.jena.sparql.exec.tracker;
2+
3+
import org.apache.jena.atlas.io.IndentedWriter;
4+
import org.apache.jena.shared.PrefixMapping;
5+
import org.apache.jena.sparql.algebra.Op;
6+
import org.apache.jena.sparql.engine.Plan;
7+
import org.apache.jena.sparql.engine.QueryIterator;
8+
import org.apache.jena.sparql.serializer.SerializationContext;
9+
10+
public interface PlanWrapper
11+
extends Plan
12+
{
13+
Plan getDelegate();
14+
15+
@Override
16+
default void output(IndentedWriter out, SerializationContext sCxt) {
17+
getDelegate().output(out, sCxt);
18+
}
19+
20+
@Override
21+
default String toString(PrefixMapping pmap) {
22+
return getDelegate().toString(pmap);
23+
}
24+
25+
@Override
26+
default void output(IndentedWriter out) {
27+
getDelegate().output(out);
28+
}
29+
30+
@Override
31+
default void close() {
32+
getDelegate().close();
33+
}
34+
35+
@Override
36+
default Op getOp() {
37+
return getDelegate().getOp();
38+
}
39+
40+
@Override
41+
default QueryIterator iterator() {
42+
return getDelegate().iterator();
43+
}
44+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.apache.jena.sparql.exec.tracker;
2+
3+
import org.apache.jena.sparql.engine.Plan;
4+
5+
public class PlanWrapperBase
6+
implements PlanWrapper
7+
{
8+
protected Plan delegate;
9+
10+
public PlanWrapperBase(Plan delegate) {
11+
super();
12+
this.delegate = delegate;
13+
}
14+
15+
@Override
16+
public Plan getDelegate() {
17+
return delegate;
18+
}
19+
}

0 commit comments

Comments
 (0)