use PipedInputStream and PipedOutputStream

解决方案 »

  1.   

    很长的例子,不过结构清晰(摘于Core Java 卷2)1. import java.util.*;
      2. import java.io.*;
      3.
      4. /**
      5.    This program demonstrates how multiple threads communicate
      6.    through pipes.
      7. */
      8. public class PipeTest
      9. {
     10.    public static void main(String args[])
     11.    {
     12.       try
     13.       {
     14.          /* set up pipes */
     15.          PipedOutputStream pout1 = new PipedOutputStream();
     16.          PipedInputStream pin1 = new PipedInputStream(pout1);
     17.
     18.          PipedOutputStream pout2 = new PipedOutputStream();
     19.          PipedInputStream pin2 = new PipedInputStream(pout2);
     20.
     21.          /* construct threads */
     22.
     23.          Producer prod = new Producer(pout1);
     24.          Filter filt = new Filter(pin1, pout2);
     25.          Consumer cons = new Consumer(pin2);
     26.
     27.          /* start threads */
     28.
     29.          prod.start();
     30.          filt.start();
     31.          cons.start();
     32.       }
     33.       catch (IOException e){}
     34.    }
     35. }
     36.
     37. /**
     38.    A thread that writes random numbers to an output stream.
     39. */
     40. class Producer extends Thread
     41. {
     42.    /**
     43.       Constructs a producer thread.
     44.       @param os the output stream
     45.    */
     46.    public Producer(OutputStream os)
     47.    {
     48.       out = new DataOutputStream(os);
     49.    }
     50.
     51.    public void run()
     52.    {
     53.       while (true)
     54.       {
     55.          try
     56.          {
     57.             double num = rand.nextDouble();
     58.             out.writeDouble(num);
     59.             out.flush();
     60.             sleep(Math.abs(rand.nextInt() % 1000));
     61.          }
     62.          catch(Exception e)
     63.          {
     64.             System.out.println("Error: " + e);
     65.          }
     66.       }
     67.    }
     68.
     69.    private DataOutputStream out;
     70.    private Random rand = new Random();
     71. }
     72.
     73. /**
     74.    A thread that reads numbers from a stream and writes their
     75.    average to an output stream.
     76. */
     77. class Filter extends Thread
     78. {
     79.    /**
     80.       Constructs a filter thread.
     81.       @param is the output stream
     82.       @param os the output stream
     83.    */
     84.    public Filter(InputStream is, OutputStream os)
     85.    {
     86.       in = new DataInputStream(is);
     87.       out = new DataOutputStream(os);
     88.    }
     89.
     90.    public void run()
     91.    {
     92.       for (;;)
     93.       {
     94.          try
     95.          {
     96.             double x = in.readDouble();
     97.             total += x;
     98.             count++;
     99.             if (count != 0) out.writeDouble(total / count);
    100.          }
    101.          catch(IOException e)
    102.          {
    103.             System.out.println("Error: " + e);
    104.          }
    105.       }
    106.    }
    107.
    108.    private DataInputStream in;
    109.    private DataOutputStream out;
    110.    private double total = 0;
    111.    private int count = 0;
    112. }
    113.
    114. /**
    115.    A thread that reads numbers from a stream and
    116.    prints out those that deviate from previous inputs
    117.    by a threshold value.
    118. */
    119. class Consumer extends Thread
    120. {
    121.    /**
    122.       Constructs a consumer thread.
    123.       @param is the input stream
    124.    */
    125.    public Consumer(InputStream is)
    126.    {
    127.       in = new DataInputStream(is);
    128.    }
    129.
    130.    public void run()
    131.    {
    132.       for(;;)
    133.       {
    134.          try
    135.          {
    136.             double x = in.readDouble();
    137.             if (Math.abs(x - oldx) > THRESHOLD)
    138.             {
    139.                System.out.println(x);
    140.                oldx = x;
    141.             }
    142.          }
    143.          catch(IOException e)
    144.          {
    145.             System.out.println("Error: " + e);
    146.          }
    147.       }
    148.    }
    149.
    150.    private double oldx = 0;
    151.    private DataInputStream in;
    152.    private static final double THRESHOLD = 0.01;
    153. }