1.1. 编程语言中的线程对象

1.1.1. 定义和启动一个线程

Java中两种创建Thread实例的方式

1.1.2. 方式1

第一种,提供Runnable对象

package thread;

public class HelloRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("HeLLO FROM RUNNABLE`~~!!");
    }

    public static void main(String[] args) {
        (new Thread(new HelloRunnable())).start();
    }
}

//    Hello from a runnable......

1.1.3. 方式2

第二种,继承Thread

package thread;

public class HelloRunnable2 extends Thread {
    @Override
    public void run() {
        System.out.println("Hello from a thread!");
    }

    public static void main(String[] args) {
        (new HelloRunnable2()).start();
    }
}

1.1.4. 使用sleep来暂停执行

package thread;

public class SleepMessages {
    /**
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"};
        for (int i = 0; i < importantInfo.length; i++) {
            // Pause for 4 seconds
            Thread.sleep(4000);

            System.out.println(importantInfo[i]);
        }
    }

}

在main中声明抛出InterruptedException,若有另外一个线程中断当前线程时,则sleep抛出异常。

1.1.5. 中断

支持中断

当线程被另一个线程中断的时候,捕捉异常,结束后续操作。

public class SleepMessage2 {
    /**
     * @param args
     */

    public static void main(String[] args) throws InterruptedException{
        String importinfo[] = {
                "hello hu1",
                "hello hu2",
                "hello hu3",
                "hello hu4  ha ha ha ha",
        };
        for (int i = 0; i <importinfo.length ; i++) {
            //暂停4s
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                //已经中断,无需更多信息
                return;
            }
            System.out.println(importinfo[i]);
        }
    }
}

1.1.6. SimpleThreads示例

/**
 * SimpleThreads 示例有两个线程。第一个线程是每个 Java 应用程序都有的主线程。
 * 主线程创建的Runnable 对象的MessageLoop,并等待它完成。如果 MessageLoop
 * 需要很长时间才能完成,主线程就中断它。
 *
 * @author <a href="http://www.waylau.com">waylau.com</a>
 * @date 2016年1月21日
 */
public class SimpleThreads {
    // Display a message, preceded by
    // the name of the current thread
    static void threadMessage(String message) {
        String threadName = Thread.currentThread().getName();
        System.out.format("%s: %s%n", threadName, message);
    }

    /**
     * 遍历打印子进程名称和传入信息
     */
    private static class MessageLoop implements Runnable {
        public void run() {
            String importantInfo[] = {
                    "Mares eat oats",
                    "Does eat oats",
                    "Little lambs eat ivy",
                    "A kid will eat ivy too"
            };
            try {
                for (int i = 0; i < importantInfo.length; i++) {
                    // Pause for 4 seconds
                    Thread.sleep(4000);
                    // Print a message
                    threadMessage(importantInfo[i]);
                }
            } catch (InterruptedException e) {
                threadMessage("I wasn't done!");
            }
        }
    }

    public static void main(String args[])
            throws InterruptedException {

        // Delay, in milliseconds before
        // we interrupt MessageLoop
        // thread (default one hour).
        long patience = 1000 * 60 * 60;

        // If command line argument
        // present, gives patience
        // in seconds.
        if (args.length > 0) {
            try {
                patience = Long.parseLong(args[0]) * 1000;
            } catch (NumberFormatException e) {
                System.err.println("Argument must be an integer.");
                System.exit(1);
            }
        }

        threadMessage("Starting MessageLoop thread");
        long startTime = System.currentTimeMillis();
        Thread t = new Thread(new MessageLoop());
        t.start();

        threadMessage("Waiting for MessageLoop thread to finish");
        // loop until MessageLoop
        // thread exits
        while (t.isAlive()) {
            threadMessage("Still waiting...");
            // Wait maximum of 1 second
            // for MessageLoop thread
            // to finish.
            t.join(1000);
            // 如果时间间隔s数 大于1个小时并且子进程还是活着的状态,就中断掉
            if (((System.currentTimeMillis() - startTime) > patience) && t.isAlive()) {
                threadMessage("Tired of waiting!");
                t.interrupt();
                // Shouldn't be long now
                // -- wait indefinitely
                t.join();   //否则主进程就必须一直等待子进程完成
            }
        }
        threadMessage("Finally!");
    }
}

1.1.7. 通信

网络IO模型的演讲 1.同步和异步

同步是指用户线程发起I/O请求后需要等待或者轮询内核I/O操作完成后才能继续执行。

异步是指用户发起I/O请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户进程的回调函数。

2.阻塞和非阻塞 阻塞和非阻塞描述的是用户线程调用I/O操作的方式:

· 阻塞是指I/O操作要彻底完成后才返回到用户空间。
· 非阻塞是指I/O操作被调用后,立刻返回给用户一个状态值,无需等到I/O操作彻底完成。

常见的Java I/O模型 单进程阻塞I/O模式,server 监听7端口,只能有一个客户端连接

EchoServer.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * “阻塞I/O”模式
 *
 * @author <a href="https://waylau.com">waylau.com</a>
 * @date 2016年7月29日
 */
public class EchoServer {
    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws IOException {

        int port;

        try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }

        try (
                ServerSocket serverSocket =
                        new ServerSocket(port);

                Socket clientSocket = serverSocket.accept();
                PrintWriter out =
                        new PrintWriter(clientSocket.getOutputStream(), true);
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(clientSocket.getInputStream()));
        ) {
            String inputLine;
            while ((inputLine = in.readLine()) != null) {
                out.println(inputLine);
//                System.out.println(inputLine);
            }
        } catch (IOException e) {
            System.out.println("Exception caught when trying to listen"
                    + " on port " + port
                    + " or listening for a connection");
            System.out.println(e.getMessage());
        }
    }

}

阻塞I/O + 多线程模式

存在问题,每次接到新的连接要新建一个线程,处理完销毁线程,代价大。性能比较低。

EchoServerHandler.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * 处理器类
 *
 * @author <a href="https://waylau.com">waylau.com</a>
 * @date 2016年7月29日
 */
public class EchoServerHandler implements Runnable {
    private Socket clientSocket;

    public EchoServerHandler(Socket clientSocket) {
        this.clientSocket = clientSocket;
    }

    @Override
    public void run() {
        try (
                PrintWriter out = new PrintWriter(clientSocket.getOutputStream()
                        , true);
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(clientSocket.getInputStream()));) {

            String inputLine;
            while ((inputLine = in.readLine()) != null) {
                out.println(inputLine);
                System.out.println(inputLine);
            }
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}

MultiThreadEchoServer.java

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * “阻塞I/O+多线程”模式。使用多线程来支持多个客户端访问服务器
 *
 * @author <a href="https://waylau.com">waylau.com</a>
 * @date 2016年7月29日
 */
public class MultiThreadEchoServer {
    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws IOException {

        int port;

        try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }
        Socket clientSocket = null;
        try (ServerSocket serverSocket = new ServerSocket(port);) {
            while (true) {
                clientSocket = serverSocket.accept();

                // 多线程
                new Thread(new EchoServerHandler(clientSocket)).start();
            }
        } catch (IOException e) {
            System.out.println(
                    "Exception caught when trying to listen on port "
                            + port + " or listening for a connection");
            System.out.println(e.getMessage());
        }
    }
}

阻塞I/O+线程池模式

优点:在大量短连接场景中,性能会有所提升,因为复用线程池中的线程,不用频繁的创建和销毁线程,在长连接场景中,因为线程一直长期占有。 并无显著的效果优势。 适用于小或者中的客户端连接数,如果连接数超过100000或者更多,那么性能将很不理想。

非阻塞I/O模式 NonBlokingEchoServer.java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * “非阻塞I/O”模式
 *
 * @author <a href="https://waylau.com">waylau.com</a>
 * @date 2016年7月29日
 */
public class NonBlokingEchoServer {

    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws IOException {

        int port;

        try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }
        System.out.println("Listening for connections on port " + port);

        ServerSocketChannel serverChannel;
        Selector selector;
        try {
            serverChannel = ServerSocketChannel.open();
            InetSocketAddress address = new InetSocketAddress(port);
            serverChannel.bind(address);
            serverChannel.configureBlocking(false);
            selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException ex) {
            ex.printStackTrace();
            return;
        }

        while (true) {
            try {
                selector.select();
            } catch (IOException ex) {
                ex.printStackTrace();
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        System.out.println("Accepted connection from " + client);
                        client.configureBlocking(false);
                        SelectionKey clientKey = client.register(selector,
                                SelectionKey.OP_WRITE | SelectionKey.OP_READ);
                        ByteBuffer buffer = ByteBuffer.allocate(100);
                        clientKey.attach(buffer);
                    }
                    if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        client.read(output);
                    }
                    if (key.isWritable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        output.flip();
                        client.write(output);

                        output.compact();
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                    }
                }
            }
        }

    }
}

异步I/O模式 省略

1.1.8. 远程过程调用RPC

C/S模型和过程调用时,需要考虑

过程调用的类型

· 本地的过程调用

· 同主机的过程调用

· 不同主机的过程调用

市面上的RPC开源软件很多,可以搜索并研究使用方式。