Skip to content

Commit 00872df

Browse files
Antonio MurgiaAntonio Murgia
authored andcommitted
Merge branch 'develop' into feature/19
2 parents 2b05e3f + f5f146b commit 00872df

File tree

14 files changed

+150
-12
lines changed

14 files changed

+150
-12
lines changed

common/src/main/scala/it/agilelab/darwin/common/Connector.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,27 @@ import org.apache.avro.Schema
99
*/
1010
abstract class Connector(config: Config) extends Serializable {
1111

12+
/**
13+
* Creates the configured table, if the table already exists, does nothing
14+
*/
15+
def createTable(): Unit
16+
17+
/**
18+
* Returns whether or not the configured table exists
19+
*/
20+
def tableExists(): Boolean
21+
22+
/**
23+
*
24+
* @return a message indicating the user what he/she should do to create the table him/herself
25+
*/
26+
def tableCreationHint(): String
27+
1228
/**
1329
* Loads all schemas found on the storage.
1430
* This method can be invoked multiple times: to initialize the initial values or to update the existing ones with
1531
* the new data found on the storage.
32+
*
1633
* @return a sequence of all the pairs (ID, schema) found on the storage
1734
*/
1835
def fullLoad(): Seq[(Long, Schema)]
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package it.agilelab.darwin
2+
3+
package object common {
4+
5+
def using[A <: AutoCloseable, B](closeable: A)(f: A => B): B = {
6+
try {
7+
f(closeable)
8+
} finally {
9+
closeable.close()
10+
}
11+
}
12+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
createTable: false

core/src/main/scala/it/agilelab/darwin/manager/AvroSchemaManager.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package it.agilelab.darwin.manager
33
import com.typesafe.config.Config
44
import it.agilelab.darwin.common.{Connector, ConnectorFactory, Logging}
55
import it.agilelab.darwin.manager.exception.ConnectorNotFoundException
6+
import it.agilelab.darwin.manager.util.ConfigurationKeys
67
import jdk.nashorn.internal.runtime.ParserException
78
import org.apache.avro.{Schema, SchemaNormalization}
89
import it.agilelab.darwin.manager.util.ByteArrayUtils._
@@ -20,9 +21,20 @@ trait AvroSchemaManager extends Logging {
2021

2122
protected def config: Config
2223

23-
protected[darwin] lazy val connector: Connector = ConnectorFactory.creators().headOption.map(_.create(config))
24+
protected[darwin] lazy val connector: Connector = {
25+
val cnt = ConnectorFactory.creators().headOption.map(_.create(config))
2426
.getOrElse(throw new ConnectorNotFoundException(config))
2527

28+
if (config.getBoolean(ConfigurationKeys.CREATE_TABLE)) {
29+
cnt.createTable()
30+
} else if (!cnt.tableExists()) {
31+
log.warn(s"Darwin table does not exists and has not been created (${ConfigurationKeys.CREATE_TABLE} was false)")
32+
log.warn(cnt.tableCreationHint())
33+
}
34+
cnt
35+
}
36+
37+
2638
/**
2739
* Extracts the ID from a Schema.
2840
*
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package it.agilelab.darwin.manager.util
2+
3+
object ConfigurationKeys {
4+
5+
val CREATE_TABLE = "createTable"
6+
7+
}

hbase/src/main/scala/it/agilelab/darwin/connector/hbase/HBaseConnector.scala

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package it.agilelab.darwin.connector.hbase
22

33
import com.typesafe.config.Config
4-
import it.agilelab.darwin.common.{Connector, Logging}
4+
import it.agilelab.darwin.common.{Connector, Logging, using}
55
import org.apache.avro.Schema
66
import org.apache.avro.Schema.Parser
77
import org.apache.commons.io.IOUtils
@@ -10,7 +10,7 @@ import org.apache.hadoop.fs.Path
1010
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Result}
1111
import org.apache.hadoop.hbase.security.User
1212
import org.apache.hadoop.hbase.util.Bytes
13-
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
13+
import org.apache.hadoop.hbase._
1414
import org.apache.hadoop.security.UserGroupInformation
1515

1616
import scala.collection.JavaConverters._
@@ -51,8 +51,8 @@ case class HBaseConnector(config: Config) extends Connector(config) with Logging
5151

5252
lazy val TABLE_NAME: TableName = TableName.valueOf(Bytes.toBytes(NAMESPACE_STRING), Bytes.toBytes(TABLE_NAME_STRING))
5353

54-
lazy val CF: Array[Byte] = Bytes.toBytes("0")
55-
lazy val QUALIFIER: Array[Byte] = Bytes.toBytes("schema")
54+
val CF: Array[Byte] = Bytes.toBytes("0")
55+
val QUALIFIER: Array[Byte] = Bytes.toBytes("schema")
5656

5757
log.debug("Creating default HBaseConfiguration")
5858
val configuration: Configuration = HBaseConfiguration.create()
@@ -126,6 +126,31 @@ case class HBaseConnector(config: Config) extends Connector(config) with Logging
126126
log.debug(s"insertion of schemas into $NAMESPACE_STRING:$TABLE_NAME_STRING successful")
127127
}
128128

129+
override def createTable(): Unit = {
130+
using(connection.getAdmin) { admin =>
131+
if (!admin.listNamespaceDescriptors().exists(_.getName == NAMESPACE_STRING)) {
132+
log.info(s"Namespace $NAMESPACE_STRING does not exists, creating it")
133+
admin.createNamespace(NamespaceDescriptor.create(NAMESPACE_STRING).build())
134+
}
135+
if (!tableExists()) {
136+
log.info(s"Table $TABLE_NAME does not exists, creating it")
137+
admin.createTable(new HTableDescriptor(TABLE_NAME).addFamily(new HColumnDescriptor(CF)))
138+
}
139+
}
140+
}
141+
142+
override def tableExists(): Boolean = {
143+
using(connection.getAdmin) { admin =>
144+
admin.tableExists(TABLE_NAME)
145+
}
146+
}
147+
148+
override def tableCreationHint(): String = {
149+
s"""To create namespace and table from an HBase shell issue:
150+
| create_namespace '$NAMESPACE_STRING'
151+
| create '$NAMESPACE_STRING:$TABLE_NAME_STRING', '0'""".stripMargin
152+
}
153+
129154
override def findSchema(id: Long): Option[Schema] = {
130155
log.debug(s"loading a schema with id = $id from table $NAMESPACE_STRING:$TABLE_NAME_STRING")
131156
val get: Get = new Get(Bytes.toBytes(id))

hbase/src/test/scala/it/agilelab/darwin/connector/hbase/HBaseConnectorSuite.scala

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package it.agilelab.darwin.connector.hbase
22

33
import com.typesafe.config.ConfigFactory
44
import it.agilelab.darwin.common.Connector
5-
import org.apache.avro.{Schema, SchemaNormalization}
65
import org.apache.avro.reflect.ReflectData
7-
import org.scalatest.{FlatSpec, Matchers}
6+
import org.apache.avro.{Schema, SchemaNormalization}
7+
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
88

9-
class HBaseConnectorSuite extends FlatSpec with Matchers {
9+
class HBaseConnectorSuite extends FlatSpec with Matchers with BeforeAndAfterAll {
1010

1111
val connector: Connector = new HBaseConnectorCreator().create(ConfigFactory.load())
1212

@@ -28,4 +28,19 @@ class HBaseConnectorSuite extends FlatSpec with Matchers {
2828
assert(noSchema.isEmpty)
2929
}
3030

31+
"connector.tableCreationHint" should "print the correct hint for table creation" in {
32+
connector.tableCreationHint() should be(
33+
"""To create namespace and table from an HBase shell issue:
34+
| create_namespace 'AVRO'
35+
| create 'AVRO:SCHEMA_REPOSITORY', '0'""".stripMargin)
36+
}
37+
38+
"connector.tableExists" should "return true with existent table" in {
39+
connector.tableExists() should be(true)
40+
}
41+
42+
override protected def beforeAll(): Unit = {
43+
super.beforeAll()
44+
connector.createTable()
45+
}
3146
}

ignite/src/main/scala/it/agilelab/darwin/connector/ignite/IgniteConnector.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,10 @@ class IgniteConnector(config: Config) extends Connector(config) {
1010
override def insert(schemas: Seq[(Long, Schema)]): Unit = ???
1111

1212
override def findSchema(id: Long): Option[Schema] = ???
13+
14+
override def createTable(): Unit = ???
15+
16+
override def tableExists(): Boolean = ???
17+
18+
override def tableCreationHint(): String = ???
1319
}

mock-connector/src/main/scala/it/agilelab/darwin/connector/mock/MockConnector.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,10 @@ class MockConnector(config: Config) extends Connector(config) {
2828
}
2929

3030
override def findSchema(id: Long): Option[Schema] = table.get(id)
31+
32+
override def createTable(): Unit = Unit
33+
34+
override def tableExists(): Boolean = true
35+
36+
override def tableCreationHint(): String = ""
3137
}

mongo/src/main/scala/it/agilelab/darwin/connector/mongo/MongoConnector.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,11 @@ class MongoConnector(config: Config) extends Connector(config) {
99

1010
override def insert(schemas: Seq[(Long, Schema)]): Unit = ???
1111

12+
override def createTable(): Unit = ???
13+
14+
override def tableExists(): Boolean = ???
15+
16+
override def tableCreationHint(): String = ???
17+
1218
override def findSchema(id: Long): Option[Schema] = ???
1319
}

0 commit comments

Comments
 (0)