Preface
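When a Kafka client talks to the server, both sides need an agreed wire format so that the various commands can be written and read correctly. The org.apache.kafka.common.protocol package provides this.
The package contains the following members:
- ApiKeys defines many APIs, and each ApiKeys constant holds multiple Schemas.
- The types package first defines the abstract class Type, which exposes read and write methods for subclasses to implement. Schema extends Type[1].
- CommonFields defines many Field objects that are shared across schemas.
The most important classes are ApiKeys, Schema, Struct and Field. Let's go through them one by one.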
Type
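Type is an abstract class that declares several methods, chiefly write and read, which write to and read from a ByteBuffer. Type also defines a number of static constants such as Type.BOOLEAN, Type.INT8 and Type.INT16, each implemented as an anonymous subclass of Type.
Take Type.BOOLEAN as an example: write simply puts a single byte, 0 or 1, into the ByteBuffer, and read reads one byte and converts it to a Boolean.
(Figure: implementation of Type.BOOLEAN.)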
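Since the figure is not preserved, here is a minimal sketch of the same idea. SimpleType is a hypothetical stand-in for Kafka's Type and keeps only write and read; the BOOLEAN constant follows the behavior described above (one byte, 0 or 1, any non-zero byte read back as true).

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-in for Kafka's Type: only write/read are kept.
abstract class SimpleType {
    // Serialize o into the buffer at its current position
    abstract void write(ByteBuffer buffer, Object o);

    // Deserialize one value from the buffer at its current position
    abstract Object read(ByteBuffer buffer);

    // A static constant implemented as an anonymous subclass, like Type.BOOLEAN
    static final SimpleType BOOLEAN = new SimpleType() {
        @Override
        void write(ByteBuffer buffer, Object o) {
            // true -> 1, false -> 0, one byte either way
            buffer.put((Boolean) o ? (byte) 1 : (byte) 0);
        }

        @Override
        Object read(ByteBuffer buffer) {
            // Any non-zero byte is treated as true
            return buffer.get() != 0;
        }
    };
}
```

Writing true and then false into a two-byte buffer, flipping it and reading twice returns true and then false.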
ArrayOf
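ArrayOf extends Type but represents an array of some element type. Its write method first writes the array length (-1 for a null array) and then calls write for each element. Its read method first reads the array length and then reads each element in turn.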
```java
import java.nio.ByteBuffer;

/**
 * Represents a type for an array of a particular type
 */
public class ArrayOf extends Type {

    private final Type type;
    private final boolean nullable;

    public ArrayOf(Type type) {
        this(type, false);
    }

    public static ArrayOf nullable(Type type) {
        return new ArrayOf(type, true);
    }

    private ArrayOf(Type type, boolean nullable) {
        this.type = type;
        this.nullable = nullable;
    }

    @Override
    public boolean isNullable() {
        return nullable;
    }

    @Override
    public void write(ByteBuffer buffer, Object o) {
        if (o == null) {
            buffer.putInt(-1);  // a null array is encoded as length -1
            return;
        }
        Object[] objs = (Object[]) o;
        int size = objs.length;
        buffer.putInt(size);    // length prefix first ...
        for (Object obj : objs)
            type.write(buffer, obj);  // ... then each element
    }

    @Override
    public Object read(ByteBuffer buffer) {
        int size = buffer.getInt();
        if (size < 0 && isNullable())
            return null;
        else if (size < 0)
            throw new SchemaException("Array size " + size + " cannot be negative");

        // Guard against corrupt lengths larger than the bytes remaining
        if (size > buffer.remaining())
            throw new SchemaException("Error reading array of size " + size + ", only " + buffer.remaining() + " bytes available");
        Object[] objs = new Object[size];
        for (int i = 0; i < size; i++)
            objs[i] = type.read(buffer);
        return objs;
    }

    // other methods (sizeOf, validate, ...) omitted
}
```
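To see the byte layout concretely, here is a minimal standalone sketch of the same length-prefixed encoding, specialized to INT32 elements. Int32ArrayCodec is a hypothetical helper, not a Kafka class, and it throws IllegalStateException where ArrayOf throws SchemaException.

```java
import java.nio.ByteBuffer;

// Hypothetical helper mirroring ArrayOf's layout for int elements:
// a 4-byte length (-1 for null), followed by each element.
final class Int32ArrayCodec {
    static void write(ByteBuffer buffer, int[] values) {
        if (values == null) {
            buffer.putInt(-1);          // null marker
            return;
        }
        buffer.putInt(values.length);   // length prefix first
        for (int v : values)
            buffer.putInt(v);           // then each element
    }

    static int[] read(ByteBuffer buffer, boolean nullable) {
        int size = buffer.getInt();
        if (size < 0) {
            if (nullable)
                return null;
            throw new IllegalStateException("Array size " + size + " cannot be negative");
        }
        // Same sanity check as ArrayOf: reject lengths beyond the remaining bytes
        if (size > buffer.remaining())
            throw new IllegalStateException("Only " + buffer.remaining() + " bytes available");
        int[] values = new int[size];
        for (int i = 0; i < size; i++)
            values[i] = buffer.getInt();
        return values;
    }
}
```

An array of three ints therefore occupies 4 + 3 × 4 = 16 bytes, and a null array occupies just the 4-byte -1 marker.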
Field
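The Field class represents a field: a named, typed slot in a schema.
- Its attributes are name, docString and type. If hasDefaultValue is true, it also has a defaultValue. name and docString are the field's name and description, and type is the field's type.
- It has several nested subclasses, such as Str, Int8 and Int32, for the different kinds of fields. Int8 and Int32, for example, set their type to Type.INT8 and Type.INT32 respectively.
(Figure: structure of the Field class. defaultValue is drawn with a dashed line because the attribute does not exist when hasDefaultValue is false.)
(Figure: mapping between the Field subclasses and their Type.)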
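Because the figures are not preserved, a minimal sketch of how the hierarchy fits together may help. It assumes a simplified model: SimpleField and the WireType enum are hypothetical stand-ins for Field and Type, and only three subclasses are shown. Each nested subclass only fixes the type. name, docString and the optional default are passed through.

```java
// Hypothetical stand-in for Kafka's Type constants
enum WireType { INT8, INT32, STRING }

// Hypothetical, simplified model of Field and its nested subclasses
class SimpleField {
    final String name;
    final String docString;
    final WireType type;
    final boolean hasDefaultValue;
    final Object defaultValue;   // only meaningful when hasDefaultValue is true

    SimpleField(String name, WireType type, String docString,
                boolean hasDefaultValue, Object defaultValue) {
        this.name = name;
        this.type = type;
        this.docString = docString;
        this.hasDefaultValue = hasDefaultValue;
        this.defaultValue = defaultValue;
    }

    // Each subclass pins the type, in the spirit of Field.Int8 / Int32 / Str
    static class Int8 extends SimpleField {
        Int8(String name, String docString) {
            super(name, WireType.INT8, docString, false, null);
        }
        Int8(String name, String docString, byte defaultValue) {
            super(name, WireType.INT8, docString, true, defaultValue);
        }
    }

    static class Int32 extends SimpleField {
        Int32(String name, String docString) {
            super(name, WireType.INT32, docString, false, null);
        }
        Int32(String name, String docString, int defaultValue) {
            super(name, WireType.INT32, docString, true, defaultValue);
        }
    }

    static class Str extends SimpleField {
        Str(String name, String docString) {
            super(name, WireType.STRING, docString, false, null);
        }
    }
}
```

Putting the type in the subclass keeps schema definitions short: a caller only writes the name and description, and cannot pair a name with the wrong type by accident.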
Schema
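As the name suggests, a Schema describes a format: it defines the order of the fields within that format. It extends Type, so it can itself read from and write to a ByteBuffer.
- The constructor takes a list of Fields, wraps each one, in order, as a BoundField, and stores it in fields. It also builds fieldsByName, a map for looking fields up by name.
- The read method shows that:
  - Reading a Schema from a ByteBuffer simply means letting each of its fields (through the field's type) read in turn. write works the same way and is not repeated here.
  - read returns a Struct, discussed below. The Struct is built from the Schema itself and the objects array, so as long as we have the Schema, the values can be taken back out in order.
Each BoundField has a def variable holding its Field, and an index variable recording its position within the Schema.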
```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * The schema for a compound record definition
 */
public class Schema extends Type {

    private final BoundField[] fields;
    private final Map<String, BoundField> fieldsByName;

    /**
     * Construct the schema with a given list of its field values
     *
     * @throws SchemaException If the given list have duplicate fields
     */
    public Schema(Field... fs) {
        this.fields = new BoundField[fs.length];
        this.fieldsByName = new HashMap<>();
        for (int i = 0; i < this.fields.length; i++) {
            Field def = fs[i];
            if (fieldsByName.containsKey(def.name))
                throw new SchemaException("Schema contains a duplicate field: " + def.name);
            // Remember each field's position; Struct uses it to index its values
            this.fields[i] = new BoundField(def, this, i);
            this.fieldsByName.put(def.name, this.fields[i]);
        }
    }

    // ...

    /**
     * Read a struct from the buffer
     */
    @Override
    public Struct read(ByteBuffer buffer) {
        Object[] objects = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            try {
                // Fields are read strictly in declaration order
                objects[i] = fields[i].def.type.read(buffer);
            } catch (Exception e) {
                throw new SchemaException("Error reading field '" + fields[i].def.name + "': " +
                        (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
        return new Struct(this, objects);
    }

    // ...
}

// BoundField.java
/**
 * A field definition bound to a particular schema.
 */
public class BoundField {
    public final Field def;
    final int index;
    final Schema schema;

    public BoundField(Field def, Schema schema, int index) {
        this.def = def;
        this.schema = schema;
        this.index = index;
    }

    @Override
    public String toString() {
        return def.name + ":" + def.type;
    }
}
```
(Figure: Schema and its BoundFields.)
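The key property of Schema.read is that fields carry no tags on the wire: each value is identified purely by its position. A hypothetical, stripped-down MiniSchema makes this visible. Its readers for INT16, INT32 and a length-prefixed string are assumptions modeled on Kafka's primitive types (a STRING is an INT16 length followed by UTF-8 bytes).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stripped-down Schema: an ordered list of (name, reader) pairs.
final class MiniSchema {
    interface Reader { Object read(ByteBuffer buffer); }

    static final Reader INT16 = ByteBuffer::getShort;
    static final Reader INT32 = ByteBuffer::getInt;
    // Kafka-style string: INT16 length, then that many UTF-8 bytes
    static final Reader STRING = buf -> {
        short len = buf.getShort();
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    };

    final String[] names;
    final Reader[] readers;
    final Map<String, Integer> indexByName = new HashMap<>();

    MiniSchema(String[] names, Reader[] readers) {
        this.names = names;
        this.readers = readers;
        // Like Schema's constructor: reject duplicates, remember positions
        for (int i = 0; i < names.length; i++)
            if (indexByName.put(names[i], i) != null)
                throw new IllegalArgumentException("duplicate field: " + names[i]);
    }

    // Like Schema.read: read every field in declaration order into an Object[]
    Object[] read(ByteBuffer buffer) {
        Object[] values = new Object[readers.length];
        for (int i = 0; i < readers.length; i++)
            values[i] = readers[i].read(buffer);
        return values;
    }
}
```

Reading a buffer that holds a short, an int and a length-prefixed string through a three-field MiniSchema yields a three-element Object[] in that order. If two fields in the schema were swapped, the same bytes would be misread, which is why the request and response schema versions must match exactly.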
Struct
Struct maintains two variables, values and schema: a set of field values and the format that can interpret them.
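```java
/**
 * A record that can be serialized and deserialized according to a pre-defined schema
 */
public class Struct {
    private final Schema schema;   // the format that describes values
    private final Object[] values; // field values, indexed by BoundField.index

    // constructors, setters and getters omitted
}
```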
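Struct provides a variety of setters and getters, which put values into values according to the Schema and take field values back out of values.
- Among the getters, get and getArray are called most often. Both first look up the BoundField by name and then read from values at the index stored in that BoundField. If the slot is empty, getFieldOrDefault falls back to the field's default value, or to null if the type is nullable.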
```java
/**
 * Get the record value for the field with the given name by doing a hash table lookup (slower!)
 *
 * @param name The name of the field
 * @return The value in the record
 * @throws SchemaException If no such field exists
 */
public Object get(String name) {
    BoundField field = schema.get(name);  // name -> BoundField (holds the index)
    if (field == null)
        throw new SchemaException("No such field: " + name);
    return getFieldOrDefault(field);
}

public Object[] getArray(String name) {
    return (Object[]) get(name);
}

/**
 * Return the value of the given pre-validated field, or if the value is missing return the default value.
 *
 * @param field The field for which to get the default value
 * @throws SchemaException if the field has no value and has no default.
 */
private Object getFieldOrDefault(BoundField field) {
    Object value = this.values[field.index];
    if (value != null)
        return value;
    else if (field.def.hasDefaultValue)
        return field.def.defaultValue;
    else if (field.def.type.isNullable())
        return null;
    else
        throw new SchemaException("Missing value for field '" + field.def.name + "' which has no default value.");
}
```
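The lookup path of get and getFieldOrDefault can be condensed into a small standalone sketch. MiniStruct and its FieldDef are hypothetical: they fold Schema, BoundField and Field into one class so the name → index → value path and the fallback order (explicit value, default value, null if nullable, otherwise an error) are visible in one place.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mini Struct: values stored by position, looked up by name.
final class MiniStruct {
    static final class FieldDef {
        final String name;
        final boolean nullable;
        final boolean hasDefaultValue;
        final Object defaultValue;

        FieldDef(String name, boolean nullable, boolean hasDefaultValue, Object defaultValue) {
            this.name = name;
            this.nullable = nullable;
            this.hasDefaultValue = hasDefaultValue;
            this.defaultValue = defaultValue;
        }
    }

    private final FieldDef[] fields;
    private final Object[] values;
    private final Map<String, Integer> indexByName = new HashMap<>();

    MiniStruct(FieldDef... fields) {
        this.fields = fields;
        this.values = new Object[fields.length];
        for (int i = 0; i < fields.length; i++)
            indexByName.put(fields[i].name, i);
    }

    MiniStruct set(String name, Object value) {
        values[indexOf(name)] = value;
        return this;
    }

    // Same fallback order as getFieldOrDefault
    Object get(String name) {
        int i = indexOf(name);            // hash lookup: name -> position
        if (values[i] != null)
            return values[i];
        FieldDef def = fields[i];
        if (def.hasDefaultValue)
            return def.defaultValue;
        if (def.nullable)
            return null;
        throw new IllegalStateException("Missing value for field '" + def.name + "' which has no default value.");
    }

    private int indexOf(String name) {
        Integer i = indexByName.get(name);
        if (i == null)
            throw new IllegalArgumentException("No such field: " + name);
        return i;
    }
}
```

The hash lookup is what the "(slower!)" comment in Kafka's javadoc refers to. Code that already holds a BoundField can skip it and index values directly.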
AbstractRequest/AbstractResponse
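AbstractResponse provides toStruct and parseResponse, which convert between a Struct and an AbstractResponse.
- toStruct builds a Struct according to the Schema.
- When a concrete Response is constructed, it takes each of its fields out of the schema-backed Struct by the field's defined name.
AbstractRequest works the same way and is not covered here.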
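The round trip between a response object and its Struct can be sketched as follows. ExampleResponse is hypothetical, and a Map<String, Object> stands in for Struct. In Kafka the equivalent code works on a Struct and the Schema for the requested version, but the pattern is the same: read fields out by name in the constructor, and write them back by the same names in toStruct.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical response; a Map stands in for Struct.
final class ExampleResponse {
    static final String ERROR_CODE = "error_code";
    static final String THROTTLE_TIME_MS = "throttle_time_ms";

    final short errorCode;
    final int throttleTimeMs;

    ExampleResponse(short errorCode, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.throttleTimeMs = throttleTimeMs;
    }

    // Struct -> response: pull each field out by its name
    ExampleResponse(Map<String, Object> struct) {
        this((Short) struct.get(ERROR_CODE), (Integer) struct.get(THROTTLE_TIME_MS));
    }

    // response -> Struct: put each field back under the same name
    Map<String, Object> toStruct() {
        Map<String, Object> struct = new LinkedHashMap<>();
        struct.put(ERROR_CODE, errorCode);
        struct.put(THROTTLE_TIME_MS, throttleTimeMs);
        return struct;
    }
}
```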
ApiKeys
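ApiKeys is an enum with many constants. For each API and each of its versions, it defines the request and response formats. Every API, such as PRODUCE or FETCH, has a request part and a response part, each with its own format, and these formats differ from version to version.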
```java
ApiKeys(int id, String name, Schema[] requestSchemas, Schema[] responseSchemas) {
    this(id, name, false, requestSchemas, responseSchemas);
}

ApiKeys(int id, String name, boolean clusterAction, Schema[] requestSchemas, Schema[] responseSchemas) {
    this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, requestSchemas, responseSchemas);
}

ApiKeys(int id, String name, boolean clusterAction, byte minRequiredInterBrokerMagic,
        Schema[] requestSchemas, Schema[] responseSchemas) {
    if (id < 0)
        throw new IllegalArgumentException("id must not be negative, id: " + id);
    this.id = (short) id;
    this.name = name;
    // ...
    this.requestSchemas = requestSchemas;    // one Schema per version, indexed by version
    this.responseSchemas = responseSchemas;
}
```
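Callers most often use parseRequest and parseResponse, which parse a request or response according to its version.
What parseResponse does:
- takes the Schema for the given version from responseSchemas
- calls that Schema's read method
parseRequest works the same way. As a result, every ApiKeys constant (PRODUCE, FETCH, etc.) can parse its requests and responses for any version it supports.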
```java
public Schema requestSchema(short version) {
    return schemaFor(requestSchemas, version);
}

public Schema responseSchema(short version) {
    return schemaFor(responseSchemas, version);
}

public Struct parseRequest(short version, ByteBuffer buffer) {
    return requestSchema(version).read(buffer);
}

public Struct parseResponse(short version, ByteBuffer buffer) {
    return responseSchema(version).read(buffer);
}

protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) {
    int bufferPosition = buffer.position();
    try {
        return responseSchema(version).read(buffer);
    } catch (SchemaException e) {
        // Rewind the buffer and retry with the fallback version's schema
        if (version != fallbackVersion) {
            buffer.position(bufferPosition);
            return responseSchema(fallbackVersion).read(buffer);
        } else
            throw e;
    }
}

private Schema schemaFor(Schema[] versions, short version) {
    if (!isVersionSupported(version))
        throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
    return versions[version];  // the version number is the array index
}
```
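The version-indexed lookup can be reduced to a small sketch. MiniApiKeys is a hypothetical enum: its schemas are plain strings standing in for Schema objects, and the range check assumes versions start at 0. What it shows is that a version number is simply an index into the per-API schema arrays, guarded by a support check.

```java
// Hypothetical enum mirroring the ApiKeys idea; strings stand in for Schemas.
enum MiniApiKeys {
    PRODUCE(0, new String[]{"produce-req-v0", "produce-req-v1"},
               new String[]{"produce-resp-v0", "produce-resp-v1"}),
    FETCH(1, new String[]{"fetch-req-v0"}, new String[]{"fetch-resp-v0"});

    final short id;
    private final String[] requestSchemas;   // index = version
    private final String[] responseSchemas;  // index = version

    MiniApiKeys(int id, String[] requestSchemas, String[] responseSchemas) {
        if (id < 0)
            throw new IllegalArgumentException("id must not be negative, id: " + id);
        this.id = (short) id;
        this.requestSchemas = requestSchemas;
        this.responseSchemas = responseSchemas;
    }

    short latestVersion() {
        return (short) (requestSchemas.length - 1);
    }

    // Assumes versions start at 0
    boolean isVersionSupported(short version) {
        return version >= 0 && version <= latestVersion();
    }

    String requestSchema(short version) { return schemaFor(requestSchemas, version); }

    String responseSchema(short version) { return schemaFor(responseSchemas, version); }

    private String schemaFor(String[] versions, short version) {
        if (!isVersionSupported(version))
            throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
        return versions[version];
    }
}
```

For example, MiniApiKeys.PRODUCE.responseSchema((short) 1) returns "produce-resp-v1", while asking FETCH for version 1 throws IllegalArgumentException because only version 0 is defined.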
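Generating the Struct for a response
- inFlightRequests.completeNext takes the request at the head of the queue (the oldest one still in flight on that connection). The implication is that the response just received belongs to that request, because the Kafka broker guarantees that it answers requests in the order they were sent.
- parseStruct... converts the response's ByteBuffer into a Struct. Note that the header of the corresponding request is passed in.
- AbstractResponse.parseResponse converts the Struct into a response object.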
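The ordering argument in the first step can be sketched as a per-connection FIFO. MiniInFlightRequests is hypothetical. It assumes a layout in which new requests are added at the front of a per-node deque and completions are taken from the back, so completeNext always returns the oldest outstanding request. The correlation id is kept so a caller could cross-check it against the response header.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-connection in-flight request tracking.
final class MiniInFlightRequests {
    static final class Request {
        final int correlationId;
        final short apiVersion;   // the response is parsed with this version

        Request(int correlationId, short apiVersion) {
            this.correlationId = correlationId;
            this.apiVersion = apiVersion;
        }
    }

    private final Map<String, Deque<Request>> requests = new HashMap<>();

    // Newest request goes to the front
    void add(String node, Request request) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(request);
    }

    // Oldest request comes off the back: FIFO per connection
    Request completeNext(String node) {
        Deque<Request> queue = requests.get(node);
        if (queue == null || queue.isEmpty())
            throw new IllegalStateException("No requests in flight for node " + node);
        return queue.pollLast();
    }
}
```

Sending requests 1 and then 2 to the same node and completing twice yields 1 and then 2, which matches the order in which a broker that answers in sequence returns the responses.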
parseStruct...
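It calls ApiKeys::parseResponse to parse the ByteBuffer into a Struct. Two details of this call are worth noting:
- A request and its response belong to the same apiKey, so the response is parsed with the same apiKey as the request.
- A request and its response use the same API version, so the version passed in is requestHeader.apiVersion(), the request's API version.
- responseSchema looks up the API's Schema for that version, and read is then called on it to read the ByteBuffer.
- Schema::read reads each Field in order and stores each value as an Object. (Incidentally, a call chain as long as fields[i].def.type.read violates the Law of Demeter.)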
```java
// ApiKeys.java
public Struct parseResponse(short version, ByteBuffer buffer) {
    return responseSchema(version).read(buffer);
}

// Schema.java
/**
 * Read a struct from the buffer
 */
@Override
public Struct read(ByteBuffer buffer) {
    Object[] objects = new Object[fields.length];
    for (int i = 0; i < fields.length; i++) {
        try {
            objects[i] = fields[i].def.type.read(buffer);
        } catch (Exception e) {
            throw new SchemaException("Error reading field '" + fields[i].def.name + "': " +
                    (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
        }
    }
    return new Struct(this, objects);
}
```
AbstractResponse.parseResponse
Not covered in detail here. Pick any concrete Response class to see how it is implemented.
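[1] Schema extends Type yet may contain multiple Types, so this is an application of the Composite pattern. It is not a strict one, though, because ApiKeys references Schema directly.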