fastavro - Man Page
Name
fastavro — fastavro Documentation
The current Python avro package is dog slow.
On a test case of about 10K records, it takes about 14sec to iterate over all of them. In comparison, the Java Avro SDK does it in about 1.9sec.
fastavro is an alternative implementation that is much faster. It iterates over the same 10K records in 2.9sec, and if you use it with PyPy it'll do it in 1.5sec (to be fair, the Java benchmark is doing some extra JSON encoding/decoding).
If the optional C extension (generated by Cython) is available, then fastavro will be even faster. For the same 10K records it’ll run in about 1.7sec.
Supported Features
- File Writer
- File Reader (iterating via records or blocks)
- Schemaless Writer
- Schemaless Reader
- JSON Writer
- JSON Reader
- Codecs (Snappy, Deflate, Zstandard, Bzip2, LZ4, XZ)
- Schema resolution
- Aliases
- Logical Types
Missing Features
- Anything involving Avro’s RPC features
- Parsing schemas into the canonical form
- Schema fingerprinting
Example
    from fastavro import writer, reader, parse_schema

    schema = {
        'doc': 'A weather reading.',
        'name': 'Weather',
        'namespace': 'test',
        'type': 'record',
        'fields': [
            {'name': 'station', 'type': 'string'},
            {'name': 'time', 'type': 'long'},
            {'name': 'temp', 'type': 'int'},
        ],
    }
    parsed_schema = parse_schema(schema)

    # 'records' can be an iterable (including generator)
    records = [
        {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
        {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
        {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
        {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
    ]

    # Writing
    with open('weather.avro', 'wb') as out:
        writer(out, parsed_schema, records)

    # Reading
    with open('weather.avro', 'rb') as fo:
        for record in reader(fo):
            print(record)
Documentation
fastavro.read
- class reader(fo, reader_schema=None, return_record_name=False)
Iterator over records in an avro file.
- Parameters
- fo (file-like) – Input stream
- reader_schema (dict, optional) – Reader schema
- return_record_name (bool, optional) – If true, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself
Example:
    from fastavro import reader

    with open('some-file.avro', 'rb') as fo:
        avro_reader = reader(fo)
        for record in avro_reader:
            process_record(record)
The fo argument is a file-like object, so another common usage pattern is to read from an io.BytesIO object like so:
    from io import BytesIO
    from fastavro import writer, reader

    fo = BytesIO()
    writer(fo, schema, records)
    fo.seek(0)
    for record in reader(fo):
        process_record(record)
- metadata
Key-value pairs in the header metadata
- codec
The codec used when writing
- writer_schema
The schema used when writing
- reader_schema
The schema used when reading (if provided)
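As a sketch of how the attributes above might be used (assuming weather.avro was written as in the earlier example), the header information is available as soon as the reader is constructed, before any records are read:
    from fastavro import reader

    with open('weather.avro', 'rb') as fo:
        avro_reader = reader(fo)
        # Header information parsed from the file
        print(avro_reader.metadata)       # key-value pairs from the header
        print(avro_reader.codec)          # e.g. 'null' or 'deflate'
        print(avro_reader.writer_schema)  # the schema embedded in the file
        for record in avro_reader:
            print(record)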
- class block_reader(fo, reader_schema=None, return_record_name=False)
Iterator over Block objects in an avro file.
- Parameters
- fo (file-like) – Input stream
- reader_schema (dict, optional) – Reader schema
- return_record_name (bool, optional) – If true, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself
Example:
    from fastavro import block_reader

    with open('some-file.avro', 'rb') as fo:
        avro_reader = block_reader(fo)
        for block in avro_reader:
            process_block(block)
- metadata
Key-value pairs in the header metadata
- codec
The codec used when writing
- writer_schema
The schema used when writing
- reader_schema
The schema used when reading (if provided)
- class Block(bytes_, num_records, codec, reader_schema, writer_schema, offset, size, return_record_name=False)
An avro block. Will yield records when iterated over.
- num_records
Number of records in the block
- writer_schema
The schema used when writing
- reader_schema
The schema used when reading (if provided)
- offset
Offset of the block from the beginning of the avro file
- size
Size of the block in bytes
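A minimal sketch combining block_reader with the Block attributes above (again assuming weather.avro was written as in the earlier example):
    from fastavro import block_reader

    with open('weather.avro', 'rb') as fo:
        for block in block_reader(fo):
            print(block.num_records, block.offset, block.size)
            for record in block:  # a Block yields its records when iterated
                print(record)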
- schemaless_reader(fo, writer_schema, reader_schema=None, return_record_name=False)
Reads a single record written using the schemaless_writer()
- Parameters
- fo (file-like) – Input stream
- writer_schema (dict) – Schema used when calling schemaless_writer
- reader_schema (dict, optional) – If the schema has changed since being written then the new schema can be given to allow for schema migration
- return_record_name (bool, optional) – If true, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself
Example:
    parsed_schema = fastavro.parse_schema(schema)
    with open('file.avro', 'rb') as fp:
        record = fastavro.schemaless_reader(fp, parsed_schema)
Note: The schemaless_reader can only read a single record.
- is_avro(path_or_buffer)
Return True if path (or buffer) points to an Avro file.
- Parameters
path_or_buffer (str or file-like object) – Path to a file, or a file-like object
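For example (assuming weather.avro is an avro container file written as in the earlier example, and weather.csv is a hypothetical non-avro file):
    from fastavro import is_avro

    assert is_avro('weather.avro')      # written with fastavro.writer
    assert not is_avro('weather.csv')   # hypothetical non-avro file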
fastavro.write
- writer(fo, schema, records, codec='null', sync_interval=16000, metadata=None, validator=None, sync_marker=None, codec_compression_level=None)
Write records to fo (stream) according to schema
- Parameters
- fo (file-like) – Output stream
- schema (dict) – Writer schema
- records (iterable) – Records to write. This is commonly a list of the dictionary representation of the records, but it can be any iterable
- codec (string, optional) – Compression codec, can be ‘null’, ‘deflate’ or ‘snappy’ (if installed)
- sync_interval (int, optional) – Size of sync interval
- metadata (dict, optional) – Header metadata
- validator (None, True or a function) – Validator function. If None (the default), no validation is done. If True, fastavro.validation.validate will be used. If it's a function, it should have the same signature as fastavro.writer.validate and raise an exception on error.
- sync_marker (bytes, optional) – A byte string used as the avro sync marker. If not provided, a random byte string will be used.
- codec_compression_level (int, optional) – Compression level to use with the specified codec (if the codec supports it)
Example:
    from fastavro import writer, parse_schema

    schema = {
        'doc': 'A weather reading.',
        'name': 'Weather',
        'namespace': 'test',
        'type': 'record',
        'fields': [
            {'name': 'station', 'type': 'string'},
            {'name': 'time', 'type': 'long'},
            {'name': 'temp', 'type': 'int'},
        ],
    }
    parsed_schema = parse_schema(schema)

    records = [
        {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
        {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
        {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
        {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
    ]

    with open('weather.avro', 'wb') as out:
        writer(out, parsed_schema, records)
The fo argument is a file-like object, so another common usage pattern is to write to an io.BytesIO object like so:
    from io import BytesIO
    from fastavro import writer

    fo = BytesIO()
    writer(fo, schema, records)
Given an existing avro file, it’s possible to append to it by re-opening the file in a+b mode. If the file is only opened in ab mode, we aren’t able to read some of the existing header information and an error will be raised. For example:
    # Write initial records
    with open('weather.avro', 'wb') as out:
        writer(out, parsed_schema, records)

    # Write some more records
    with open('weather.avro', 'a+b') as out:
        writer(out, parsed_schema, more_records)
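The codec and codec_compression_level arguments can be combined. A minimal sketch, assuming the parsed_schema and records from the example above and that the chosen codec honors a compression level:
    from fastavro import writer

    with open('weather.avro', 'wb') as out:
        # 'deflate' ships with the standard library, so no extra install is needed
        writer(out, parsed_schema, records, codec='deflate',
               codec_compression_level=7)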
- schemaless_writer(fo, schema, record)
Write a single record without the schema or header information
- Parameters
- fo (file-like) – Output file
- schema (dict) – Schema
- record (dict) – Record to write
Example:
    parsed_schema = fastavro.parse_schema(schema)
    with open('file.avro', 'wb') as fp:
        fastavro.schemaless_writer(fp, parsed_schema, record)
Note: The schemaless_writer can only write a single record.
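Because the schemaless functions omit the file header, the writer schema must be supplied again when reading. A round-trip sketch through an in-memory buffer, assuming schema and a single record dict are already defined:
    from io import BytesIO
    from fastavro import parse_schema, schemaless_writer, schemaless_reader

    parsed_schema = parse_schema(schema)

    fo = BytesIO()
    schemaless_writer(fo, parsed_schema, record)  # exactly one record, no header
    fo.seek(0)
    same_record = schemaless_reader(fo, parsed_schema)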
Using the tuple notation to specify which branch of a union to take
Since this library uses plain dictionaries to represent a record, it is possible for one dictionary to fit the definition of two different records.
For example, given a dictionary like this:
{"name": "My Name"}
It would be valid against both of these records:
    child_schema = {
        "name": "Child",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_color", "type": ["null", "string"]},
        ],
    }

    pet_schema = {
        "name": "Pet",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_toy", "type": ["null", "string"]},
        ],
    }
This becomes a problem when a schema contains a union of these two similar records, as it is not clear which record the dictionary represents. For example, if you used the previous dictionary with the following schema, it wouldn't be clear if the record should be serialized as a Child or a Pet:
    household_schema = {
        "name": "Household",
        "type": "record",
        "fields": [
            {"name": "address", "type": "string"},
            {
                "name": "family_members",
                "type": {
                    "type": "array",
                    "items": [
                        {
                            "name": "Child",
                            "type": "record",
                            "fields": [
                                {"name": "name", "type": "string"},
                                {"name": "favorite_color", "type": ["null", "string"]},
                            ],
                        },
                        {
                            "name": "Pet",
                            "type": "record",
                            "fields": [
                                {"name": "name", "type": "string"},
                                {"name": "favorite_toy", "type": ["null", "string"]},
                            ],
                        },
                    ],
                },
            },
        ],
    }
To resolve this, you can use a tuple notation where the first value of the tuple is the fully namespaced record name and the second value is the dictionary. For example:
    records = [
        {
            "address": "123 Drive Street",
            "family_members": [
                ("Child", {"name": "Son"}),
                ("Child", {"name": "Daughter"}),
                ("Pet", {"name": "Dog"}),
            ],
        }
    ]
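Putting it together, a sketch of writing these records with the household_schema above and reading them back; with return_record_name=True, each member of the union comes back as a (name, record) tuple again:
    from fastavro import writer, reader, parse_schema

    parsed_schema = parse_schema(household_schema)

    with open('households.avro', 'wb') as out:
        writer(out, parsed_schema, records)

    with open('households.avro', 'rb') as fo:
        for household in reader(fo, return_record_name=True):
            for name, member in household['family_members']:
                # e.g. Child {'name': 'Son', 'favorite_color': None}
                print(name, member)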
fastavro.json_read
- json_reader(fo, schema)
Iterator over records in an avro JSON file.
- Parameters
- fo (file-like) – Input stream
- schema (dict) – Reader schema
Example:
    from fastavro import json_reader

    schema = {
        'doc': 'A weather reading.',
        'name': 'Weather',
        'namespace': 'test',
        'type': 'record',
        'fields': [
            {'name': 'station', 'type': 'string'},
            {'name': 'time', 'type': 'long'},
            {'name': 'temp', 'type': 'int'},
        ],
    }

    with open('some-file', 'r') as fo:
        avro_reader = json_reader(fo, schema)
        for record in avro_reader:
            print(record)
fastavro.json_write
- json_writer(fo, schema, records)
Write records to fo (stream) according to schema
- Parameters
- fo (file-like) – Output stream
- schema (dict) – Writer schema
- records (iterable) – Records to write. This is commonly a list of the dictionary representation of the records, but it can be any iterable
Example:
    from fastavro import json_writer, parse_schema

    schema = {
        'doc': 'A weather reading.',
        'name': 'Weather',
        'namespace': 'test',
        'type': 'record',
        'fields': [
            {'name': 'station', 'type': 'string'},
            {'name': 'time', 'type': 'long'},
            {'name': 'temp', 'type': 'int'},
        ],
    }
    parsed_schema = parse_schema(schema)

    records = [
        {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
        {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
        {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
        {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
    ]

    with open('some-file', 'w') as out:
        json_writer(out, parsed_schema, records)
fastavro.schema
- parse_schema(schema, expand=False, _write_hint=True, _force=False)
Returns a parsed avro schema
It is not necessary to call parse_schema, but doing so and saving the parsed schema for later use will make future operations faster, as the schema will not need to be reparsed.
- Parameters
- schema (dict) – Input schema
- expand (bool) – If true, named schemas will be fully expanded to their true schemas rather than being represented as just the name. This format should be considered output-only and should not be passed to other reader/writer functions, as it does not conform to the avro specification and will likely cause an exception.
NOTE: This option should be considered a keyword-only argument and may be enforced as such when Python 2 support is dropped.
- _write_hint (bool) – Internal API argument specifying whether or not the __fastavro_parsed marker should be added to the schema
- _force (bool) – Internal API argument. If True, the schema will always be parsed even if it has been parsed and has the __fastavro_parsed marker
Example:
    from fastavro import parse_schema
    from fastavro import writer

    parsed_schema = parse_schema(original_schema)
    with open('weather.avro', 'wb') as out:
        writer(out, parsed_schema, records)
- fullname(schema)
Returns the fullname of a schema
- Parameters
schema (dict) – Input schema
Example:
    from fastavro.schema import fullname

    schema = {
        'doc': 'A weather reading.',
        'name': 'Weather',
        'namespace': 'test',
        'type': 'record',
        'fields': [
            {'name': 'station', 'type': 'string'},
            {'name': 'time', 'type': 'long'},
            {'name': 'temp', 'type': 'int'},
        ],
    }

    fname = fullname(schema)
    assert fname == "test.Weather"
- expand_schema(schema)
Returns a schema where all named types are expanded to their real schema
NOTE: The output of this function is a schema that can include multiple definitions of the same named type (by design), which is not valid per the avro specification. Therefore, the output of this should not be passed to the normal writer/reader functions, as it will likely result in an error.
- Parameters
schema (dict) – Input schema
Example:
    from fastavro.schema import expand_schema

    original_schema = {
        "name": "MasterSchema",
        "namespace": "com.namespace.master",
        "type": "record",
        "fields": [{
            "name": "field_1",
            "type": {
                "name": "Dependency",
                "namespace": "com.namespace.dependencies",
                "type": "record",
                "fields": [
                    {"name": "sub_field_1", "type": "string"}
                ]
            }
        }, {
            "name": "field_2",
            "type": "com.namespace.dependencies.Dependency"
        }]
    }

    expanded_schema = expand_schema(original_schema)

    assert expanded_schema == {
        "name": "com.namespace.master.MasterSchema",
        "type": "record",
        "fields": [{
            "name": "field_1",
            "type": {
                "name": "com.namespace.dependencies.Dependency",
                "type": "record",
                "fields": [
                    {"name": "sub_field_1", "type": "string"}
                ]
            }
        }, {
            "name": "field_2",
            "type": {
                "name": "com.namespace.dependencies.Dependency",
                "type": "record",
                "fields": [
                    {"name": "sub_field_1", "type": "string"}
                ]
            }
        }]
    }
fastavro.validation
- validate(datum, schema, field=None, raise_errors=True)
Determine if a Python datum is an instance of a schema.
- Parameters
- datum (Any) – Data being validated
- schema (dict) – Schema
- field (str, optional) – Record field being validated
- raise_errors (bool, optional) – If true, errors are raised for invalid data. If false, a simple True (valid) or False (invalid) result is returned
Example:
    from fastavro.validation import validate

    schema = {...}
    record = {...}

    validate(record, schema)
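A more concrete sketch of both outcomes, using a hypothetical cut-down Weather schema; with the default raise_errors=True, invalid data raises fastavro.validation.ValidationError:
    from fastavro.validation import validate, ValidationError

    # Hypothetical cut-down schema for illustration
    schema = {
        'name': 'Weather',
        'namespace': 'test',
        'type': 'record',
        'fields': [
            {'name': 'station', 'type': 'string'},
            {'name': 'temp', 'type': 'int'},
        ],
    }

    assert validate({'station': '011990-99999', 'temp': 22}, schema)  # True

    try:
        validate({'station': '011990-99999', 'temp': 'warm'}, schema)
    except ValidationError as err:
        print(err)  # temp is not an int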
- validate_many(records, schema, raise_errors=True)
Validate a list of records against a schema.
- Parameters
- records (iterable) – List of records to validate
- schema (dict) – Schema
- raise_errors (bool, optional) – If true, errors are raised for invalid data. If false, a simple True (valid) or False (invalid) result is returned
Example:
    from fastavro.validation import validate_many

    schema = {...}
    records = [{...}, {...}, ...]

    validate_many(records, schema)
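A sketch of the non-raising mode, reusing the hypothetical cut-down schema from the validate sketch above; with raise_errors=False the result is a single boolean covering all records:
    from fastavro.validation import validate_many

    records = [
        {'station': '011990-99999', 'temp': 0},
        {'station': '012650-99999', 'temp': 'warm'},  # invalid: temp must be an int
    ]
    # Returns False instead of raising because raise_errors=False
    assert validate_many(records, schema, raise_errors=False) is False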
fastavro command line script
A command line script is installed with the library that can be used to dump the contents of avro file(s) to the standard output.
Usage:
    usage: fastavro [-h] [--schema] [--codecs] [--version] [-p] [file [file ...]]

    iter over avro file, emit records as JSON

    positional arguments:
      file           file(s) to parse

    optional arguments:
      -h, --help     show this help message and exit
      --schema       dump schema instead of records
      --codecs       print supported codecs
      --version      show program's version number and exit
      -p, --pretty   pretty print json
Examples
Read an avro file:
    $ fastavro weather.avro
    {"temp": 0, "station": "011990-99999", "time": -619524000000}
    {"temp": 22, "station": "011990-99999", "time": -619506000000}
    {"temp": -11, "station": "011990-99999", "time": -619484400000}
    {"temp": 111, "station": "012650-99999", "time": -655531200000}
    {"temp": 78, "station": "012650-99999", "time": -655509600000}
Show the schema:
    $ fastavro --schema weather.avro
    {
     "type": "record",
     "namespace": "test",
     "doc": "A weather reading.",
     "fields": [
      {"type": "string", "name": "station"},
      {"type": "long", "name": "time"},
      {"type": "int", "name": "temp"}
     ],
     "name": "Weather"
    }
Author
Miki Tebeka
Copyright
2021, Miki Tebeka