Java_io体系之PipedInputStream/PipedOutputStream简介、走进源码及示例——06

——管道输出流、必须建立在管道输入流之上、所以先介绍管道输出流。可以先看源码或者总结、总结写的有点Q、不喜可无视、有误的地方指出则不胜感激。

一:PipedOutputStream

1、       类功能简介:

管道字节输出流、用于将当前线程的指定字节写入到与此线程对应的管道输入流中去、所以PipedInputStream(pis)、PipedOutputStream(pos)必须配套使用、缺一不可。管道字节输出流的本质就是调用pis中的方法将字节或者字节数组写入到pis中、这一点是与众不同的地方。所以pos中的方法很少也很简单、主要就是负责将传入的pis与本身绑定、配对使用、然后就是调用绑定的pis的写入方法、将字节或者字节数组写入到pis的缓存字节数组中。

2、       PipedOutputStream API简介:

A:关键字段

    PipedInputStream sink;  与当前管道字节输出流pos配对的管道字节输入流

B:构造方法

 PipedOutputStream(PipedInputStream snk);    将传入的snk与当前pos绑定PipedOutputStream(); 空构造方法、使用之前必须与PipedInputStream绑定

C:一般方法

 synchronized void connect(PipedInputStream snk);    将传入的snk与当前pos绑定void write(byte b) ; 调用绑定的snk.receive(byte b) 将字节b写入snk中void write(byte[] b, int off, int len);      调用绑定的snk.receive(byte[] b, int off, int len)将字节数组b的一部分写入到snk中。synchronized void flush();        将当前流中的字节写入到snk中close(); 关闭当前pos、释放资源、并将snk中标志当前流状态改为关闭状态

3、       源码分析:

package com.chy.io.original.code;import java.io.*;/*** 管道输出流。与管道输入流结合使用、用于线程之间的通信。* @author  andyChen* @version 1.1, 13/11/19*/
public class PipedOutputStream extends OutputStream {// 与PipedOutputStream通信的PipedInputStream对象private PipedInputStream sink;/*** 使用指定的pis来构造pos*/public PipedOutputStream(PipedInputStream snk)  throws IOException {connect(snk);}/*** 空构造函数、在使用之前要绑定与之对应的pis*/public PipedOutputStream() {}/*** 将“管道输出流” 和 “管道输入流”连接。不能进行多次连接、否则会抛异常* 主要工作就是初始化与此pos连接的pis状态。*/public synchronized void connect(PipedInputStream snk) throws IOException {if (snk == null) {throw new NullPointerException();} else if (sink != null || snk.connected) {throw new IOException("Already connected");}sink = snk;snk.in = -1;snk.out = 0;snk.connected = true;}/*** 将int类型b写入“管道输出流”中* 将b写入“管道输出流”之后,它会将b传输给“管道输入流”、因为其本质上就是调用与此pos绑定的pis来写入pis的buffer中*/public void write(int b)  throws IOException {if (sink == null) {throw new IOException("Pipe not connected");}sink.receive(b);}/*** 将字节数组b写入“管道输出流”中* 将数组b写入“管道输出流”之后,它会将其传输给“管道输入流”、同上面方法的原理一样*/public void write(byte b[], int off, int len) throws IOException {if (sink == null) {throw new IOException("Pipe not connected");} else if (b == null) {throw new NullPointerException();} else if ((off < 0) || (off > b.length) || (len < 0) ||((off + len) > b.length) || ((off + len) < 0)) {throw new IndexOutOfBoundsException();} else if (len == 0) {return;} sink.receive(b, off, len);}/*** 刷新此流、实际上是唤醒pis的所有等待的方法、读取、或者通知pos写入字节*/public synchronized void flush() throws IOException {if (sink != null) {synchronized (sink) {sink.notifyAll();}}}/*** 关闭“管道输出流”。告诉pis、与其绑定的pos已关闭、当pis再试图读取字节时就会发现此流关闭、抛出异常结束程序*/public void close()  throws IOException {if (sink != null) {sink.receivedLast();}}
}

4、       实例演示:

因为PipedOutputStream必须与PipedInputStream结合使用、所以将两者的示例放在一起。

二:PipedInputStream

1、       类功能简介:

管道字节输入流、用于读取对应绑定的管道字节输出流写入其内置字节缓存数组buffer中的字节、借此来实现线程之间的通信、pis中专门有两个方法供pos调用、、、receive(byte b)、receive(byte[] b, int off, intlen)、使得pos可以将字节或者字节数组写入pis的buffer中、

2、       PipedInputStream API简介:

A:关键字段

    //用于标记管道输出流是否关闭boolean closedByWriter = false;//用于标记管道输入流是否关闭volatile boolean closedByReader = false;//管道输出流、管道输入流是否关闭boolean connected = false;//读取管道流中数据的线程Thread readSide;//向管道流中写入内容的线程Thread writeSide;//管道的默认的大小private static final int DEFAULT_PIPE_SIZE = 1024;protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;//管道输入流的缓冲区protected byte buffer[];//下一次字节在buf中存储的位置、in==out说明数据正好全部被读取完protected int in = -1;//下一个读取的字节protected int out = 0;

B:构造方法

    PipedInputStream(PipedOutputStream src);            使用默认大小的buf、与此管道字节输入流配套的管道字节输出流构造pisPipedInputStream(PipedOutputStream src, int size);       使用指定的缓存数组buffer大小、和与此流绑定的pos构造pisPipedInputStream();        使用默认缓冲大小构造pis、此pis在使用之前必须建立与其相对应的pos。PipedInputStream(int pipeSize);        使用指定缓冲大小构造pis、此pis在使用之前必须建立与其相对应的pos。

核心的方法是第二个!

C:一般方法

    void connect(PipedOutputStream src);    将pis与pos绑定、这里调用的是pos的connect方法、synchronized void receive(byte b);       仅供与当前pis绑定的pos将字节写入到当前pis的buffer中、供当前pis读取synchronized void receive(byte[] b, int off, int len);        仅供与当前pis绑定的pos将字节数组b的一部分写入到当前pis的buffer中、供当前pis读取void receivedLast();   当与此流绑定的pos关闭时、pos调用此方法通知当前pis、输出完毕、并且唤醒当前pis所有正在等待的方法int read();         当前线程调用此pis从buffer中读取一个字节、并以整数形式返回int read(byte[] b,int off, int len);  从管道中读取len个字节放入下标从off开始、的后len个位置中、返回实际读取的字节数synchronized int available();        返回能不受阻塞的、从此流中读取的字节数void close();        关闭此流重置buffer

3、       源码分析:

package com.chy.io.original.code;import java.io.IOException;
import java.io.PipedOutputStream;/*** 管道字节输入流、必须与管道输出流结合使用、用于线程之间的通信。* @author  andyChen* @version 1.1, 13/11/19*/
public class PipedInputStream extends InputStream {//用于标记管道输出流是否关闭boolean closedByWriter = false;//用于标记管道输入流是否关闭volatile boolean closedByReader = false;//管道输出流、管道输入流是否关闭boolean connected = false;//读取管道流中数据的线程Thread readSide;//向管道流中写入内容的线程Thread writeSide;//管道的默认的大小private static final int DEFAULT_PIPE_SIZE = 1024;protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;//管道输入流的缓冲区protected byte buffer[];//下一次字节在buf中存储的位置、in==out说明数据正好全部被读取完protected int in = -1;//下一个读取的字节protected int out = 0;/*** 使用默认大小的buf、与此管道字节输入流配套的管道字节输出流构造pis*/public PipedInputStream(PipedOutputStream src) throws IOException {this(src, DEFAULT_PIPE_SIZE);}/*** 使用指定的大小的buf、与此管道字节输入流配套的管道字节输出流构造pis*/public PipedInputStream(PipedOutputStream src, int pipeSize)throws IOException {initPipe(pipeSize);connect(src);}/*** 使用默认缓冲大小构造pis、此pis在使用之前必须建立与其相对应的pos。*/public PipedInputStream() {initPipe(DEFAULT_PIPE_SIZE);}/*** 使用指定缓冲大小构造pis、此pis在使用之前必须建立与其相对应的pos。*/public PipedInputStream(int pipeSize) {initPipe(pipeSize);}//初始化buf大小。private void initPipe(int pipeSize) {if (pipeSize <= 0) {throw new IllegalArgumentException("Pipe Size <= 0");}buffer = new byte[pipeSize];}/*** 将pis与pos绑定、从此方法可以看出、绑定是通过调用pos的connect(pis)来实现的*/public void connect(PipedOutputStream src) throws IOException {src.connect(this);}/*** 接收int类型的数据b。* 它只会在PipedOutputStream的write(int b)中会被调用*/protected synchronized void receive(int b) throws IOException {// 检查管道状态checkStateForReceive();// 获取“写入管道”的线程、用途是在当pis读取buffer中数据时根据此线程是否死亡做出不同处理writeSide = Thread.currentThread();if (in == out)// 若“写入管道”的数据正好全部被读取完,则等待。awaitSpace();if (in < 0) {in = 0;out = 0;}// 将b保存到缓冲区buffer[in++] = (byte)(b & 0xFF);if (in >= buffer.length) {in = 0;}}/*** 接收字节数组b的一部分*/synchronized void receive(byte b[], int off, int len)  throws IOException {// 检查管道状态checkStateForReceive();// 获取“写入管道”的线程writeSide = Thread.currentThread();int bytesToTransfer = len;while (bytesToTransfer > 0) {// 若“写入管道”的数据正好全部被读取完,则等待。if (in == out)awaitSpace();//下一次被写入buff中的字节数int nextTransferAmount = 0;// 如果管道中被读取的数据,少于写入管道的数据;// 则设置nextTransferAmount=buffer.length - inif (out < in) {nextTransferAmount = buffer.length - in;} else if (in < out) {// 如果“管道中被读取的数据,大于/等于写入管道的数据”,则执行后面的操作// 若in==-1即buffer中没有数据,此时初始化in、out并且nextTransferAmount = buffer.length - in;// 否则,nextTransferAmount = out - in;if (in == -1) {in = out = 0;nextTransferAmount = buffer.length - in;} else {nextTransferAmount = out - in;}}if (nextTransferAmount > bytesToTransfer)nextTransferAmount = bytesToTransfer;// assert断言的作用是,若nextTransferAmount <= 0,则终止程序。一般用于不准备通过捕获异常来处理的错误。assert nextTransferAmount > 0;System.arraycopy(b, off, buffer, in, nextTransferAmount);bytesToTransfer -= nextTransferAmount;off += nextTransferAmount;in += nextTransferAmount;if (in >= buffer.length) {in = 0;}}}/*** 检测管道状态、如果没有连接、或者pos、pis有一方关闭、或者接收方线程死亡则抛出异常、结束程序*/private void checkStateForReceive() throws IOException {if (!connected) {throw new IOException("Pipe not connected");} else if (closedByWriter || closedByReader) {throw new IOException("Pipe closed");} else if (readSide != null && !readSide.isAlive()) {throw new IOException("Read end dead");}}/*** 等待。* 若“写入管道”的数据正好全部被读取完(例如,管道缓冲满),则执行awaitSpace()操作;* 它的目的是让“读取管道的线程”管道产生读取数据请求,从而才能继续的向“管道”中写入数据。*/private void awaitSpace() throws IOException {// 如果“管道中被读取的数据,等于写入管道的数据”时,// 则每隔1000ms检查“管道状态”,并唤醒管道操作:若有“读取管道数据线程被阻塞”,则唤醒该线程。while (in == out) {checkStateForReceive();/* full: kick any waiting readers */notifyAll();try {wait(1000);} catch (InterruptedException ex) {throw new java.io.InterruptedIOException();}}}/*** 当PipedOutputStream被关闭时,被调用*/synchronized void receivedLast() {closedByWriter = true;notifyAll();}/*** 从管道的缓冲中读取一个字节,并将其转换成int类型*/public synchronized int read()  throws IOException {if (!connected) {throw new IOException("Pipe not connected");} else if (closedByReader) {throw new IOException("Pipe closed");} else if (writeSide != null && !writeSide.isAlive()&& !closedByWriter && (in < 0)) {throw new IOException("Write end dead");}readSide = Thread.currentThread();int trials = 2;while (in < 0) {if (closedByWriter) {/* closed by writer, return EOF */return -1;}if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {throw new IOException("Pipe broken");}/* might be a writer waiting */notifyAll();try {wait(1000);} catch (InterruptedException ex) {throw new java.io.InterruptedIOException();}}int ret = buffer[out++] & 0xFF;if (out >= buffer.length) {out = 0;}if (in == out) {/* now empty */in = -1;}return ret;}/*** 从管道中读取len个字节放入下标从off开始、的后len个位置中*/public synchronized int read(byte b[], int off, int len)  throws IOException {if (b == null) {throw new NullPointerException();} else if (off < 0 || len < 0 || len > b.length - off) {throw new IndexOutOfBoundsException();} else if (len == 0) {return 0;}/* possibly wait on the first character */int c = read();if (c < 0) {return -1;}b[off] = (byte) c;int rlen = 1;while ((in >= 0) && (len > 1)) {int available; if (in > out) {available = Math.min((buffer.length - out), (in - out));} else {available = buffer.length - out;}// A byte is read beforehand outside the loopif (available > (len - 1)) {available = len - 1;}System.arraycopy(buffer, out, b, off + rlen, available);out += available;rlen += available; len -= available;if (out >= buffer.length) {out = 0;}if (in == out) {/* now empty */in = -1;}}return rlen;}/*** 返回能不受阻塞的、从此流中读取的字节数*/public synchronized int available() throws IOException {if(in < 0)return 0;else if(in == out)return buffer.length;else if (in > out)return in - out;elsereturn in + buffer.length - out;}/*** 关闭此流*/public void close()  throws IOException {closedByReader = true;synchronized (this) {in = -1;}}
}

4、       实例演示:

一般情况下用于两个线程之间的通信、那么就先分别建立两个线程、一个输出者、内部持有PipedOutputStream对象的引用、用来向接收者发送字节信息;一个接收者、内部持有PipedInputStream对象的引用、用来接收发送者发送的字节信息、最后通过测试类将两者结合起来、实现线程之间的通信。SenderThread:很简单、就是三个私有方法分别将一个字节、一个较短的字节(小于1024、因为PipedinputStream中buffer默认大小为1024)一个长字节(超过1024)看看有什么不一样的地方。

package com.chy.io.original.thread;import java.io.IOException;
import java.io.PipedOutputStream;public class SenderThread implements Runnable {private PipedOutputStream pos = new PipedOutputStream();public PipedOutputStream getPipedOutputStream(){return pos;}@Overridepublic void run() {//sendOneByte();//sendShortMessage();sendLongMessage();}private void sendOneByte(){try {pos.write(0x61);pos.close();} catch (IOException e) {e.printStackTrace();}}private void sendShortMessage() {try {pos.write("this is a short message from senderThread !".getBytes());pos.flush();pos.close();} catch (IOException e) {e.printStackTrace();}}private void sendLongMessage(){try {byte[] b = new byte[1028];//生成一个长度为1028的字节数组、前1020个是1、后8个是2。for(int i=0; i<1020; i++){b[i] = 1;}for (int i = 1020; i <1028; i++) {b[i] = 2;}pos.write(b);pos.close();} catch (IOException e) {e.printStackTrace();}}
}

ReceiverThread: 也很简单、就是接收上面写入buffer的字节、也有三个方法、

package com.chy.io.original.thread;import java.io.IOException;
import java.io.PipedInputStream;public class ReceiverThread extends Thread {private PipedInputStream pis = new PipedInputStream();public PipedInputStream getPipedInputStream(){return pis;}@Overridepublic void run() {//receiveOneByte();//receiveShortMessage();receiverLongMessage();}private void receiveOneByte(){try {int n = pis.read();System.out.println(n);pis.close();} catch (IOException e) {e.printStackTrace();}}private void receiveShortMessage() {try {byte[] b = new byte[1024];int n = pis.read(b);System.out.println(new String(b, 0, n));pis.close();} catch (IOException e) {e.printStackTrace();}}private void receiverLongMessage(){try {byte[] b = new byte[2048];int count = 0;while(true){count = pis.read(b); for (int i = 0; i < count; i++) {System.out.print(b[i]);}if(count == -1)break;}pis.close();} catch (IOException e) {e.printStackTrace();}}
}

测试类:PipedStreamTest 、 将ReceiverThread、SenderThread两线程通过管道流结合起来、查看通信效果

package com.chy.io.original.test;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;import com.chy.io.original.thread.Receiver;
import com.chy.io.original.thread.ReceiverThread;
import com.chy.io.original.thread.Sender;
import com.chy.io.original.thread.SenderThread;@SuppressWarnings("all")
/**  * 管道输入流和管道输出流的交互程序*/
public class PipedStreamTest {   public static void main(String[] args) {   testPipedStream();}private static void testPipedStream(){SenderThread st =new SenderThread();ReceiverThread rt = new ReceiverThread();PipedInputStream pis = rt.getPipedInputStream();PipedOutputStream pos = st.getPipedOutputStream();try {pos.connect(pis);new Thread(st).start();rt.start();} catch (IOException e) {e.printStackTrace();}}private static void exampleFromIE() {Sender t1 = new Sender();   Receiver t2 = new Receiver();   PipedOutputStream out = t1.getOutputStream();   PipedInputStream in = t2.getInputStream();   try {   //管道连接。下面2句话的本质是一样。//out.connect(in);   in.connect(out);   /**  * Thread类的START方法:  * 使该线程开始执行;Java 虚拟机调用该线程的 run 方法。   * 结果是两个线程并发地运行;当前线程(从调用返回给 start 方法)和另一个线程(执行其 run 方法)。   * 多次启动一个线程是非法的。特别是当线程已经结束执行后,不能再重新启动。   */t1.start();t2.start();} catch (IOException e) {e.printStackTrace();}}
}

结果说明:

前面两个写入字节、写入short字节数组都好理解、就是写入多少读取多少、后面那个写入长字节数组、接收线程用了一个while(true)循环读取buffer中的字节、直到读完、如果不用循环则只会读取到1024个字节、因为buffer的默认大小就1024、pos一次只能写入1024、pis也同样一次只能读取1024个、当用while时、第一次读取pos写入的1020个之后、pis再读取就会发现buffer空了、此时接收方线程就会等待并且通知发送方写入字节、然后继续进行。

总结

PipedInputStream、PipedOutputStream两者的结合如鸳鸯一般、离开哪一方都不能继续存在、同时又如连理枝一般、PipedOutputStream先通过connect(PipedInputStream snk)来确定关系、并初始化PipedInputStream状态、告诉PipedInputStream只能属于这个PipedOutputStream、connect =true、当想赠与PipedInputStream字节时、就直接调用receive(byte b) 、receive(byte[] b, int off, int len)来将字节或者字节数组放入pis的存折buffer中。站在PipedInputStream角度上、看上哪个PipedOutputStream时就暗示pos、将主动权交给pos、调用pos的connect将自己给他去登记。当想要花(将字节读取到程序中)字节了就从buffer中拿、但是自己又没有本事挣字节、所以当buffer中没有字节时、自己就等着、并且跟pos讲没有字节了、pos就会向存折(buffer)中存字节、当然、pos不会一直不断往里存、当存折是空的时候也不会主动存、怕花冒、就等着pis要。要才存。过到最后两个只通过buffer来知道对方的存在与否、每次从buffer中存或者取字节时都会看看对方是否安康、若安好则继续生活、若一方不在、则另一方也不愿独存!

更多IO内容:java_io 体系之目录

Java_io体系之PipedInputStream、PipedOutputStream简介、走进源码及示例——06相关推荐

  1. Java_io体系之BufferedWriter、BufferedReader简介、走进源码及示例——16

    Java_io体系之BufferedWriter.BufferedReader简介.走进源码及示例--16 一:BufferedWriter 1.类功能简介: BufferedWriter.缓存字符输 ...

  2. Java_io体系之RandomAccessFile简介、走进源码及示例——20

    Java_io体系之RandomAccessFile简介.走进源码及示例--20 RandomAccessFile 1.       类功能简介: 文件随机访问流.关心几个特点: 1.他实现的接口不再 ...

  3. Java_io体系之CharArrayReader、CharArrayWriter简介、走进源码及示例——13

    转载自   Java_io体系之CharArrayReader.CharArrayWriter简介.走进源码及示例--13 一:CharArrayReader 1.类功能简介: 字符数组输入流car  ...

  4. Java_io体系之PipedWriter、PipedReader简介、走进源码及示例——14

    Java_io体系之PipedWriter.PipedReader简介.走进源码及示例--14 --管道字符输出流.必须建立在管道输入流之上.所以先介绍管道字符输出流.可以先看示例或者总结.总结写的有 ...

  5. linux内核体系学习路径_Linux内核分析(一)linux体系简介|内核源码简介|内核配置编译安装...

    从本篇博文开始我将对linux内核进行学习和分析,整个过程必将十分艰辛,但我会坚持到底,同时在博文中如果那些地方有问题还请各位大神为我讲解. 今天我们会分析到以下内容: 1. Linux体系结构简介 ...

  6. ThreadLocal 简介 案例 源码分析 MD

    Markdown版本笔记 我的GitHub首页 我的博客 我的微信 我的邮箱 MyAndroidBlogs baiqiantao baiqiantao bqt20094 baiqiantao@sina ...

  7. Java生鲜电商平台-电商会员体系系统的架构设计与源码解析

    Java生鲜电商平台-电商会员体系系统的架构设计与源码解析 说明:Java生鲜电商平台中会员体系作为电商平台的基础设施,重要性不容忽视.我去年整理过生鲜电商中的会员系统,但是比较粗,现在做一个最好的整 ...

  8. 编译器 llvm clang 源码转换示例

    编译器 llvm clang 源码转换示例 从git获取llvm项目的源码方式: git clone https://github.com/llvm/llvm-project.git 下载源码后,进入 ...

  9. H5音乐播放器(包含源码与示例)

    H5音乐播放器(包含源码与示例) 基于Angular+ionic的H5音乐播放器,源码:https://gitee.com/CrimsonHu/h5-music-player 示例地址 建议使用原版c ...

最新文章

  1. linux open()调用的注意事项
  2. Java多线程初学者指南(10):使用Synchronized关键字同步类方法
  3. 如何利用 Arthas 热更新线上代码
  4. cannot connect to vCenter Single Sign-on server...
  5. 当AI渗透到财务管理 未来人机协作机器人有望“独当一面”
  6. locker server启动
  7. Linux 自动化运维工具 ansible
  8. Linux下coredump调试2:实例
  9. 工作 10 年,月薪过万者不足三成,程序员却笑了!
  10. mysql安全模式什么意思_mysql安全模式指的是什么意思
  11. python控制代码使用的gpu
  12. 兄弟HL2130打印机清零方法(绝对可用)
  13. python 3d模型制作软件_最好用的9款3D建模软件
  14. 你要小心那些涉世不深的老实人
  15. 简易搜索引擎原理与基于Hadoop MapReduce的搜索引擎实现
  16. 光场相机微透镜阵列排布方式以及其填充率比较
  17. aviary android sdk,Make Photo Magic 使用Aviary SDK 进行图片编辑开发,功能非常强大!!!!(Aviary SDK安装说明)...
  18. EXCEL-VBA:EXCEL的各种文件名获取
  19. VR/AR工作原理、目前存在的技术问题
  20. 经典的三角形软件测试用例

热门文章

  1. 电脑自动弹出命令窗口问题处理(定时任务)
  2. 云闪解绑实名认证_闪耀暖暖防沉迷怎么解除 闪耀暖暖防沉迷是几个小时
  3. 3d 和4d 印刷电子的应用是什么?
  4. SIP Inspector测试语音
  5. matlab中system prt,MATLAB自动驾驶工具箱的简单使用
  6. 回首跌撞中前行,痛并快乐的2011
  7. 暗黑起源服务器维护,[端游]传奇暗黑起源三职业恐怖的黎明VA1.0版本_雕文系统_暗黑重铸...
  8. 软件大战升级,通用汽车与Qt达成合作,增强车内体验
  9. css hover改变svg颜色
  10. 6. Python的数据类型④——元组