Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}

import scala.collection.mutable

/** Serializer for [[mutable.ArrayDeque]]. Handle nullable value. */
class MutableArrayDequeSerializer[A](child: TypeSerializer[A], clazz: Class[A])
extends MutableSerializer[mutable.ArrayDeque[A]] {

override def copy(from: mutable.ArrayDeque[A]): mutable.ArrayDeque[A] =
if (from == null) {
from
} else {
val length = from.length
val result = from.clone()
if (!child.isImmutableType) {
var i = 0
while (i < length) {
val element = result(i)
if (element != null) result(i) = child.copy(element)
i += 1
}
}
result
}

override def duplicate(): MutableArrayDequeSerializer[A] = {
val duplicatedChild = child.duplicate()
if (duplicatedChild.eq(child)) {
this
} else {
new MutableArrayDequeSerializer[A](duplicatedChild, clazz)
}
}

override def createInstance(): mutable.ArrayDeque[A] = mutable.ArrayDeque.empty[A]

override def getLength: Int = VariableLengthDataType

override def serialize(records: mutable.ArrayDeque[A], target: DataOutputView): Unit =
if (records == null) {
target.writeInt(NullMarker)
} else {
target.writeInt(records.length)
var i = 0
while (i < records.length) { // while loop is significantly faster than foreach when working on arrays
child.serialize(records(i), target)
i += 1
}
}

override def deserialize(source: DataInputView): mutable.ArrayDeque[A] = {
var remaining = source.readInt()
if (remaining == NullMarker) {
null
} else {
val arrayDeque = createInstance()
while (remaining > 0) {
arrayDeque.append(child.deserialize(source))
remaining -= 1
}
arrayDeque
}
}

override def copy(source: DataInputView, target: DataOutputView): Unit = {
var remaining = source.readInt()
target.writeInt(remaining)
while (remaining > 0) {
child.copy(source, target)
remaining -= 1
}
}

override def snapshotConfiguration(): TypeSerializerSnapshot[mutable.ArrayDeque[A]] =
new CollectionSerializerSnapshot[mutable.ArrayDeque, A, MutableArrayDequeSerializer[A]](
child,
classOf[MutableArrayDequeSerializer[A]],
clazz
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}

import scala.collection.mutable

/** Serializer for [[mutable.Buffer]]. Handle nullable value. */
class MutableBufferSerializer[A](child: TypeSerializer[A], clazz: Class[A])
extends MutableSerializer[mutable.Buffer[A]] {

override def copy(from: mutable.Buffer[A]): mutable.Buffer[A] =
if (from == null) {
from
} else {
val length = from.length
val result = from.clone()
if (!child.isImmutableType) {
var i = 0
while (i < length) {
val element = result(i)
if (element != null) result(i) = child.copy(element)
i += 1
}
}
result
}

override def duplicate(): MutableBufferSerializer[A] = {
val duplicatedChild = child.duplicate()
if (duplicatedChild.eq(child)) {
this
} else {
new MutableBufferSerializer[A](duplicatedChild, clazz)
}
}

override def createInstance(): mutable.Buffer[A] = mutable.Buffer.empty[A]

override def getLength: Int = VariableLengthDataType

override def serialize(records: mutable.Buffer[A], target: DataOutputView): Unit =
if (records == null) {
target.writeInt(NullMarker)
} else {
target.writeInt(records.length)
records match {
case _: mutable.ArrayBuffer[_] | _: mutable.ArrayDeque[_] =>
var i = 0
while (i < records.length) { // while loop is significantly faster than foreach when working on arrays
child.serialize(records(i), target)
i += 1
}
case _ => records.foreach(element => child.serialize(element, target))
}
}

override def deserialize(source: DataInputView): mutable.Buffer[A] = {
var remaining = source.readInt()
if (remaining == NullMarker) {
null
} else {
val buffer = createInstance()
while (remaining > 0) {
buffer.append(child.deserialize(source))
remaining -= 1
}
buffer
}
}

override def copy(source: DataInputView, target: DataOutputView): Unit = {
var remaining = source.readInt()
target.writeInt(remaining)
while (remaining > 0) {
child.copy(source, target)
remaining -= 1
}
}

override def snapshotConfiguration(): TypeSerializerSnapshot[mutable.Buffer[A]] =
new CollectionSerializerSnapshot[mutable.Buffer, A, MutableBufferSerializer[A]](
child,
classOf[MutableBufferSerializer[A]],
clazz
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}

import scala.collection.mutable

/** Serializer for [[mutable.Map]]. Handle nullable value. */
class MutableMapSerializer[K, V](
keySerializer: TypeSerializer[K],
valueSerializer: TypeSerializer[V]
) extends MutableSerializer[mutable.Map[K, V]] {

override def copy(from: mutable.Map[K, V]): mutable.Map[K, V] =
if (from == null) {
from
} else {
from.map(element => (keySerializer.copy(element._1), valueSerializer.copy(element._2)))
}

override def duplicate(): MutableMapSerializer[K, V] = {
val duplicatedKs = keySerializer.duplicate()
val duplicatedVs = valueSerializer.duplicate()
if (duplicatedKs.eq(keySerializer) && duplicatedVs.eq(valueSerializer)) {
this
} else {
new MutableMapSerializer(duplicatedKs, duplicatedVs)
}
}

override def createInstance(): mutable.Map[K, V] = mutable.Map.empty[K, V]

override def getLength: Int = VariableLengthDataType

override def serialize(records: mutable.Map[K, V], target: DataOutputView): Unit =
if (records == null) {
target.writeInt(NullMarker)
} else {
target.writeInt(records.size)
records.foreach(element => {
keySerializer.serialize(element._1, target)
valueSerializer.serialize(element._2, target)
})
}

override def deserialize(source: DataInputView): mutable.Map[K, V] = {
var remaining = source.readInt() // The valid range of actual data is >= 0. Only markers are negative
if (remaining == NullMarker) {
null
} else {
val map = createInstance()
while (remaining > 0) {
val key = keySerializer.deserialize(source)
val value = valueSerializer.deserialize(source)
map.put(key, value)
remaining -= 1
}
map
}
}

override def copy(source: DataInputView, target: DataOutputView): Unit = {
var remaining = source.readInt()
target.writeInt(remaining)
while (remaining > 0) {
keySerializer.copy(source, target)
valueSerializer.copy(source, target)
remaining -= 1
}
}

override def snapshotConfiguration(): TypeSerializerSnapshot[mutable.Map[K, V]] =
new MutableMapSerializerSnapshot(keySerializer, valueSerializer)

}

class MutableMapSerializerSnapshot[K, V](
private var keySerializer: TypeSerializer[K],
private var valueSerializer: TypeSerializer[V]
) extends TypeSerializerSnapshot[mutable.Map[K, V]] {

def this() = this(null, null)

override def getCurrentVersion: Int = 1

override def writeSnapshot(out: DataOutputView): Unit = {
TypeSerializerSnapshot.writeVersionedSnapshot(out, keySerializer.snapshotConfiguration())
TypeSerializerSnapshot.writeVersionedSnapshot(out, valueSerializer.snapshotConfiguration())
}

override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
keySerializer = TypeSerializerSnapshot.readVersionedSnapshot[K](in, userCodeClassLoader).restoreSerializer()
valueSerializer = TypeSerializerSnapshot.readVersionedSnapshot[V](in, userCodeClassLoader).restoreSerializer()
}

override def resolveSchemaCompatibility(
oldSerializerSnapshot: TypeSerializerSnapshot[mutable.Map[K, V]]
): TypeSerializerSchemaCompatibility[mutable.Map[K, V]] = {
TypeSerializerSchemaCompatibility.compatibleAsIs()
}

override def restoreSerializer(): TypeSerializer[mutable.Map[K, V]] =
new MutableMapSerializer(keySerializer, valueSerializer)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}

import scala.collection.mutable

/** Serializer for [[mutable.Queue]]. Handle nullable value. */
class MutableQueueSerializer[A](child: TypeSerializer[A], clazz: Class[A]) extends MutableSerializer[mutable.Queue[A]] {

override def copy(from: mutable.Queue[A]): mutable.Queue[A] =
if (from == null) {
from
} else {
val length = from.length
val result = from.clone()
if (!child.isImmutableType) {
var i = 0
while (i < length) {
val element = result(i)
if (element != null) result(i) = child.copy(element)
i += 1
}
}
result
}

override def duplicate(): MutableQueueSerializer[A] = {
val duplicatedChild = child.duplicate()
if (duplicatedChild.eq(child)) {
this
} else {
new MutableQueueSerializer[A](duplicatedChild, clazz)
}
}

override def createInstance(): mutable.Queue[A] = mutable.Queue.empty[A]

override def getLength: Int = VariableLengthDataType

override def serialize(records: mutable.Queue[A], target: DataOutputView): Unit =
if (records == null) {
target.writeInt(NullMarker)
} else {
target.writeInt(records.length)
var i = 0
while (i < records.length) { // while loop is significantly faster than foreach when working on arrays
child.serialize(records(i), target)
i += 1
}
}

override def deserialize(source: DataInputView): mutable.Queue[A] = {
var remaining = source.readInt()
if (remaining == NullMarker) {
null
} else {
val queue = createInstance()
while (remaining > 0) {
val a = child.deserialize(source)
queue.append(a)
remaining -= 1
}
queue
}
}

override def copy(source: DataInputView, target: DataOutputView): Unit = {
var remaining = source.readInt()
target.writeInt(remaining)
while (remaining > 0) {
child.copy(source, target)
remaining -= 1
}
}

override def snapshotConfiguration(): TypeSerializerSnapshot[mutable.Queue[A]] =
new CollectionSerializerSnapshot[mutable.Queue, A, MutableQueueSerializer[A]](
child,
classOf[MutableQueueSerializer[A]],
clazz
)

}
Loading