Daily-Note 09-15
OVERVIEW
在分布式系统中,分布式高并发编程模型、socket 网络模型、IO 模型等属于了分布式系统的基石。
- Actor Model,一种高性能分布式并发编程模型,来自 Erlang 语言。背景介绍
- Netty, Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
- select / poll、信号驱动 IO、epoll,多种非阻塞的操作系统 IO 模型。
- Java NIO,与 java 1.4 之前的 BIO 相比,NIO 提供了非阻塞的 IO 模型,大大提高了 Java 编程语言在大型应用服务器上的 IO 性能。
Actor Model
学习 akka actor example
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
object HelloWorld {
final case class Greet(whom: String, replyTo: ActorRef[Greeted])
final case class Greeted(whom: String, from: ActorRef[Greet])
def apply(): Behavior[Greet] = Behaviors.receive { (context, message) =>
println(s"Hello ${message.whom}!")
message.replyTo ! Greeted(message.whom, context.self)
Behaviors.same
}
}
object HelloWorldBot {
def apply(max: Int): Behavior[HelloWorld.Greeted] = {
bot(0, max)
}
private def bot(greetingCounter: Int, max: Int): Behavior[HelloWorld.Greeted] =
Behaviors.receive { (context, message) =>
val n = greetingCounter + 1
println(s"Greeting $n for ${message.whom}")
if (n == max) {
Behaviors.stopped
} else {
message.from ! HelloWorld.Greet(message.whom, context.self)
bot(n, max)
}
}
}
object HelloWorldMain {
final case class SayHello(name: String)
def apply(): Behavior[SayHello] =
Behaviors.setup { context =>
val greeter = context.spawn(HelloWorld(), "greeter")
Behaviors.receiveMessage { message =>
val replyTo = context.spawn(HelloWorldBot(max = 3), message.name)
greeter ! HelloWorld.Greet(message.name, replyTo)
Behaviors.same
}
}
def main(args: Array[String]): Unit = {
val system: ActorSystem[HelloWorldMain.SayHello] =
ActorSystem(HelloWorldMain(), "hello")
system ! HelloWorldMain.SayHello("World")
system ! HelloWorldMain.SayHello("Akka")
}
}
// This is run by ScalaFiddle
//HelloWorldMain.main(Array.empty)
Netty
Netty Server demo, a simple echo server
public class NettyServer {
public NettyServer(int port) {
NioEventLoopGroup bossG = new NioEventLoopGroup();
NioEventLoopGroup workerG = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossG, workerG).channel(NioServerSocketChannel.class).childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String req = new String(bytes, "utf-8");
System.out.println(req);
String resp = "echo : " + req;
ByteBuf outbuf = Unpooled.copiedBuffer(resp.getBytes());
ctx.write(outbuf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
});
}
}
);
try {
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
bossG.shutdownGracefully();
workerG.shutdownGracefully();
e.printStackTrace();
}
}
public static void main(String[] args) {
new NettyServer(8082);
}
}
select / poll、信号驱动 IO、epoll
Linux-UNIX 系统编程手册
-
水平触发 & 边缘触发,水平触发通知:如果文件描述符上可以非阻塞地执行 I/O 系统调用,此时认为它已经就绪。边缘触发通知:如果文件描述符自上次状态检查以来有了新的 I/O 活动(比如新的输入),此时需要触发通知。
-
select 和 poll 这种 IO 模型,首先,将感兴趣的文件描述符集合复制到内核空间中,系统通过顺序扫描检查感兴趣的文件描述符中发生的状态,然后设置其状态,然后返回给调用者,可以通过 timeout 参数设置是否阻塞、非阻塞、超时等待等方式,调用者获得结果后顺序检查感兴趣的文件描述符集合,从中找到发生的 IO 事件。
-
信号驱动 IO 这种模型,首先,需要通过 sigaction 函数调用设置需要监听的文件描述符的 IO 事件以及发生 IO 就绪时唤醒的进程或进程组,并设置对应的回调处理函数,当 IO 就绪时将调用指定的回调函数,该模式下的触发为边缘触发,因此需要尽可能读取输入。与 select / poll 不同的是,该模式下内核空间会记录对应的文件描述符,因此不需要每次查询都复制对应的文件描述符到内核空间中,因此性能有大幅度的提升。
-
epoll 这种 IO 模型下,首先,通过 epoll_create() 函数调用获取到一个关联到内核空间指定数据结构的文件描述符(但是不适用于 IO),接着,调用 epoll_ctl() 函数设置触发模式,对应的感兴趣文件描述符,最后,通过 epoll_wait() 函数返回一个就绪列表(ready list)该列表中即发生对应 IO 就绪事件的列表。
与信号驱动的 IO 模型相同,内核空间会记录对应的数据结构,因此不需要每次查询都复制文件描述符到内核空间。与信号驱动模式不同的是,该模式包括两种触发模式,而信号驱动 IO 只有边缘触发模式,此外该模式能够指定触发的 IO 事件为写就绪还是读就绪,并且不需要处理信号驱动 IO 模式下存在的排队信号溢出的问题。
-
OTHER
- epoll 相对于 select 的性能提升来自于系统实现的 callback,使得无需线性扫描所有的文件描述符,而是由文件描述符自身就绪时通知内核。因此如果在大量文件描述符都处于活跃状态时,select 和 epoll 的性能差距并不明显。
Java NIO
Netty 权威指南
- NIO 涉及 buffer、channel、selector 等概念,其中 selector 作为复用器能够监听多个 selector 的 IO 事件,因此相对于 socket 编程中传统的 BIO 每一个连接建立一个线程进行处理(或通过线程池处理)其开销更低。NIO 所有的通信都需要通过 buffer 进行传递,buffer 是内存中的缓存区,读写都是直接对缓存区的读写。
- 实验:多种 IO 下 socket 编程对比
NIO 编程
// main
package com.example;
/**
* Hello world!
*/
public final class App {
private App() {
}
/**
* Says hello to the world.
*
* @param args The arguments of the program.
*/
public static void main(String[] args) {
System.out.println("Hello World!");
new NIOServer(8081).run();
}
}
// NIO server
package com.example;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Server implements Runnable {
private Selector selector;
private ServerSocketChannel serverChannel;
private boolean running;
public Server(int port) {
try {
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
serverChannel.configureBlocking(false);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
running = true;
} catch (IOException e) {
e.printStackTrace();
}
}
private void handle(SelectionKey key) {
if (key.isValid()) {
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
try {
SocketChannel c = channel.accept();
c.configureBlocking(false);
c.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buf = ByteBuffer.allocate(4096);
try {
int read = sc.read(buf);
if (read > 0) {
// read
buf.flip();
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
String req = new String(bytes, "UTF-8");
System.out.println(req);
// write
String resp = "echo : " + req;
buf.clear();
buf.put(resp.getBytes());
buf.flip();
sc.write(buf);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Override
public void run() {
while (running) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey next;
while (it.hasNext()) {
next = it.next();
it.remove();
handle(next);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
NIO2.0 编程
package com.example;
/**
* Hello world!
*/
public final class App {
private App() {
}
/**
* Says hello to the world.
*
* @param args The arguments of the program.
*/
public static void main(String[] args) {
System.out.println("Hello World!");
new AIOServer(8081).run();
}
}
package com.example;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AIOServer {
private AsynchronousServerSocketChannel asyncSC;
public AIOServer(int port) {
try {
asyncSC = AsynchronousServerSocketChannel.open();
asyncSC.bind(new InetSocketAddress(port));
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
asyncSC.accept(this, new CompletionHandler<AsynchronousSocketChannel, AIOServer>() {
@Override
public void completed(AsynchronousSocketChannel result, AIOServer attachment) {
attachment.asyncSC.accept(attachment, this);
ByteBuffer buf = ByteBuffer.allocate(1024);
final AsynchronousSocketChannel ch = result;
result.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (result > 0) {
attachment.flip();
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
try {
String req = new String(bytes, "UTF-8");
System.out.println(req);
String resp = "echo : " + req;
buf.clear();
buf.put(resp.getBytes());
buf.flip();
ch.write(buf);
// redo
buf.clear();
ch.read(buf, buf, this);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
@Override
public void failed(Throwable exc, AIOServer attachment) {
exc.printStackTrace();
}
});
try {
Thread.sleep(2000 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 性能测试
- 对比 Go、Rust 等语言 IO 模型