The Problem
I'm running multiple invocations of some external method via an ExecutorService. I would like to be able to interrupt these methods, but unfortunately they do not check the interrupt flag by themselves. Is there any way I can force an exception to be raised from these methods?
I am aware that throwing an exception from an arbitrary location is potentially dangerous, in my specific case I am willing to take this chance and prepared to deal with the consequences.
Details
By "external method" I mean some method(s) that come from an external library, and I cannot modify its code (well I can, but that will make it a maintenance nightmare whenever a new version is released).
The external methods are computationally expensive, not IO-bound, so they don't respond to regular interrupts and I can't forcefully close a channel or a socket or something. As I've mentioned before, they also do not check the interrupt flag.
The code is conceptually something like:
// my code
public void myMethod() {
Object o = externalMethod(x);
}
// External code
public class ExternalLibrary {
public Object externalMethod(Object) {
innerMethod1();
innerMethod1();
innerMethod1();
}
private void innerMethod1() {
innerMethod2();
// computationally intensive operations
}
private void innerMethod2() {
// computationally intensive operations
}
}
What I've Tried
Thread.stop()
will theoretically do what I want, but not only is it deprecated but it is also only available for actual threads, while I'm working with executor tasks (which might also share threads with future tasks, for example when working in a thread pool). Nevertheless, if no better solution is found, I will convert my code to use old-fashioned Threads instead and use this method.
Another option I've tried is to mark myMethod()
and similar methods with a special "Interruptable" annotation and then use AspectJ (which I am admittedly a newbie at) for catching all method invocat开发者_StackOverflow中文版ions there - something like:
@Before("call(* *.*(..)) && withincode(@Interruptable * *.*(..))")
public void checkInterrupt(JoinPoint thisJoinPoint) {
if (Thread.interrupted()) throw new ForcefulInterruption();
}
But withincode
isn't recursive to methods called by the matching methods, so I would have to edit this annotation into the external code.
Finally, this is similar to a previous question of mine - though a notable difference is that now I'm dealing with an external library.
The following weird ideas come to my mind:
- Using a byte code modification library, such as Javassist, to introduce the interrupt-checks at various points within the bytecode. Just at the beginning of methods may not be enough, since you mention that these external methods are not recursive, so you may want to forcefully stop them at any point. Doing this at the byte code level would also make it very responsive, e.g. even if the external code is running within a loop or something, it would be possible to introduce the interrupt checks. However, this will add some overhead, so overall performance will be slower.
- Launching separate processes (e.g. separate VMs) for the external code. Aborting processes may be much easier to code than the other solutions. The disadvantage is that you would need some sort of communication channel between the external code and your code, e.g. IPC, sockets etc. The second disadvantage is that you need much more resources (CPU, memory) to start up new VMs and it may be environment specific. This would work if you start a couple of tasks using the external code, but not hundreds of tasks. Also, the performance would suffer, but the computation itself would be as fast as the original. Processes can be forcefully stopped by using java.lang.Process.destroy()
- Using a custom SecurityManager, which performs the interrupt check on each of the checkXXX methods. If the external code somehow calls privileged methods, it may be sufficient for you to abort at these locations. An example would be java.lang.SecurityManager.checkPropertyAccess(String) if the external code periodically reads a system property.
This solution isn't easy either, but it could work: Using Javassist or CGLIB, you can insert code at the beginning of each internal method (the ones presumably being called by the main run() method) to check if the thread is alive, or some other flag (if it's some other flag, you'll have to add it as well, along with a method to set it).
I'm proposing Javassist/CGLIB instead of extending the class through code because you mention it's external and you don't want to change the source code, and it may change in the future. So adding the interrupt checks at runtime will work for the current version and also in future versions even if the internal method names change (or their parameters, return values, etc). You just have to take the class and add interrupt checks at the beginning of each method that is not the run() method.
An option is to:
- Make the VM connect to itself using JDI.
- Look up the thread that's running your task. This isn't trivial, but since you have access to all the stack frames, it is certainly doable. (If you put a unique
id
field in your task objects, you will be able to identify the thread that is executing it.) - Stop the thread asynchronously.
Although I don't think a stopped thread would seriously interfere with executors (they should be fail-safe after all), there is an alternative solution that doesn't involve stopping the thread.
Provided that your tasks don't modify anything in other parts of the system (which is a fair assumption, otherwise you wouldn't be trying to shoot them down), what you can do is to use JDI to pop unwanted stack frames off and exit the task normally.
public class StoppableTask implements Runnable {
private boolean stopped;
private Runnable targetTask;
private volatile Thread runner;
private String id;
public StoppableTask(TestTask targetTask) {
this.targetTask = targetTask;
this.id = UUID.randomUUID().toString();
}
@Override
public void run() {
if( !stopped ) {
runner = Thread.currentThread();
targetTask.run();
} else {
System.out.println( "Task "+id+" stopped.");
}
}
public Thread getRunner() {
return runner;
}
public String getId() {
return id;
}
}
This is the runnable that wraps all your other runnables. It stores a reference to the executing thread (will be important later) and an id so we can find it with a JDI call.
public class Main {
public static void main(String[] args) throws IOException, IllegalConnectorArgumentsException, InterruptedException, IncompatibleThreadStateException, InvalidTypeException, ClassNotLoadedException {
//connect to the virtual machine
VirtualMachineManager manager = Bootstrap.virtualMachineManager();
VirtualMachine vm = null;
for( AttachingConnector con : manager.attachingConnectors() ) {
if( con instanceof SocketAttachingConnector ) {
SocketAttachingConnector smac = (SocketAttachingConnector)con;
Map<String,? extends Connector.Argument> arg = smac.defaultArguments();
arg.get( "port" ).setValue( "8000");
arg.get( "hostname" ).setValue( "localhost" );
vm = smac.attach( arg );
}
}
//start the test task
ExecutorService service = Executors.newCachedThreadPool();
StoppableTask task = new StoppableTask( new TestTask() );
service.execute( task );
Thread.sleep( 1000 );
// iterate over all the threads
for( ThreadReference thread : vm.allThreads() ) {
//iterate over all the objects referencing the thread
//could take a long time, limiting the number of referring
//objects scanned is possible though, as not many objects will
//reference our runner thread
for( ObjectReference ob : thread.referringObjects( 0 ) ) {
//this cast is safe, as no primitive values can reference a thread
ReferenceType obType = (ReferenceType)ob.type();
//if thread is referenced by a stoppable task
if( obType.name().equals( StoppableTask.class.getName() ) ) {
StringReference taskId = (StringReference)ob.getValue( obType.fieldByName( "id" ));
if( task.getId().equals( taskId.value() ) ) {
//task with matching id found
System.out.println( "Task "+task.getId()+" found.");
//suspend thread
thread.suspend();
Iterator<StackFrame> it = thread.frames().iterator();
while( it.hasNext() ) {
StackFrame frame = it.next();
//find stack frame containing StoppableTask.run()
if( ob.equals( frame.thisObject() ) ) {
//pop all frames up to the frame below run()
thread.popFrames( it.next() );
//set stopped to true
ob.setValue( obType.fieldByName( "stopped") , vm.mirrorOf( true ) );
break;
}
}
//resume thread
thread.resume();
}
}
}
}
}
}
And for reference, the "library" call I tested it with:
public class TestTask implements Runnable {
@Override
public void run() {
long l = 0;
while( true ) {
l++;
if( l % 1000000L == 0 )
System.out.print( ".");
}
}
}
You can try it out by launching the Main
class with the command line option -agentlib:jdwp=transport=dt_socket,server=y,address=localhost:8000,timeout=5000,suspend=n
. It works with two caveats. Firstly, if there is native code being executed (thisObject
of a frame is null), you'll have to wait until it's finished. Secondly, finally
blocks are not invoked, so various resources may potentially be leaking.
You wrote:
Another option I've tried is to mark
myMethod()
and similar methods with a special "Interruptable" annotation and then use AspectJ (which I am admittedly a newbie at) for catching all method invocations there - something like:@Before("call(* *.*(..)) && withincode(@Interruptable * *.*(..))") public void checkInterrupt(JoinPoint thisJoinPoint) { if (Thread.interrupted()) throw new ForcefulInterruption(); }
But
withincode
isn't recursive to methods called by the matching methods, so I would have to edit this annotation into the external code.
The AspectJ idea is good, but you need to
- use
cflow()
orcflowbelow()
in order to recursively match a certain control flow (e.g. something like@Before("cflow(execution(@Interruptable * *(..)))")
). - make sure to also weave your external library, not just you own code. This can be done by either using binary weaving, instrumenting the JAR file's classes and re-packaging them into a new JAR file, or by applying LTW (load-time weaving) during application start-up (i.e. during class loading).
You might not even need your marker annotation if your external library has a package name you can pinpoint with within()
. AspectJ is really powerful, and often there is more than one way to solve a problem. I would recommend using it because it was made for such endeavours as yours.
I have hacked an ugly solution to my problem. It's not pretty, but it works in my case, so I'm posting it here in case it will help anyone else.
What I did was profile the library parts of my application, hoping that I could isolate a small group of methods which are called repeatedly - for example some get
methods or equals()
or something along these lines; and then I could insert the following code segment there:
if (Thread.interrupted()) {
// Not really necessary, but could help if the library does check it itself in some other place:
Thread.currentThread().interrupt();
// Wrapping the checked InterruptedException because the signature doesn't declare it:
throw new RuntimeException(new InterruptedException());
}
Either inserting it manually by editing the library's code, or automatically by writing an appropriate aspect. Notice that if the library attempts to catch and swallow a RuntimeException
, the thrown exception could be replaced with something else the library doesn't try to catch.
Luckily for me, using VisualVM, I was able to find a single method called a very high number of times during the specific usage I was making of the library. After adding the above code segment, it now properly responds to interrupts.
This is of course not maintainable, plus nothing really guarantees the library will call this method repeatedly in other scenarios; but it worked for me, and since it's relatively easy to profile other applications and insert the checks there, I consider this a generic, if ugly, solution.
If internal methods have similar names, then you could use pointcut definition in xml (spring/AspectJ) instead of annotations so no code modification of the external library should be needed.
Like mhaller said, the best option is to launch a new Process. Since your jobs are not that cooperative, you will never have guarantees on the Thread termination.
A nice solution to your problem would be using a library that supports arbitrary pause/stop of ' lightweight threads' such as Akka instead of the executor service, although this may be a bit of an overkill.
Although I've never used Akka and cannot confirm it works as you expect, the docs state there's a stop() method for stopping actors.
To my knowledge there are two ways of using aspect
- AspecJ
- Spring AOP
AspectJ is a customized compiler which intercepts methods that is compiles (meaning it can't get "external methods". Spring AOP (by default) uses class proxying to intercept methods at runtime (so it can intercept "external methods". but the problem with Spring AOP is that it can't proxy already-proxied classes (which AspectJ can do because it does not proxy classes). I think AspectJ could help you in this case.
精彩评论