Reading and Writing Avro Files on HDFS with Java

1. Writing an Avro file to HDFS with Java

The class below packs every file in a local source directory into a single Avro container file on HDFS: each record stores the file's absolute path in the "username" field and its raw bytes in the "password" field.

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsAvroTest {

    public static final String SCHEMA_JSON = "{"type": "record","name": "SmallFilesTest", "
            + ""fields": ["
            + "{"name":""
            + "username"
            + "","type":"string"},"
            + "{"name":""
            + "password"
            + "", "type":"string"}]}";
    public static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);

    public static void writeToAvro(File srcPath, OutputStream outputStream)
            throws IOException {
        // One Avro record per file; sync marker roughly every 100 bytes, Snappy-compressed.
        DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
                new GenericDatumWriter<GenericRecord>()).setSyncInterval(100);
        writer.setCodec(CodecFactory.snappyCodec());
        writer.create(SCHEMA, outputStream);
        for (File file : FileUtils.listFiles(srcPath, null, false)) {
            String filename = file.getAbsolutePath();
            byte[] content = FileUtils.readFileToByteArray(file);
            GenericRecord record = new GenericData.Record(SCHEMA);
            record.put("username", filename);                 // file path
            record.put("password", ByteBuffer.wrap(content)); // raw file bytes
            writer.append(record);
        }
        IOUtils.cleanup(null, writer);
        IOUtils.cleanup(null, outputStream);
    }

    public static void main(String[] args) throws Exception {
        // args[0]: local source directory; args[1]: destination Avro file on HDFS
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        File sourceDir = new File(args[0]);
        Path destFile = new Path(args[1]);
        OutputStream os = hdfs.create(destFile);
        writeToAvro(sourceDir, os);
    }
}
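
If you only need to append a few hand-built records rather than pack local files, the same DataFileWriter API can be used directly. Below is a minimal sketch under that assumption; the class name, the target path /tmp/users.avro, and the sample values are hypothetical, and it reuses the SCHEMA constant from HdfsAvroTest above.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsAvroRecordWriteSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        // Hypothetical target path; adjust to your cluster.
        Path dest = new Path("/tmp/users.avro");

        // try-with-resources: DataFileWriter.close() flushes and closes the HDFS stream.
        try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(
                new GenericDatumWriter<GenericRecord>())) {
            writer.setCodec(CodecFactory.snappyCodec());
            writer.create(HdfsAvroTest.SCHEMA, hdfs.create(dest));

            GenericRecord record = new GenericData.Record(HdfsAvroTest.SCHEMA);
            record.put("username", "alice"); // sample value, not from the original post
            record.put("password", ByteBuffer.wrap("secret".getBytes(StandardCharsets.UTF_8)));
            writer.append(record);
        }
    }
}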

2. Reading an Avro file on HDFS with Java

The reader streams the container file back and prints each record. Note that "password" comes back as a ByteBuffer, so println shows its toString() rather than the original file contents.

import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadAvro {

    public static void readFromAvro(InputStream is) throws IOException {
        // Stream the Avro container file and print each record.
        DataFileStream<GenericRecord> reader = new DataFileStream<GenericRecord>(is,
                new GenericDatumReader<GenericRecord>());
        for (GenericRecord record : reader) {
            // "password" is a ByteBuffer, so println prints its toString(), not the bytes.
            System.out.println(record.get("username") + ":" + record.get("password"));
        }
        IOUtils.cleanup(null, reader);
        IOUtils.cleanup(null, is);
    }

    public static void main(String[] args) throws Exception {
        // args[0]: path of the Avro file on HDFS
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        Path srcFile = new Path(args[0]);
        InputStream is = hdfs.open(srcFile);
        readFromAvro(is);
    }
}
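
DataFileStream only supports forward iteration. When seekable access is useful (for example, jumping to a sync point), Avro's DataFileReader can read directly from HDFS through FsInput. A minimal sketch, assuming the avro-mapred artifact (which provides org.apache.avro.mapred.FsInput) is on the classpath; the class name is hypothetical.

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class HdfsAvroSeekableReadSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // args[0]: path of the Avro file on HDFS
        Path path = new Path(args[0]);

        // FsInput adapts an HDFS file to Avro's SeekableInput, so DataFileReader
        // can offer seek()/sync() in addition to plain iteration.
        try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
                new FsInput(path, conf), new GenericDatumReader<GenericRecord>())) {
            for (GenericRecord r : reader) {
                System.out.println(r.get("username") + ":" + r.get("password"));
            }
        }
    }
}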
Original article: https://www.cnblogs.com/caiwuzi/p/12867085.html