Contents
1.1. 编程语言中的线程对象¶
1.1.1. 定义和启动一个线程¶
Java中两种创建Thread实例的方式
1.1.2. 方式1¶
第一种,提供Runnable对象
package thread;
public class HelloRunnable implements Runnable {
@Override
public void run() {
System.out.println("HeLLO FROM RUNNABLE`~~!!");
}
public static void main(String[] args) {
(new Thread(new HelloRunnable())).start();
}
}
// Hello from a runnable......
1.1.3. 方式2¶
第二种,继承Thread
package thread;
public class HelloRunnable2 extends Thread {
@Override
public void run() {
System.out.println("Hello from a thread!");
}
public static void main(String[] args) {
(new HelloRunnable2()).start();
}
}
1.1.4. 使用sleep来暂停执行¶
package thread;
public class SleepMessages {
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"};
for (int i = 0; i < importantInfo.length; i++) {
// Pause for 4 seconds
Thread.sleep(4000);
System.out.println(importantInfo[i]);
}
}
}
在main中声明抛出InterruptedException,若有另外一个线程中断当前线程时,则sleep抛出异常。
1.1.5. 中断¶
支持中断
当线程被另一个线程中断的时候,捕捉异常,结束后续操作。
public class SleepMessage2 {
/**
* @param args
*/
public static void main(String[] args) throws InterruptedException{
String importinfo[] = {
"hello hu1",
"hello hu2",
"hello hu3",
"hello hu4 ha ha ha ha",
};
for (int i = 0; i <importinfo.length ; i++) {
//暂停4s
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
//已经中断,无需更多信息
return;
}
System.out.println(importinfo[i]);
}
}
}
1.1.6. SimpleThreads示例¶
/**
* SimpleThreads 示例有两个线程。第一个线程是每个 Java 应用程序都有的主线程。
* 主线程创建的Runnable 对象的MessageLoop,并等待它完成。如果 MessageLoop
* 需要很长时间才能完成,主线程就中断它。
*
* @author <a href="http://www.waylau.com">waylau.com</a>
* @date 2016年1月21日
*/
public class SimpleThreads {
// Display a message, preceded by
// the name of the current thread
static void threadMessage(String message) {
String threadName = Thread.currentThread().getName();
System.out.format("%s: %s%n", threadName, message);
}
/**
* 遍历打印子进程名称和传入信息
*/
private static class MessageLoop implements Runnable {
public void run() {
String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
};
try {
for (int i = 0; i < importantInfo.length; i++) {
// Pause for 4 seconds
Thread.sleep(4000);
// Print a message
threadMessage(importantInfo[i]);
}
} catch (InterruptedException e) {
threadMessage("I wasn't done!");
}
}
}
public static void main(String args[])
throws InterruptedException {
// Delay, in milliseconds before
// we interrupt MessageLoop
// thread (default one hour).
long patience = 1000 * 60 * 60;
// If command line argument
// present, gives patience
// in seconds.
if (args.length > 0) {
try {
patience = Long.parseLong(args[0]) * 1000;
} catch (NumberFormatException e) {
System.err.println("Argument must be an integer.");
System.exit(1);
}
}
threadMessage("Starting MessageLoop thread");
long startTime = System.currentTimeMillis();
Thread t = new Thread(new MessageLoop());
t.start();
threadMessage("Waiting for MessageLoop thread to finish");
// loop until MessageLoop
// thread exits
while (t.isAlive()) {
threadMessage("Still waiting...");
// Wait maximum of 1 second
// for MessageLoop thread
// to finish.
t.join(1000);
// 如果时间间隔s数 大于1个小时并且子进程还是活着的状态,就中断掉
if (((System.currentTimeMillis() - startTime) > patience) && t.isAlive()) {
threadMessage("Tired of waiting!");
t.interrupt();
// Shouldn't be long now
// -- wait indefinitely
t.join(); //否则主进程就必须一直等待子进程完成
}
}
threadMessage("Finally!");
}
}
1.1.7. 通信¶
网络IO模型的演讲 1.同步和异步
同步是指用户线程发起I/O请求后需要等待或者轮询内核I/O操作完成后才能继续执行。
异步是指用户发起I/O请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户进程的回调函数。
2.阻塞和非阻塞 阻塞和非阻塞描述的是用户线程调用I/O操作的方式:
· 阻塞是指I/O操作要彻底完成后才返回到用户空间。
· 非阻塞是指I/O操作被调用后,立刻返回给用户一个状态值,无需等到I/O操作彻底完成。
常见的Java I/O模型 单进程阻塞I/O模式,server 监听7端口,只能有一个客户端连接
EchoServer.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
/**
* “阻塞I/O”模式
*
* @author <a href="https://waylau.com">waylau.com</a>
* @date 2016年7月29日
*/
public class EchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
try (
ServerSocket serverSocket =
new ServerSocket(port);
Socket clientSocket = serverSocket.accept();
PrintWriter out =
new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println(inputLine);
// System.out.println(inputLine);
}
} catch (IOException e) {
System.out.println("Exception caught when trying to listen"
+ " on port " + port
+ " or listening for a connection");
System.out.println(e.getMessage());
}
}
}
阻塞I/O + 多线程模式
存在问题,每次接到新的连接要新建一个线程,处理完销毁线程,代价大。性能比较低。
EchoServerHandler.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* 处理器类
*
* @author <a href="https://waylau.com">waylau.com</a>
* @date 2016年7月29日
*/
public class EchoServerHandler implements Runnable {
private Socket clientSocket;
public EchoServerHandler(Socket clientSocket) {
this.clientSocket = clientSocket;
}
@Override
public void run() {
try (
PrintWriter out = new PrintWriter(clientSocket.getOutputStream()
, true);
BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println(inputLine);
System.out.println(inputLine);
}
} catch (IOException e) {
System.out.println(e.getMessage());
}
}
}
MultiThreadEchoServer.java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* “阻塞I/O+多线程”模式。使用多线程来支持多个客户端访问服务器
*
* @author <a href="https://waylau.com">waylau.com</a>
* @date 2016年7月29日
*/
public class MultiThreadEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
Socket clientSocket = null;
try (ServerSocket serverSocket = new ServerSocket(port);) {
while (true) {
clientSocket = serverSocket.accept();
// 多线程
new Thread(new EchoServerHandler(clientSocket)).start();
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port "
+ port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}
阻塞I/O+线程池模式
优点:在大量短连接场景中,性能会有所提升,因为复用线程池中的线程,不用频繁的创建和销毁线程,在长连接场景中,因为线程一直长期占有。 并无显著的效果优势。 适用于小或者中的客户端连接数,如果连接数超过100000或者更多,那么性能将很不理想。
非阻塞I/O模式 NonBlokingEchoServer.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* “非阻塞I/O”模式
*
* @author <a href="https://waylau.com">waylau.com</a>
* @date 2016年7月29日
*/
public class NonBlokingEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel;
Selector selector;
try {
serverChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
serverChannel.bind(address);
serverChannel.configureBlocking(false);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException ex) {
ex.printStackTrace();
return;
}
while (true) {
try {
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
SelectionKey clientKey = client.register(selector,
SelectionKey.OP_WRITE | SelectionKey.OP_READ);
ByteBuffer buffer = ByteBuffer.allocate(100);
clientKey.attach(buffer);
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
client.read(output);
}
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
client.write(output);
output.compact();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}
异步I/O模式 省略
1.1.8. 远程过程调用RPC¶
C/S模型和过程调用时,需要考虑
过程调用的类型
· 本地的过程调用
· 同主机的过程调用
· 不同主机的过程调用
市面上的RPC开源软件很多,可以搜索并研究使用方式。