聊聊Thread的状态与ThreadLocal

  1. 云栖社区>
  2. 博客>
  3. 正文

聊聊Thread的状态与ThreadLocal

小飞哥1112 2018-12-22 12:49:20 浏览1665
展开阅读全文

 线程状态

在之前讨论线程-基础知识的时候,详细讲述过java的线程模型、线程状态流转、线程安全出现的原因以及解决办法,所以这里不多过的讨论这一块。此小节的重点从源码的角度上看看那些地方会引起线程状态变化。

1 线程状态流转

8a68a5501936ea5d21bbd2b5b92fa3d2687202f1

图中Waiting、Blocking的区别是:

 - 当线程处于Blocking状态时,代表着线程可以尝试获取CPU资源。

 - 当线程处于Waiting状态时,线程不会获取CPU资源。

 

2  Thread源码

1)   join

a)    源码分析

让当前线程等待,直到调用join方法的线程执行完毕后再执行,例如:在threadA中调用threadB.join()方法,就表示让threadA等待threadB执行完成,然后threadB再继续执行。从下面的代码中我们可以看到,join其实调用的时wait方法,代码如下:

/**
 * Waits at most {@code millis} milliseconds for this thread to die. A timeout of {@code 0} means to wait forever.
 */
public final synchronized void join(long millis)
throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

b)     示例

以下代码中,主线程将等待thread执行结束,然后再输出最后的结束信息。

public static void main(String[] args) {
    System.out.println("Main Thread start....");
    Thread thread = new Thread(() -> {
        System.out.println("Child Thread start...");
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Child Thread finished...");

    });
    thread.start();
    System.out.println("Main Thread begin join...");
    try {
        thread.join(); 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Main Thread finished.,,");
}

 

2)   yield

    public static native void yield();

出让cpu的执行时间,出让后和其他线程一起争抢CPU调度。

 

3)   sleep

public static native void sleep(long millis) throws InterruptedException;

public static void sleep(long millis, int nanos)  throws InterruptedException{…}

让线程休眠指定时间,休眠时不放弃线程持有的资源。

 

4)   interrupt

参考Interrupt

 

3  Object源码

Object中的wait、notify也会影响线程状态,所以这里也就一起介绍了。值得一提的是JDK还提供了LockSupport类,此类中的park和unpark方法也能实现和wait、notify类似的功能。具体请参考:LockSupport

1)   wait

public final void wait() throws InterruptedException{…}

public final void wait(long timeout, int nanos) throws InterruptedException { … }

public final native void wait(long timeout) throws InterruptedException;

让当前线程(假设是T)等待直到其他线程调用notify()、notifyAll()或者等待超过指定的时间。

当执行object.wait()方法时,将当前线程T放到此object对象的wait set中,然后放弃持有的资源,这个线程T将不再被cup调度,直到以下任意一个事情发生:

 - 其他线程调用notify()方法,并且线程T正好的被唤醒。

 - 其他线程调用notifyAll()方法。

 - 其他线程interrupt线程T。

 -  到了指定时间。如果参数timeout=0,那么时间参数将会被忽略,即此时只能通过上面三种方式唤醒。

线程T接着会从wait set中移除,接着将和其他线程一样争取object的锁,一旦线程T获得了object的锁,将会获得同步声明并且立即从之前的等待状态返回,继续执行我们的代码。

注意:wait()一定要放在sycronized内部,否则会报“java.lang.IllegalMonitorStateException”异常。这是因为wait方法会释放对象锁,所以如果没有使用sycronized同步时,因为没有锁就会报异常。示例代码如下:

     synchronized (obj) {

         while (<condition does not hold>)

            obj.wait(timeout);

         ... // Perform action appropriate to condition

     }

 

2)   notify、notifyAll

public final native void notify();

public final native void notifyAll();

当调用object.notify()方法时:如果多个线程都在等待object对象的锁,那么唤醒其中的一个(一般而言是随机的唤醒,具体还要看底层的实现)。此方法仅能够被拥有此object对象锁的线程调用,只有以下三种情况,线程才能拥有对象的锁:

 - 通过执行一个object对象synchronized的实例方法。

 - 通过执行object的synchronized代码块。

 - 如果是Class类型的对象,那么执行此class的static方法。

notifyAll和notify类似,主要的区别是notifyAll会唤醒等待此对象锁的所有线程,然后大家一起争抢cpu资源。

和wait一样,notify、notifyAll也必须放在sycronized内部,否则会报“java.lang.IllegalMonitorStateException”异常。示例代码如下:

     synchronized (obj) {

         ... // do the work

             obj.notify(timeout);

     }

 

3)   示例代码

以下示例代码中,thread1先获得obj的锁,执行自己的逻辑,thread2因为获取不到锁一直阻塞;当thread1因为wait()而等待时,释放了obj的所资源,此时thread2将开始执行,thread2执行完后通过notifyAll()唤醒在obj上等待的线程,此后thread1将继续执行。

public static void main(String[] args) {
    Object obj = new Object();
    Thread thread1 = new Thread(() -> {
        synchronized (obj) {
           System.out.println(DateUtil.getCurrentTime() + " Thread1 start...");
            try {
                Thread.sleep(1000);
               System.out.println(DateUtil.getCurrentTime() + " Thread1 begin wait...");
                obj.wait();

               System.out.println(DateUtil.getCurrentTime() + " Thread1 finished...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    thread1.start();
    Thread thread2 = new Thread(() -> {
        synchronized (obj) {
            System.out.println(DateUtil.getCurrentTime() + " Thread2 executing...");
            obj.notifyAll();
        }
    });
    thread2.start();
}

执行结果如下:

11:05:25:836 Thread1 start...

11:05:26:874 Thread1 begin wait...

11:05:26:874 Thread2 executing...

11:05:26:875 Thread1 finished...

 

 ThreadLocal

ThreadLocal经常用来存储线程私有变量。

1  关系模型

ThreadLocal经常用来存储线程私有变量,在一个线程内可以多次创建ThreadLocal对象,但是这些ThreadLocal对象共用同一个ThreadLocalMap的实例。以下是Thread、ThreadLocal、ThreadLocalMap之间的关系。

74a0bc0746699824387210d29d1cd999764e0d8f

从图中可以看到ThreadLocal通过Thread.currentThread()获取当前线程,并操作线程持有的ThreadLocalMap对象。总结一下:

 - 每个Thread线程内部都有一个Map。

 - Map里面存储线程本地对象(key)和线程的变量副本(value)

 - Thread内部的Map是由ThreadLocal维护的,由ThreadLocal负责向map获取和设置线程的变量值。

 

2  set

以下是set方法的源码,代码逻辑比较简单,其逻辑是:如果thread中threadLocals(ThreadLocal.ThreadLocalMap类型)属性为空,那么创建一个ThreadLocalMap对象,赋给thread.threadLocals;否则直接使用此value值替换thread.threadLocals中的值。

需要注意的是:ThreadLocalMap是线程私有的,见ThreadLocal#createMap()方法。

ThreadLocal#set方法

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

 

ThreadLocal#createMap方法

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

 

ThreadLocalMap#set方法

private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        if (k == key) {
            e.value = value;
            return;
        }
        if (k == null) {
       //清理掉陈旧Entry,并设置为当前的新值,这种Entry是由于ThreadLocal变量对象被回收后k==null造成的

           replaceStaleEntry(key, value, i);
            return;
        }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

ThreadLocalMap#replaceStaleEntry方法是用来复用ThreadLocalMap中的数据,当某一个ThreadLocal已经被清理了,那么就可以用这个空间来存储新的数据了。

private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                               int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;

    // Back up to check for prior stale entry in current run.
    // We clean out whole runs at a time to avoid continual
    // incremental rehashing due to garbage collector freeing
    // up refs in bunches (i.e., whenever the collector runs).
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;

    // Find either the key or trailing null slot of run, whichever
    // occurs first
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();

        // If we find key, then we need to swap it
        // with the stale entry to maintain hash table order.
        // The newly stale slot, or any other stale slot
        // encountered above it, can then be sent to expungeStaleEntry
        // to remove or rehash all of the other entries in run.
        if (k == key) {
            e.value = value;

            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            // Start expunge at preceding stale entry if it exists
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }

        // If we didn't find stale entry on backward scan, the
        // first stale entry seen while scanning for key is the
        // first still present in the run.
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }

    // If key not found, put new entry in stale slot
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    // If there are any other stale entries in run, expunge them
    if (slotToExpunge != staleSlot)
       cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

 


3  Get

如下代码,从线程的threadLocals属性中查找是否存在需要的值,如果存在,那么返回,如果不存在初始化线程的threadLocals。

ThreadLocal#get方法

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
           @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

 

ThreadLocal#setInitialValue方法

private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}

 

ThreadLocalMap#getEntry方法

private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);
}

 

ThreadLocalMap#getEntryAfterMiss方法

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    while (e != null) {
        ThreadLocal<?> k = e.get();
        if (k == key)
            return e;
        if (k == null)
            expungeStaleEntry(i);
        else
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;
}

 

注意ThreadLocalMap中的getEntry、getEntryAfterMiss方法,为什么我们set进去的value,后来取的时候会出现key为空的情况?这是因为ThreadLocalMap#Entry对象的key是WeakReference<ThreadLocal<?>>类型的,此种类型的引用,如果没有其他引用了,那么在GC的时候会被回收掉(当然如果此对象「key」还有其他有效引用,对象将不会被回收);因为value是普通类型的引用,当GC时如果key被回收了,那么将会清理掉此Entry的值。

 

4  remove

线程的threadLocals属性如果非空,那么从threadLocals中删除之前存放的值。

ThreadLocal#remove

public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

ThreadLocalMap#remove

private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
        if (e.get() == key) {
            e.clear();
            expungeStaleEntry(i);
            return;
        }
    }
}

 

5  ThreadLocalMap与WeakReference

我们假设这样一种情况:假设ThreadLocal对象不再被引用了,那么JVM如何回收ThreadLocalMap中存储的数据呢?

当ThreadLocal不在被需要时,其在线程中存储的数据应该被GC回收掉,即线程中ThreadLocalMap里的key-value对应该失效。ThreadLocalMap#Entry中key是WeakReference<ThreadLocal<?>>类型,其实就是为了实现这一诉求的。因为key是WeakReference类型的应用,所以在GC的时,ThreadLocalMap中key会被回收,即被设置为null(注意此时因为ThreadLocalMap引用value的关系,所以value不会被回收)。在上面set、get等方法中,我们看到的expungeStaleEntry、replaceStaleEntry、cleanSomeSlots方法其实就是在整理、回收ThreadLocalMap中已经失效的key-value。如果触发了expungeStaleEntry方法,ThreadLocalMap中的数据就会被回收干净。

最好的处理方式是,如果某个value不需要了,手动调用ThreadLocal#remove方法删除,以免因为没有触发replaceStaleEntry()方法,而不能回收value。

Entry的部分代码如下所示

static class Entry extends WeakReference<ThreadLocal<?>> {
    Object value;
    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

 

网友评论

登录后评论
0/500
评论
小飞哥1112
+ 关注