网络编程
- 什么是网络:
- 在计算机领域中,网络是信息传输、接收、共享的虚拟平台。
- 通过它把各个点、面、体的信息联系到一起,从而实现这些资源的共享
- 什么是网络编程:
- 网络编程从大的方面说就是对信息的发送到接收
- 通过操作相应的Api调度计算机硬件资源,并利用传输管道(网线)进行数据交换的过程。
- 更为具体的涉及:网络模型,套接字,数据包
- 七层网络模型-OSI
- 基础层:物理层(physical),数据链路层(datalink),网络层(network)
- 传输层(transport):TCP-UDP协议层、socket
- 高级层:会话层(session),表示层(presentation),应用层(application)
- 什么是socket:简单来说就是IP地址与端口的结合协议(RFC 793),一种地址与端口的结合描述协议
- socket作用与组成
- 在网络传输中用于唯一标示两个端点之间的链接
- 端点包括ip和port
- 四个要素:客户端地址,客户端端口,服务器地址,服务器端口
- socket之TCP:
- TCP是面向连接的通信协议
- 通过三次握手建立连接,通讯完成时要拆除连接
- 由于TCP是面向连接的,所以只能用于端到端的通讯
- socket之UDP
- UDP是面向无连接的通讯协议
- UDP数据包括目的端口号和源端口号信息
- 由于通讯不需要连接,所以可以实现广播发送,并不局限于端到端
通讯demo
- 客户端
public class Client{
public static void main(String[] args) throws IOException{
Socket socket = new Socket();
//超时时间
socket.setSoTimeout(3000);
//连接本地,端口2000,超时时间3000ms
socket.connect(new InetSocketAddress(InetSocketAddress.getLocalHost(),2000),3000);
System.out.printf("已发起服务器连接,并进入后续流程~");
System.out.printf("客户端信息:"+socket.getLocalAddress()+"P:"+socket.getLocalPort());
System.out.printf("服务器信息:"+socket.getInetAddress()+"P:"+socket.getPort());
try{
todo(socket);
}catch(Exception e){
System.out.printf("异常关闭");
}
socket.close();
System.out.printf("客户端已退出");
}
private static void todo(Socket client) throws IOException{
//构建键盘输入流
InputStream in = System.in;
BufferedReader input = new BufferedReader(new InputStreamReader(in));
//得到socket输出流,并转换为打印流
OutputStream outputStream = client.getOutputStream();
PrintStream socketPrintStream = new PrintStream(outputStream);
//得到socket输入流,并转换为BufferedReader
InputStream inputStream = client.getInputStream();
BufferedReader socketBufferedReader = new BufferedReader(new InputStreamReader(inputStream));
boolean flag = true;
do{
//键盘读取一行
String str = input.readLine();
//发送到服务器
socketPrintStream.println(str);
//从服务器读取一行
String echo = socketBufferedReader.readLine();
if ("bye".equalsIgnoreCase(echo)) {
flag = false;
}else{
System.out.println(echo);
}
}while(flag);
//资源释放
socketPrintStream.close();
socketBufferedReader.close();
}
}
- 服务端
public class Server{
public static void main(String[] args) throws IOexception{
ServerSocket server = new ServerSocket(2000);
System.out.printf("服务器准备就绪~");
System.out.printf("服务器信息:"+server.getInetAddress()+" P:"+server.getLocalPort());
//等待客户端连接
for (; ; ) {
//得到客户端
Socket client = server.accept();
//客户端构建异步线程
ClientHandler clientHandler = new ClientHandler(client);
//启动线程
clientHandler.start();
}
}
//客户端消息处理
private static class ClientHandler extends Thread{
private Socket socket;
private boolean flag = true;
ClientHandler(Socket socket){
this.socket = socket;
}
@Override
public void run() {
super.run();
System.out.println("新客户端连接:" + socket.getInetAddress() +
" P:" + socket.getPort());
try {
// 得到打印流,用于数据输出;服务器回送数据使用
PrintStream socketOutput = new PrintStream(socket.getOutputStream());
// 得到输入流,用于接收数据
BufferedReader socketInput = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
do {
// 客户端拿到一条数据
String str = socketInput.readLine();
if ("bye".equalsIgnoreCase(str)) {
flag = false;
// 回送
socketOutput.println("bye");
} else {
// 打印到屏幕。并回送数据长度
System.out.println(str);
socketOutput.println("回送:" + str.length());
}
} while (flag);
socketInput.close();
socketOutput.close();
} catch (Exception e) {
System.out.println("连接异常断开");
} finally {
// 连接关闭
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("客户端已退出:" + socket.getInetAddress() +
" P:" + socket.getPort());
}
}
}
- 报文段:
- 报文段是指TCP/IP协议网络传输过程中,起着路由导航作用
- 用以查询各个网络路由网段,IP地址,交换协议等IP数据包
- 报文段充当整个TCP/IP协议数据包的导航路由功能。
- 报文在传输过程中会不断地封装成分组,包,帧来传输
- 封装方式就是添加一些控制信息组成的首部,即报文头
- 传输协议:
- 一种规定,约束
- 约定大于配置,在网络传输中依然适用;网络的传输流程是健壮的,稳定的,得益于基础的协议构成
- 简单来说:A -> B 的传输数据,B能识别,反之B -> A 的传输数据A也能识别,这就是协议
- Mac地址:
- media access control 或者medium access control
- 意为媒体访问控制,或者称为物理地址,硬件地址
- 用来定义网络设备的位置
- 形如:44-45-53-54-00-00
UDP
- 什么是udp:
- 一种用户数据报协议,又称用户数据报文协议
- 是一个简单的面向数据报的传输层协议,正式规范为RFC768
- 用户数据协议,非连接协议
- 为什么udp不可靠
- 它一旦把应用程序发给网络层的数据发送出去,就不保留数据备份
- udp在IP数据报的头部仅仅加入了复用和数据校验(字段)
- 发送端产生数据,接收端从网络中抓取数据
- 结构简单,无校验,速度快,容易丢包,可广播
- udp包最大长度:65507byte字节
- API-DatagramSocket
- 用于接收和发送udp的类
- 负责发送某一个udp包,或者接收udp包
- 不同于tcp,udp并没有合并到socket API中
- API-DatagramPacket
- 用于处理报文
- 将byte数组,目标地址,目标端口等数据包封装为报文或将报文拆卸为byte数组
- 是udp的发送实体,也是接收实体
- 单播,多播,广播
局域网搜索案例
- 提供者
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.UUID;
/**
* UDP 提供者,用于提供服务
*/
public class UDPProvider {
public static void main(String[] args) throws IOException {
// 生成一份唯一标示
String sn = UUID.randomUUID().toString();
Provider provider = new Provider(sn);
provider.start();
// 读取任意键盘信息后可以退出
//noinspection ResultOfMethodCallIgnored
System.in.read();
provider.exit();
}
private static class Provider extends Thread {
private final String sn;
private boolean done = false;
private DatagramSocket ds = null;
public Provider(String sn) {
super();
this.sn = sn;
}
@Override
public void run() {
super.run();
System.out.println("UDPProvider Started.");
try {
// 监听20000 端口
ds = new DatagramSocket(20000);
while (!done) {
// 构建接收实体
final byte[] buf = new byte[512];
DatagramPacket receivePack = new DatagramPacket(buf, buf.length);
// 接收
ds.receive(receivePack);
// 打印接收到的信息与发送者的信息
// 发送者的IP地址
String ip = receivePack.getAddress().getHostAddress();
int port = receivePack.getPort();
int dataLen = receivePack.getLength();
String data = new String(receivePack.getData(), 0, dataLen);
System.out.println("UDPProvider receive form ip:" + ip
+ "\tport:" + port + "\tdata:" + data);
// 解析端口号
int responsePort = MessageCreator.parsePort(data);
if (responsePort != -1) {
// 构建一份回送数据
String responseData = MessageCreator.buildWithSn(sn);
byte[] responseDataBytes = responseData.getBytes();
// 直接根据发送者构建一份回送信息
DatagramPacket responsePacket = new DatagramPacket(responseDataBytes,
responseDataBytes.length,
receivePack.getAddress(),
responsePort);
ds.send(responsePacket);
}
}
} catch (Exception ignored) {
} finally {
close();
}
// 完成
System.out.println("UDPProvider Finished.");
}
private void close() {
if (ds != null) {
ds.close();
ds = null;
}
}
/**
* 提供结束
*/
void exit() {
done = true;
close();
}
}
}
- 搜索者
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* UDP 搜索者,用于搜索服务支持方
*/
public class UDPSearcher {
private static final int LISTEN_PORT = 30000;
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("UDPSearcher Started.");
Listener listener = listen();
sendBroadcast();
// 读取任意键盘信息后可以退出
//noinspection ResultOfMethodCallIgnored
System.in.read();
List<Device> devices = listener.getDevicesAndClose();
for (Device device : devices) {
System.out.println("Device:" + device.toString());
}
// 完成
System.out.println("UDPSearcher Finished.");
}
//监听方法
private static Listener listen() throws InterruptedException {
System.out.println("UDPSearcher start listen.");
CountDownLatch countDownLatch = new CountDownLatch(1);
Listener listener = new Listener(LISTEN_PORT, countDownLatch);
listener.start();
countDownLatch.await();
return listener;
}
//发送广播
private static void sendBroadcast() throws IOException {
System.out.println("UDPSearcher sendBroadcast started.");
// 作为搜索方,让系统自动分配端口
DatagramSocket ds = new DatagramSocket();
// 构建一份请求数据
String requestData = MessageCreator.buildWithPort(LISTEN_PORT);
byte[] requestDataBytes = requestData.getBytes();
// 直接构建packet
DatagramPacket requestPacket = new DatagramPacket(requestDataBytes,
requestDataBytes.length);
// 20000端口, 广播地址
requestPacket.setAddress(InetAddress.getByName("255.255.255.255"));
requestPacket.setPort(20000);
// 发送
ds.send(requestPacket);
ds.close();
// 完成
System.out.println("UDPSearcher sendBroadcast finished.");
}
//设备信息
private static class Device {
final int port;
final String ip;
final String sn;
private Device(int port, String ip, String sn) {
this.port = port;
this.ip = ip;
this.sn = sn;
}
@Override
public String toString() {
return "Device{" +
"port=" + port +
", ip='" + ip + '\'' +
", sn='" + sn + '\'' +
'}';
}
}
//监听类
private static class Listener extends Thread {
private final int listenPort;
private final CountDownLatch countDownLatch;
private final List<Device> devices = new ArrayList<>();
private boolean done = false;
private DatagramSocket ds = null;
public Listener(int listenPort, CountDownLatch countDownLatch) {
super();
this.listenPort = listenPort;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
super.run();
// 通知已启动
countDownLatch.countDown();
try {
// 监听回送端口
ds = new DatagramSocket(listenPort);
while (!done) {
// 构建接收实体
final byte[] buf = new byte[512];
DatagramPacket receivePack = new DatagramPacket(buf, buf.length);
// 接收
ds.receive(receivePack);
// 打印接收到的信息与发送者的信息
// 发送者的IP地址
String ip = receivePack.getAddress().getHostAddress();
int port = receivePack.getPort();
int dataLen = receivePack.getLength();
String data = new String(receivePack.getData(), 0, dataLen);
System.out.println("UDPSearcher receive form ip:" + ip
+ "\tport:" + port + "\tdata:" + data);
String sn = MessageCreator.parseSn(data);
if (sn != null) {
Device device = new Device(port, ip, sn);
devices.add(device);
}
}
} catch (Exception ignored) {
} finally {
close();
}
System.out.println("UDPSearcher listener finished.");
}
private void close() {
if (ds != null) {
ds.close();
ds = null;
}
}
List<Device> getDevicesAndClose() {
done = true;
close();
return devices;
}
}
}
- 消息创建者
public class MessageCreator {
private static final String SN_HEADER = "收到暗号,我是(SN):";
private static final String PORT_HEADER = "这是暗号,请回电端口(Port):";
public static String buildWithPort(int port) {
return PORT_HEADER + port;
}
public static int parsePort(String data) {
if (data.startsWith(PORT_HEADER)) {
return Integer.parseInt(data.substring(PORT_HEADER.length()));
}
return -1;
}
public static String buildWithSn(String sn) {
return SN_HEADER + sn;
}
public static String parseSn(String data) {
if (data.startsWith(SN_HEADER)) {
return data.substring(SN_HEADER.length());
}
return null;
}
}
TCP
- 什么是TCP?
- TCP是传输控制协议;是一种面向连接的、可靠的、基于字节流的传输层通信协议;由IETF的RFC 793定义
- TCP的机制
- 三次握手,四次挥手
- 具有校验机制,可靠,数据传输稳定
- TCP能做什么?
- 聊天消息传输,推送
- 单人语音,视频聊天等
- 几乎udp能做的都能做,但需要考虑复杂性,性能问题
- 限制:无法进行广播,多播操作
TCP数据传输案例
- 客户端
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
public class Client {
private static final int PORT = 20000;
private static final int LOCAL_PORT = 20001;
public static void main(String[] args) throws IOException {
Socket socket = createSocket();
initSocket(socket);
// 链接到本地20000端口,超时时间3秒,超过则抛出超时异常
socket.connect(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 3000);
System.out.println("已发起服务器连接,并进入后续流程~");
System.out.println("客户端信息:" + socket.getLocalAddress() + " P:" + socket.getLocalPort());
System.out.println("服务器信息:" + socket.getInetAddress() + " P:" + socket.getPort());
try {
// 发送接收数据
todo(socket);
} catch (Exception e) {
System.out.println("异常关闭");
}
// 释放资源
socket.close();
System.out.println("客户端已退出~");
}
private static Socket createSocket() throws IOException {
/*
// 无代理模式,等效于空构造函数
Socket socket = new Socket(Proxy.NO_PROXY);
// 新建一份具有HTTP代理的套接字,传输数据将通过www.baidu.com:8080端口转发
Proxy proxy = new Proxy(Proxy.Type.HTTP,
new InetSocketAddress(Inet4Address.getByName("www.baidu.com"), 8800));
socket = new Socket(proxy);
// 新建一个套接字,并且直接链接到本地20000的服务器上
socket = new Socket("localhost", PORT);
// 新建一个套接字,并且直接链接到本地20000的服务器上
socket = new Socket(Inet4Address.getLocalHost(), PORT);
// 新建一个套接字,并且直接链接到本地20000的服务器上,并且绑定到本地20001端口上
socket = new Socket("localhost", PORT, Inet4Address.getLocalHost(), LOCAL_PORT);
socket = new Socket(Inet4Address.getLocalHost(), PORT, Inet4Address.getLocalHost(), LOCAL_PORT);
*/
Socket socket = new Socket();
// 绑定到本地20001端口
socket.bind(new InetSocketAddress(Inet4Address.getLocalHost(), LOCAL_PORT));
return socket;
}
private static void initSocket(Socket socket) throws SocketException {
// 设置读取超时时间为2秒
socket.setSoTimeout(2000);
// 是否复用未完全关闭的Socket地址,对于指定bind操作后的套接字有效
socket.setReuseAddress(true);
// 是否开启Nagle算法
socket.setTcpNoDelay(true);
// 是否需要在长时无数据响应时发送确认数据(类似心跳包),时间大约为2小时
socket.setKeepAlive(true);
// 对于close关闭操作行为进行怎样的处理;默认为false,0
// false、0:默认情况,关闭时立即返回,底层系统接管输出流,将缓冲区内的数据发送完成
// true、0:关闭时立即返回,缓冲区数据抛弃,直接发送RST结束命令到对方,并无需经过2MSL等待
// true、200:关闭时最长阻塞200毫秒,随后按第二情况处理
socket.setSoLinger(true, 20);
// 是否让紧急数据内敛,默认false;紧急数据通过 socket.sendUrgentData(1);发送
socket.setOOBInline(true);
// 设置接收发送缓冲器大小
socket.setReceiveBufferSize(64 * 1024 * 1024);
socket.setSendBufferSize(64 * 1024 * 1024);
// 设置性能参数:短链接,延迟,带宽的相对重要性
socket.setPerformancePreferences(1, 1, 0);
}
private static void todo(Socket client) throws IOException {
// 得到Socket输出流
OutputStream outputStream = client.getOutputStream();
// 得到Socket输入流
InputStream inputStream = client.getInputStream();
byte[] buffer = new byte[256];
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
// byte
byteBuffer.put((byte) 126);
// char
char c = 'a';
byteBuffer.putChar(c);
// int
int i = 2323123;
byteBuffer.putInt(i);
// bool
boolean b = true;
byteBuffer.put(b ? (byte) 1 : (byte) 0);
// Long
long l = 298789739;
byteBuffer.putLong(l);
// float
float f = 12.345f;
byteBuffer.putFloat(f);
// double
double d = 13.31241248782973;
byteBuffer.putDouble(d);
// String
String str = "Hello你好!";
byteBuffer.put(str.getBytes());
// 发送到服务器
outputStream.write(buffer, 0, byteBuffer.position() + 1);
// 接收服务器返回
int read = inputStream.read(buffer);
System.out.println("收到数量:" + read);
// 资源释放
outputStream.close();
inputStream.close();
}
}
- 服务端
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
public class Server {
private static final int PORT = 20000;
public static void main(String[] args) throws IOException {
ServerSocket server = createServerSocket();
initServerSocket(server);
// 绑定到本地端口上
server.bind(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 50);
System.out.println("服务器准备就绪~");
System.out.println("服务器信息:" + server.getInetAddress() + " P:" + server.getLocalPort());
// 等待客户端连接
for (; ; ) {
// 得到客户端
Socket client = server.accept();
// 客户端构建异步线程
ClientHandler clientHandler = new ClientHandler(client);
// 启动线程
clientHandler.start();
}
}
private static ServerSocket createServerSocket() throws IOException {
// 创建基础的ServerSocket
ServerSocket serverSocket = new ServerSocket();
// 绑定到本地端口20000上,并且设置当前可允许等待链接的队列为50个
//serverSocket = new ServerSocket(PORT);
// 等效于上面的方案,队列设置为50个
//serverSocket = new ServerSocket(PORT, 50);
// 与上面等同
// serverSocket = new ServerSocket(PORT, 50, Inet4Address.getLocalHost());
return serverSocket;
}
private static void initServerSocket(ServerSocket serverSocket) throws IOException {
// 是否复用未完全关闭的地址端口
serverSocket.setReuseAddress(true);
// 等效Socket#setReceiveBufferSize
serverSocket.setReceiveBufferSize(64 * 1024 * 1024);
// 设置serverSocket#accept超时时间
// serverSocket.setSoTimeout(2000);
// 设置性能参数:短链接,延迟,带宽的相对重要性
serverSocket.setPerformancePreferences(1, 1, 1);
}
/**
* 客户端消息处理
*/
private static class ClientHandler extends Thread {
private Socket socket;
ClientHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
super.run();
System.out.println("新客户端连接:" + socket.getInetAddress() +
" P:" + socket.getPort());
try {
// 得到套接字流
OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream();
byte[] buffer = new byte[256];
int readCount = inputStream.read(buffer);
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, readCount);
// byte
byte be = byteBuffer.get();
// char
char c = byteBuffer.getChar();
// int
int i = byteBuffer.getInt();
// bool
boolean b = byteBuffer.get() == 1;
// Long
long l = byteBuffer.getLong();
// float
float f = byteBuffer.getFloat();
// double
double d = byteBuffer.getDouble();
// String
int pos = byteBuffer.position();
String str = new String(buffer, pos, readCount - pos - 1);
System.out.println("收到数量:" + readCount + " 数据:"
+ be + "\n"
+ c + "\n"
+ i + "\n"
+ b + "\n"
+ l + "\n"
+ f + "\n"
+ d + "\n"
+ str + "\n");
outputStream.write(buffer, 0, readCount);
outputStream.close();
inputStream.close();
} catch (Exception e) {
System.out.println("连接异常断开");
} finally {
// 连接关闭
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("客户端已退出:" + socket.getInetAddress() +
" P:" + socket.getPort());
}
}
}
- 转换工具类
public class Tools {
public static int byteArrayToInt(byte[] b) {
return b[3] & 0xFF |
(b[2] & 0xFF) << 8 |
(b[1] & 0xFF) << 16 |
(b[0] & 0xFF) << 24;
}
public static byte[] intToByteArray(int a) {
return new byte[]{
(byte) ((a >> 24) & 0xFF),
(byte) ((a >> 16) & 0xFF),
(byte) ((a >> 8) & 0xFF),
(byte) (a & 0xFF)
};
}
}
数据发送与接收并行
常量
- tcp
package constants;
public class TCPConstants {
// 服务器固化UDP接收端口
public static int PORT_SERVER = 30401;
}
- udp
package constants;
public class UDPConstants {
// 公用头部
public static byte[] HEADER = new byte[]{7, 7, 7, 7, 7, 7, 7, 7};
// 服务器固化UDP接收端口
public static int PORT_SERVER = 30201;
// 客户端回送端口
public static int PORT_CLIENT_RESPONSE = 30202;
}
工具类
- ByteUtils
package clink.net.qiujuer.clink.utils;
public class ByteUtils {
/**
* Does this byte array begin with match array content?
*
* @param source Byte array to examine
* @param match Byte array to locate in <code>source</code>
* @return true If the starting bytes are equal
*/
public static boolean startsWith(byte[] source, byte[] match) {
return startsWith(source, 0, match);
}
/**
* Does this byte array begin with match array content?
*
* @param source Byte array to examine
* @param offset An offset into the <code>source</code> array
* @param match Byte array to locate in <code>source</code>
* @return true If the starting bytes are equal
*/
public static boolean startsWith(byte[] source, int offset, byte[] match) {
if (match.length > (source.length - offset)) {
return false;
}
for (int i = 0; i < match.length; i++) {
if (source[offset + i] != match[i]) {
return false;
}
}
return true;
}
/**
* Does the source array equal the match array?
*
* @param source Byte array to examine
* @param match Byte array to locate in <code>source</code>
* @return true If the two arrays are equal
*/
public static boolean equals(byte[] source, byte[] match) {
if (match.length != source.length) {
return false;
}
return startsWith(source, 0, match);
}
/**
* Copies bytes from the source byte array to the destination array
*
* @param source The source array
* @param srcBegin Index of the first source byte to copy
* @param srcEnd Index after the last source byte to copy
* @param destination The destination array
* @param dstBegin The starting offset in the destination array
*/
public static void getBytes(byte[] source, int srcBegin, int srcEnd, byte[] destination,
int dstBegin) {
System.arraycopy(source, srcBegin, destination, dstBegin, srcEnd - srcBegin);
}
/**
* Return a new byte array containing a sub-portion of the source array
*
* @param srcBegin The beginning index (inclusive)
* @param srcEnd The ending index (exclusive)
* @return The new, populated byte array
*/
public static byte[] subbytes(byte[] source, int srcBegin, int srcEnd) {
byte destination[];
destination = new byte[srcEnd - srcBegin];
getBytes(source, srcBegin, srcEnd, destination, 0);
return destination;
}
/**
* Return a new byte array containing a sub-portion of the source array
*
* @param srcBegin The beginning index (inclusive)
* @return The new, populated byte array
*/
public static byte[] subbytes(byte[] source, int srcBegin) {
return subbytes(source, srcBegin, source.length);
}
}
- CloseUtils
package clink.net.qiujuer.clink.utils;
import java.io.Closeable;
import java.io.IOException;
public class CloseUtils {
public static void close(Closeable... closeables) {
if (closeables == null) {
return;
}
for (Closeable closeable : closeables) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
客户端
- bean
package client.bean;
public class ServerInfo {
private String sn;
private int port;
private String address;
public ServerInfo(int port, String ip, String sn) {
this.port = port;
this.address = ip;
this.sn = sn;
}
public String getSn() {
return sn;
}
public void setSn(String sn) {
this.sn = sn;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
@Override
public String toString() {
return "ServerInfo{" +
"sn='" + sn + '\'' +
", port=" + port +
", address='" + address + '\'' +
'}';
}
}
- client
package client;
import client.bean.ServerInfo;
import java.io.IOException;
public class Client {
public static void main(String[] args) {
ServerInfo info = UDPSearcher.searchServer(10000);
System.out.println("Server:" + info);
if (info != null) {
try {
TCPClient.linkWith(info);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- TCPClient
package client;
import client.bean.ServerInfo;
import clink.net.qiujuer.clink.utils.CloseUtils;
import java.io.*;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
public class TCPClient {
public static void linkWith(ServerInfo info) throws IOException {
Socket socket = new Socket();
// 超时时间
socket.setSoTimeout(3000);
// 连接本地,端口2000;超时时间3000ms
socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()), info.getPort()), 3000);
System.out.println("已发起服务器连接,并进入后续流程~");
System.out.println("客户端信息:" + socket.getLocalAddress() + " P:" + socket.getLocalPort());
System.out.println("服务器信息:" + socket.getInetAddress() + " P:" + socket.getPort());
try {
ReadHandler readHandler = new ReadHandler(socket.getInputStream());
readHandler.start();
// 发送接收数据
write(socket);
// 退出操作
readHandler.exit();
} catch (Exception e) {
System.out.println("异常关闭");
}
// 释放资源
socket.close();
System.out.println("客户端已退出~");
}
private static void write(Socket client) throws IOException {
// 构建键盘输入流
InputStream in = System.in;
BufferedReader input = new BufferedReader(new InputStreamReader(in));
// 得到Socket输出流,并转换为打印流
OutputStream outputStream = client.getOutputStream();
PrintStream socketPrintStream = new PrintStream(outputStream);
do {
// 键盘读取一行
String str = input.readLine();
// 发送到服务器
socketPrintStream.println(str);
if ("00bye00".equalsIgnoreCase(str)) {
break;
}
} while (true);
// 资源释放
socketPrintStream.close();
}
static class ReadHandler extends Thread {
private boolean done = false;
private final InputStream inputStream;
ReadHandler(InputStream inputStream) {
this.inputStream = inputStream;
}
@Override
public void run() {
super.run();
try {
// 得到输入流,用于接收数据
BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
do {
String str;
try {
// 客户端拿到一条数据
str = socketInput.readLine();
} catch (SocketTimeoutException e) {
continue;
}
if (str == null) {
System.out.println("连接已关闭,无法读取数据!");
break;
}
// 打印到屏幕
System.out.println(str);
} while (!done);
} catch (Exception e) {
if (!done) {
System.out.println("连接异常断开:" + e.getMessage());
}
} finally {
// 连接关闭
CloseUtils.close(inputStream);
}
}
void exit() {
done = true;
CloseUtils.close(inputStream);
}
}
}
- UDPSearcher
package client;
import client.bean.ServerInfo;
import clink.net.qiujuer.clink.utils.ByteUtils;
import constants.UDPConstants;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class UDPSearcher {
private static final int LISTEN_PORT = UDPConstants.PORT_CLIENT_RESPONSE;
public static ServerInfo searchServer(int timeout) {
System.out.println("UDPSearcher Started.");
// 成功收到回送的栅栏
CountDownLatch receiveLatch = new CountDownLatch(1);
Listener listener = null;
try {
listener = listen(receiveLatch);
sendBroadcast();
receiveLatch.await(timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
}
// 完成
System.out.println("UDPSearcher Finished.");
if (listener == null) {
return null;
}
List<ServerInfo> devices = listener.getServerAndClose();
if (devices.size() > 0) {
return devices.get(0);
}
return null;
}
private static Listener listen(CountDownLatch receiveLatch) throws InterruptedException {
System.out.println("UDPSearcher start listen.");
CountDownLatch startDownLatch = new CountDownLatch(1);
Listener listener = new Listener(LISTEN_PORT, startDownLatch, receiveLatch);
listener.start();
startDownLatch.await();
return listener;
}
private static void sendBroadcast() throws IOException {
System.out.println("UDPSearcher sendBroadcast started.");
// 作为搜索方,让系统自动分配端口
DatagramSocket ds = new DatagramSocket();
// 构建一份请求数据
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
// 头部
byteBuffer.put(UDPConstants.HEADER);
// CMD命名
byteBuffer.putShort((short) 1);
// 回送端口信息
byteBuffer.putInt(LISTEN_PORT);
// 直接构建packet
DatagramPacket requestPacket = new DatagramPacket(byteBuffer.array(),
byteBuffer.position() + 1);
// 广播地址
requestPacket.setAddress(InetAddress.getByName("255.255.255.255"));
// 设置服务器端口
requestPacket.setPort(UDPConstants.PORT_SERVER);
// 发送
ds.send(requestPacket);
ds.close();
// 完成
System.out.println("UDPSearcher sendBroadcast finished.");
}
private static class Listener extends Thread {
private final int listenPort;
private final CountDownLatch startDownLatch;
private final CountDownLatch receiveDownLatch;
private final List<ServerInfo> serverInfoList = new ArrayList<>();
private final byte[] buffer = new byte[128];
private final int minLen = UDPConstants.HEADER.length + 2 + 4;
private boolean done = false;
private DatagramSocket ds = null;
private Listener(int listenPort, CountDownLatch startDownLatch, CountDownLatch receiveDownLatch) {
super();
this.listenPort = listenPort;
this.startDownLatch = startDownLatch;
this.receiveDownLatch = receiveDownLatch;
}
@Override
public void run() {
super.run();
// 通知已启动
startDownLatch.countDown();
try {
// 监听回送端口
ds = new DatagramSocket(listenPort);
// 构建接收实体
DatagramPacket receivePack = new DatagramPacket(buffer, buffer.length);
while (!done) {
// 接收
ds.receive(receivePack);
// 打印接收到的信息与发送者的信息
// 发送者的IP地址
String ip = receivePack.getAddress().getHostAddress();
int port = receivePack.getPort();
int dataLen = receivePack.getLength();
byte[] data = receivePack.getData();
boolean isValid = dataLen >= minLen
&& ByteUtils.startsWith(data, UDPConstants.HEADER);
System.out.println("UDPSearcher receive form ip:" + ip
+ "\tport:" + port + "\tdataValid:" + isValid);
if (!isValid) {
// 无效继续
continue;
}
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, UDPConstants.HEADER.length, dataLen);
final short cmd = byteBuffer.getShort();
final int serverPort = byteBuffer.getInt();
if (cmd != 2 || serverPort <= 0) {
System.out.println("UDPSearcher receive cmd:" + cmd + "\tserverPort:" + serverPort);
continue;
}
String sn = new String(buffer, minLen, dataLen - minLen);
ServerInfo info = new ServerInfo(serverPort, ip, sn);
serverInfoList.add(info);
// 成功接收到一份
receiveDownLatch.countDown();
}
} catch (Exception ignored) {
} finally {
close();
}
System.out.println("UDPSearcher listener finished.");
}
private void close() {
if (ds != null) {
ds.close();
ds = null;
}
}
List<ServerInfo> getServerAndClose() {
done = true;
close();
return serverInfoList;
}
}
}
服务端
- ClientHandler
package server.handle;
import clink.net.qiujuer.clink.utils.CloseUtils;
import java.io.*;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ClientHandler {
private final Socket socket;
private final ClientReadHandler readHandler;
private final ClientWriteHandler writeHandler;
private final CloseNotify closeNotify;
public ClientHandler(Socket socket, CloseNotify closeNotify) throws IOException {
this.socket = socket;
this.readHandler = new ClientReadHandler(socket.getInputStream());
this.writeHandler = new ClientWriteHandler(socket.getOutputStream());
this.closeNotify = closeNotify;
System.out.println("新客户端连接:" + socket.getInetAddress() +
" P:" + socket.getPort());
}
public void exit() {
readHandler.exit();
writeHandler.exit();
CloseUtils.close(socket);
System.out.println("客户端已退出:" + socket.getInetAddress() +
" P:" + socket.getPort());
}
public void send(String str) {
writeHandler.send(str);
}
public void readToPrint() {
readHandler.start();
}
private void exitBySelf() {
exit();
closeNotify.onSelfClosed(this);
}
public interface CloseNotify {
void onSelfClosed(ClientHandler handler);
}
class ClientReadHandler extends Thread {
private boolean done = false;
private final InputStream inputStream;
ClientReadHandler(InputStream inputStream) {
this.inputStream = inputStream;
}
@Override
public void run() {
super.run();
try {
// 得到输入流,用于接收数据
BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
do {
// 客户端拿到一条数据
String str = socketInput.readLine();
if (str == null) {
System.out.println("客户端已无法读取数据!");
// 退出当前客户端
ClientHandler.this.exitBySelf();
break;
}
// 打印到屏幕
System.out.println(str);
} while (!done);
} catch (Exception e) {
if (!done) {
System.out.println("连接异常断开");
ClientHandler.this.exitBySelf();
}
} finally {
// 连接关闭
CloseUtils.close(inputStream);
}
}
void exit() {
done = true;
CloseUtils.close(inputStream);
}
}
class ClientWriteHandler {
private boolean done = false;
private final PrintStream printStream;
private final ExecutorService executorService;
ClientWriteHandler(OutputStream outputStream) {
this.printStream = new PrintStream(outputStream);
this.executorService = Executors.newSingleThreadExecutor();
}
void exit() {
done = true;
CloseUtils.close(printStream);
executorService.shutdownNow();
}
void send(String str) {
executorService.execute(new WriteRunnable(str));
}
class WriteRunnable implements Runnable {
private final String msg;
WriteRunnable(String msg) {
this.msg = msg;
}
@Override
public void run() {
if (ClientWriteHandler.this.done) {
return;
}
try {
ClientWriteHandler.this.printStream.println(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
- Server
package server;
import constants.TCPConstants;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class Server {
public static void main(String[] args) throws IOException {
TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);
boolean isSucceed = tcpServer.start();
if (!isSucceed) {
System.out.println("Start TCP server failed!");
return;
}
UDPProvider.start(TCPConstants.PORT_SERVER);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
String str;
do {
str = bufferedReader.readLine();
tcpServer.broadcast(str);
} while (!"00bye00".equalsIgnoreCase(str));
UDPProvider.stop();
tcpServer.stop();
}
}
-
package server;
import server.handle.ClientHandler;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class TCPServer {
private final int port;
private ClientListener mListener;
private List<ClientHandler> clientHandlerList = new ArrayList<>();
public TCPServer(int port) {
this.port = port;
}
public boolean start() {
try {
ClientListener listener = new ClientListener(port);
mListener = listener;
listener.start();
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public void stop() {
if (mListener != null) {
mListener.exit();
}
for (ClientHandler clientHandler : clientHandlerList) {
clientHandler.exit();
}
clientHandlerList.clear();
}
public void broadcast(String str) {
for (ClientHandler clientHandler : clientHandlerList) {
clientHandler.send(str);
}
}
private class ClientListener extends Thread {
private ServerSocket server;
private boolean done = false;
private ClientListener(int port) throws IOException {
server = new ServerSocket(port);
System.out.println("服务器信息:" + server.getInetAddress() + " P:" + server.getLocalPort());
}
@Override
public void run() {
super.run();
System.out.println("服务器准备就绪~");
// 等待客户端连接
do {
// 得到客户端
Socket client;
try {
client = server.accept();
} catch (IOException e) {
continue;
}
try {
// 客户端构建异步线程
ClientHandler clientHandler = new ClientHandler(client,
handler -> clientHandlerList.remove(handler));
// 读取数据并打印
clientHandler.readToPrint();
clientHandlerList.add(clientHandler);
} catch (IOException e) {
e.printStackTrace();
System.out.println("客户端连接异常:" + e.getMessage());
}
} while (!done);
System.out.println("服务器已关闭!");
}
void exit() {
done = true;
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
-
package server;
import clink.net.qiujuer.clink.utils.ByteUtils;
import constants.UDPConstants;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.ByteBuffer;
import java.util.UUID;
class UDPProvider {
private static Provider PROVIDER_INSTANCE;
static void start(int port) {
stop();
String sn = UUID.randomUUID().toString();
Provider provider = new Provider(sn, port);
provider.start();
PROVIDER_INSTANCE = provider;
}
static void stop() {
if (PROVIDER_INSTANCE != null) {
PROVIDER_INSTANCE.exit();
PROVIDER_INSTANCE = null;
}
}
private static class Provider extends Thread {
private final byte[] sn;
private final int port;
private boolean done = false;
private DatagramSocket ds = null;
// 存储消息的Buffer
final byte[] buffer = new byte[128];
Provider(String sn, int port) {
super();
this.sn = sn.getBytes();
this.port = port;
}
@Override
public void run() {
super.run();
System.out.println("UDPProvider Started.");
try {
// 监听20000 端口
ds = new DatagramSocket(UDPConstants.PORT_SERVER);
// 接收消息的Packet
DatagramPacket receivePack = new DatagramPacket(buffer, buffer.length);
while (!done) {
// 接收
ds.receive(receivePack);
// 打印接收到的信息与发送者的信息
// 发送者的IP地址
String clientIp = receivePack.getAddress().getHostAddress();
int clientPort = receivePack.getPort();
int clientDataLen = receivePack.getLength();
byte[] clientData = receivePack.getData();
boolean isValid = clientDataLen >= (UDPConstants.HEADER.length + 2 + 4)
&& ByteUtils.startsWith(clientData, UDPConstants.HEADER);
System.out.println("UDPProvider receive form ip:" + clientIp
+ "\tport:" + clientPort + "\tdataValid:" + isValid);
if (!isValid) {
// 无效继续
continue;
}
// 解析命令与回送端口
int index = UDPConstants.HEADER.length;
short cmd = (short) ((clientData[index++] << 8) | (clientData[index++] & 0xff));
int responsePort = (((clientData[index++]) << 24) |
((clientData[index++] & 0xff) << 16) |
((clientData[index++] & 0xff) << 8) |
((clientData[index] & 0xff)));
// 判断合法性
if (cmd == 1 && responsePort > 0) {
// 构建一份回送数据
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
byteBuffer.put(UDPConstants.HEADER);
byteBuffer.putShort((short) 2);
byteBuffer.putInt(port);
byteBuffer.put(sn);
int len = byteBuffer.position();
// 直接根据发送者构建一份回送信息
DatagramPacket responsePacket = new DatagramPacket(buffer,
len,
receivePack.getAddress(),
responsePort);
ds.send(responsePacket);
System.out.println("UDPProvider response to:" + clientIp + "\tport:" + responsePort + "\tdataLen:" + len);
} else {
System.out.println("UDPProvider receive cmd nonsupport; cmd:" + cmd + "\tport:" + port);
}
}
} catch (Exception ignored) {
} finally {
close();
}
// 完成
System.out.println("UDPProvider Finished.");
}
private void close() {
if (ds != null) {
ds.close();
ds = null;
}
}
/**
* 提供结束
*/
void exit() {
done = true;
close();
}
}
}
简易聊天室
- 必要条件:客户端,服务器
- 必要约束:数据传输协议
- 原理:服务器监听消息来源,客户端连接服务器并发送消息到服务器