Propagating ThreadLocal to a new Thread fetched from a ExecutorService

33,770

Solution 1

The set of ThreadLocal instances associated with a thread are held in private members of each Thread. Your only chance to enumerate these is to do some reflection on the Thread; this way, you can override the access restrictions on the thread's fields.

Once you can get the set of ThreadLocal, you could copy in the background threads using the beforeExecute() and afterExecute() hooks of ThreadPoolExecutor, or by creating a Runnable wrapper for your tasks that intercepts the run() call to set an unset the necessary ThreadLocal instances. Actually, the latter technique might work better, since it would give you a convenient place to store the ThreadLocal values at the time the task is queued.


Update: Here's a more concrete illustration of the second approach. Contrary to my original description, all that is stored in the wrapper is the calling thread, which is interrogated when the task is executed.

static Runnable wrap(Runnable task)
{
  Thread caller = Thread.currentThread();
  return () -> {
    Iterable<ThreadLocal<?>> vars = copy(caller);
    try {
      task.run();
    }
    finally {
      for (ThreadLocal<?> var : vars)
        var.remove();
    }
  };
}

/**
 * For each {@code ThreadLocal} in the specified thread, copy the thread's 
 * value to the current thread.  
 * 
 * @param caller the calling thread
 * @return all of the {@code ThreadLocal} instances that are set on current thread
 */
private static Collection<ThreadLocal<?>> copy(Thread caller)
{
  /* Use a nasty bunch of reflection to do this. */
  throw new UnsupportedOperationException();
}

Solution 2

Based on @erickson answer I wrote this code. It is working for inheritableThreadLocals. It builds list of inheritableThreadLocals using same method as is used in Thread contructor. Of course I use reflection to do this. Also I override the executor class.

public class MyThreadPoolExecutor extends ThreadPoolExecutor
{
   @Override
   public void execute(Runnable command)
   {
      super.execute(new Wrapped(command, Thread.currentThread()));
   }
}

Wrapper:

   private class Wrapped implements Runnable
   {
      private final Runnable task;

      private final Thread caller;

      public Wrapped(Runnable task, Thread caller)
      {
         this.task = task;
         this.caller = caller;
      }

      public void run()
      {
         Iterable<ThreadLocal<?>> vars = null;
         try
         {
            vars = copy(caller);
         }
         catch (Exception e)
         {
            throw new RuntimeException("error when coping Threads", e);
         }
         try {
            task.run();
         }
         finally {
            for (ThreadLocal<?> var : vars)
               var.remove();
         }
      }
   }

copy method:

public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception
   {
      List<ThreadLocal<?>> threadLocals = new ArrayList<>();
      Field field = Thread.class.getDeclaredField("inheritableThreadLocals");
      field.setAccessible(true);
      Object map = field.get(caller);
      Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
      table.setAccessible(true);

      Method method = ThreadLocal.class
              .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap"));
      method.setAccessible(true);
      Object o = method.invoke(null, map);

      Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals");
      field2.setAccessible(true);
      field2.set(Thread.currentThread(), o);

      Object tbl = table.get(o);
      int length = Array.getLength(tbl);
      for (int i = 0; i < length; i++)
      {
         Object entry = Array.get(tbl, i);
         Object value = null;
         if (entry != null)
         {
            Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
                    "get");
            referentField.setAccessible(true);
            value = referentField.invoke(entry);
            threadLocals.add((ThreadLocal<?>) value);
         }
      }
      return threadLocals;
   }

Solution 3

As I understand your problem, you can have a look at InheritableThreadLocal which is meant to pass ThreadLocal variables from Parent Thread context to Child Thread Context

Solution 4

I don't like Reflection approach. Alternative solution would be to implement executor wrapper and pass object directly as a ThreadLocal context to all child threads propagating a parent context.

public class PropagatedObject {

    private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>();

   //put, set, merge methods, etc

}

==>

public class ObjectAwareExecutor extends AbstractExecutorService {

    private final ExecutorService delegate;
    private final PropagatedObject objectAbsorber;

    public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){
        this.delegate = delegate;
        this.objectAbsorber = objectAbsorber;
    }
    @Override
    public void execute(final Runnable command) {

        final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get();
        delegate.execute(() -> {
            try{
                objectAbsorber.set(parentContext);
                command.run();
            }finally {
                parentContext.putAll(objectAbsorber.get());
                objectAbsorber.clean();
            }
        });
        objectAbsorber.merge(parentContext);
    }
Share:
33,770
Luciano Fiandesio
Author by

Luciano Fiandesio

I am a software architect with 25+ years of professional experience. As a software development generalist with broad industry experience, I am best utilized in a senior engineer or Tech Lead role. I have significant experience building and launching products/applications from the ground up, and I have had primary responsibilities across the software development spectrum, including system architecture, backend, frontend, and Devops. I’m the co-author of the book “Groovy 2 Cookbook”, published by Packt in 2013. Industry conference speaker (Devoxx, Codemotion, DevConFu, various JUG in Europe)

Updated on June 28, 2020

Comments

  • Luciano Fiandesio
    Luciano Fiandesio almost 4 years

    I'm running a process in a separate thread with a timeout, using an ExecutorService and a Future (example code here) (the thread "spawning" takes place in a AOP Aspect).

    Now, the main thread is a Resteasy request. Resteasy uses one ore more ThreadLocal variables to store some context information that I need to retrieve at some point in my Rest method call. Problem is, since the Resteasy thread is running in a new thread, the ThreadLocal variables are lost.

    What would be the best way to "propagate" whatever ThreadLocal variable is used by Resteasy to the new thread? It seems that Resteasy uses more than one ThreadLocal variable to keep track of context information and I would like to "blindly" transfer all the information to the new thread.

    I have looked at subclassing ThreadPoolExecutor and using the beforeExecute method to pass the current thread to the pool, but I couldn't find a way to pass the ThreadLocal variables to the pool.

    Any suggestion?

    Thanks

  • Tomasz Nurkiewicz
    Tomasz Nurkiewicz over 12 years
    Won't work, first of all the OP does not have control over ThreadLocal creation in 3rd party library. Secondly, ExecutorService reuses threads, while InheritableThreadLocal works only when you spawn new thread directly.
  • Viraj
    Viraj about 8 years
    Could you give an example how the latter technique would work ?
  • Marcel Stör
    Marcel Stör about 8 years
    @erickson is your "nasty bunch of reflection to do this" something along the lines of stackoverflow.com/a/32231177/131929?
  • brady
    brady about 8 years
    @MarcelStör I haven't tried it, but it looks the right information. So, in addition to reading the value field, you'll need to call get() on the entry to get the ThreadLocal instance referenced by the entry. Once you have these two objects, you are done with reflection. Call local.set(value) for each entry that you find, and add the thread local to the collection that is returned from copy().
  • brady
    brady about 8 years
    @MarcelStör As I look at that a bit more, there could actually be a visibility problem in the concurrent access of the thread local map, because its designed for single-threaded access. This means that you'd need to copy the values in the current thread, and then set them in the worker threads, conveying them between threads in a safe way. I will have to think about this some more and amend my answer.
  • brady
    brady about 8 years
    @MarcelStör After more thought, I think in this particular case, everything is okay. Submitting the task to an Executor will take care of the visibility issues, because there is a memory barrier there. And because the main thread is waiting while the background thread runs, the ThreadLocals of the main thread will not be altered in the meantime. However, if those conditions changed, there could be some additional work necessary.
  • Dean Hiller
    Dean Hiller over 7 years
    @erickson one issue with copying too late is the ThreadLocal's will change later in typical platforms as that thread processes the next request so I think you have to copy on the thread before that race condition occurs.
  • brady
    brady over 2 years
    @bit_cracker007 InheritableThreadLocal copies values from the current thread at the time the new thread is created. In the use case here, we a submitting tasks to an executor with worker threads already created. So some other means are needed.
  • bit_cracker007
    bit_cracker007 over 2 years
    @erickson Is there no way to ensure Child threads to share the exact same reference of the shared context object as parent thread, so that any set done by child thread is visible to parent thread? Question posted: stackoverflow.com/questions/68549187/…