Skip to content

Commit 161e19d

Browse files
[#18] Make HBaseConnector and PostgreSql connector create tables automatically if set
1 parent ea214a7 commit 161e19d

File tree

13 files changed

+144
-11
lines changed

13 files changed

+144
-11
lines changed

common/src/main/scala/it/agilelab/darwin/common/Connector.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,27 @@ import org.apache.avro.Schema
99
*/
1010
abstract class Connector(config: Config) extends Serializable {
1111

12+
/**
13+
* Creates the configured table; if the table already exists, this method does nothing
14+
*/
15+
def createTable(): Unit
16+
17+
/**
18+
* Returns whether or not the configured table exists
19+
*/
20+
def tableExists(): Boolean
21+
22+
/**
23+
*
24+
* @return a message telling the user what to do in order to create the table manually
25+
*/
26+
def tableCreationHint(): String
27+
1228
/**
1329
* Loads all schemas found on the storage.
1430
* This method can be invoked multiple times: to initialize the initial values or to update the existing ones with
1531
* the new data found on the storage.
32+
*
1633
* @return a sequence of all the pairs (ID, schema) found on the storage
1734
*/
1835
def fullLoad(): Seq[(Long, Schema)]
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package it.agilelab.darwin
2+
3+
package object common {

  /**
    * Loan pattern: applies `f` to `closeable` and closes the resource afterwards,
    * whether `f` returns normally or throws.
    *
    * If both `f` and `close()` throw, the exception raised by `f` is the one propagated
    * and the failure of `close()` is attached to it as a suppressed exception, so that
    * a cleanup failure never masks the root cause.
    *
    * @param closeable the resource to use and then close
    * @param f         the computation to run with the resource
    * @tparam A the type of the resource
    * @tparam B the type of the value produced by `f`
    * @return the value returned by `f`
    */
  def using[A <: AutoCloseable, B](closeable: A)(f: A => B): B = {
    import scala.util.control.NonFatal

    // Holds the exception thrown by `f`, if any, so that `finally` can tell the two cases apart.
    var failure: Throwable = null
    try {
      f(closeable)
    } catch {
      case t: Throwable =>
        failure = t
        throw t
    } finally {
      if (failure == null) {
        // Normal completion: a failing close() must surface to the caller.
        closeable.close()
      } else {
        try {
          closeable.close()
        } catch {
          // addSuppressed rejects self-suppression, hence the identity guard.
          case NonFatal(closeFailure) if closeFailure ne failure =>
            failure.addSuppressed(closeFailure)
        }
      }
    }
  }
}

core/src/main/scala/it/agilelab/darwin/manager/AvroSchemaManager.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import java.util.concurrent.atomic.AtomicReference
66
import com.typesafe.config.Config
77
import it.agilelab.darwin.common.{Connector, ConnectorFactory, Logging}
88
import it.agilelab.darwin.manager.exception.ConnectorNotFoundException
9+
import it.agilelab.darwin.manager.util.ConfigurationKeys
910
import jdk.nashorn.internal.runtime.ParserException
1011
import org.apache.avro.Schema
12+
1113
import scala.collection.JavaConverters._
1214

1315
object AvroSchemaManager extends Logging {
@@ -162,6 +164,12 @@ case class AvroSchemaManager(config: Config) extends Logging {
162164
.getOrElse(throw new ConnectorNotFoundException(config))
163165

164166
private def initialize(): Unit = {
167+
if (config.getBoolean(ConfigurationKeys.CREATE_TABLE)) {
168+
connector.createTable()
169+
} else if (!connector.tableExists()) {
170+
log.warn(s"Darwin table does not exists and has not been created (${ConfigurationKeys.CREATE_TABLE} was false)")
171+
log.warn(connector.tableCreationHint())
172+
}
165173
log.debug("cache initialization...")
166174
AvroSchemaManager._cache.compareAndSet(None, Some(AvroSchemaCacheFingerprint(connector.fullLoad())))
167175
log.debug("cache initialized")
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package it.agilelab.darwin.manager.util
2+
3+
/** Names of the configuration entries read by the schema manager. */
object ConfigurationKeys {

  /**
    * Key of the boolean entry that, when true, makes the manager ask the connector
    * to create its table during initialization.
    */
  val CREATE_TABLE: String = "createTable"
}

hbase/src/main/scala/it/agilelab/darwin/connector/hbase/HBaseConnector.scala

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package it.agilelab.darwin.connector.hbase
22

33
import com.typesafe.config.Config
4-
import it.agilelab.darwin.common.{Connector, Logging}
4+
import it.agilelab.darwin.common.{Connector, Logging, using}
55
import org.apache.avro.Schema
66
import org.apache.avro.Schema.Parser
77
import org.apache.commons.io.IOUtils
@@ -10,7 +10,7 @@ import org.apache.hadoop.fs.Path
1010
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Result}
1111
import org.apache.hadoop.hbase.security.User
1212
import org.apache.hadoop.hbase.util.Bytes
13-
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
13+
import org.apache.hadoop.hbase._
1414
import org.apache.hadoop.security.UserGroupInformation
1515

1616
import scala.collection.JavaConverters._
@@ -51,8 +51,8 @@ case class HBaseConnector(config: Config) extends Connector(config) with Logging
5151

5252
lazy val TABLE_NAME: TableName = TableName.valueOf(Bytes.toBytes(NAMESPACE_STRING), Bytes.toBytes(TABLE_NAME_STRING))
5353

54-
lazy val CF: Array[Byte] = Bytes.toBytes("0")
55-
lazy val QUALIFIER: Array[Byte] = Bytes.toBytes("schema")
54+
// Column family of the schema table, encoded as bytes; createTable() creates the table with this family.
val CF: Array[Byte] = Bytes.toBytes("0")
55+
// NOTE(review): presumably the column qualifier under which each schema is stored; its usage is not visible here, confirm.
val QUALIFIER: Array[Byte] = Bytes.toBytes("schema")
5656

5757
log.debug("Creating default HBaseConfiguration")
5858
val configuration: Configuration = HBaseConfiguration.create()
@@ -125,6 +125,31 @@ case class HBaseConnector(config: Config) extends Connector(config) with Logging
125125
mutator.flush()
126126
log.debug(s"insertion of schemas into $NAMESPACE_STRING:$TABLE_NAME_STRING successful")
127127
}
128+
129+
/**
  * Creates the configured namespace and table when they are missing.
  *
  * The namespace is created first if no existing namespace descriptor carries its name;
  * the table is then created with the single column family `CF` unless it already exists.
  * The HBase `Admin` obtained for the operation is closed before returning.
  */
override def createTable(): Unit = {
  using(connection.getAdmin) { admin =>
    val namespaceMissing = !admin.listNamespaceDescriptors().exists(_.getName == NAMESPACE_STRING)
    if (namespaceMissing) {
      log.info(s"Namespace $NAMESPACE_STRING does not exists, creating it")
      admin.createNamespace(NamespaceDescriptor.create(NAMESPACE_STRING).build())
    }
    // Ask the Admin that is already open rather than calling tableExists(),
    // which would obtain (and close) a second Admin just for this check.
    if (!admin.tableExists(TABLE_NAME)) {
      log.info(s"Table $TABLE_NAME does not exists, creating it")
      admin.createTable(new HTableDescriptor(TABLE_NAME).addFamily(new HColumnDescriptor(CF)))
    }
  }
}
141+
142+
/**
  * Asks HBase whether the configured table exists.
  * The `Admin` obtained for the check is closed before returning.
  *
  * @return true if the table exists, false otherwise
  */
override def tableExists(): Boolean =
  using(connection.getAdmin)(_.tableExists(TABLE_NAME))
147+
148+
/**
  * Builds the HBase shell commands a user can run to create the namespace and table by hand.
  *
  * The column family in the hint is derived from `CF`, the same value `createTable()` uses,
  * so the hint cannot drift from what the connector would create itself.
  *
  * @return a multi-line message containing the `create_namespace` and `create` commands
  */
override def tableCreationHint(): String = {
  val columnFamily = Bytes.toString(CF)
  s"""To create namespace and table from an HBase shell issue:
     | create_namespace '$NAMESPACE_STRING'
     | create '$NAMESPACE_STRING:$TABLE_NAME_STRING', '$columnFamily'""".stripMargin
}
128153
}
129154

130155

hbase/src/test/scala/it/agilelab/darwin/connector/hbase/HBaseConnectorSuite.scala

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package it.agilelab.darwin.connector.hbase
22

33
import com.typesafe.config.ConfigFactory
44
import it.agilelab.darwin.common.Connector
5-
import org.apache.avro.{Schema, SchemaNormalization}
65
import org.apache.avro.reflect.ReflectData
7-
import org.scalatest.{FlatSpec, Matchers}
6+
import org.apache.avro.{Schema, SchemaNormalization}
7+
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
88

9-
class HBaseConnectorSuite extends FlatSpec with Matchers {
9+
class HBaseConnectorSuite extends FlatSpec with Matchers with BeforeAndAfterAll {
1010

1111
val connector: Connector = new HBaseConnectorCreator().create(ConfigFactory.load())
1212

@@ -23,4 +23,19 @@ class HBaseConnectorSuite extends FlatSpec with Matchers {
2323
assert(loaded.forall(schemas.contains))
2424
}
2525

26+
"connector.tableCreationHint" should "print the correct hint for table creation" in {
27+
connector.tableCreationHint() should be(
28+
"""To create namespace and table from an HBase shell issue:
29+
| create_namespace 'AVRO'
30+
| create 'AVRO:SCHEMA_REPOSITORY', '0'""".stripMargin)
31+
}
32+
33+
"connector.tableExists" should "return true with existent table" in {
34+
connector.tableExists() should be(true)
35+
}
36+
37+
/** Runs once before the suite: asks the connector to create its table so the tests can rely on it. */
override protected def beforeAll(): Unit = {
  super.beforeAll()
  connector.createTable()
}
2641
}

ignite/src/main/scala/it/agilelab/darwin/connector/ignite/IgniteConnector.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,10 @@ class IgniteConnector(config: Config) extends Connector(config) {
88
override def fullLoad(): Seq[(Long, Schema)] = ???
99

1010
override def insert(schemas: Seq[(Long, Schema)]): Unit = ???
11+
12+
// Not implemented yet: `???` throws scala.NotImplementedError when this method is called.
override def createTable(): Unit = ???
13+
14+
// Not implemented yet: `???` throws scala.NotImplementedError when this method is called.
override def tableExists(): Boolean = ???
15+
16+
// Not implemented yet: `???` throws scala.NotImplementedError when this method is called.
override def tableCreationHint(): String = ???
1117
}

mock-connector/src/main/scala/it/agilelab/darwin/connector/mock/MockConnector.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,10 @@ class MockConnector(config: Config) extends Connector(config) {
2626
val p = new Schema.Parser()
2727
p.parse(getClass.getClassLoader.getResourceAsStream(path))
2828
}
29+
30+
// The mock has no backing table, so there is nothing to create.
// Returns the unit value `()`; the previous `Unit` referred to the scala.Unit companion object, which was then discarded.
override def createTable(): Unit = ()
31+
32+
// The mock always reports the table as existing.
override def tableExists(): Boolean = true
33+
34+
// The mock provides no creation hint and returns an empty string.
override def tableCreationHint(): String = ""
2935
}

mongo/src/main/scala/it/agilelab/darwin/connector/mongo/MongoConnector.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,10 @@ class MongoConnector(config: Config) extends Connector(config) {
88
override def fullLoad(): Seq[(Long, Schema)] = ???
99

1010
override def insert(schemas: Seq[(Long, Schema)]): Unit = ???
11+
12+
// Not implemented yet: `???` throws scala.NotImplementedError when this method is called.
override def createTable(): Unit = ???
13+
14+
// Not implemented yet: `???` throws scala.NotImplementedError when this method is called.
override def tableExists(): Boolean = ???
15+
16+
// Not implemented yet: `???` throws scala.NotImplementedError when this method is called.
override def tableCreationHint(): String = ???
1117
}

postgres/src/main/scala/it/agilelab/darwin/connector/postgres/PostgresConnector.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package it.agilelab.darwin.connector.postgres
33
import java.sql.ResultSet
44

55
import com.typesafe.config.Config
6-
import it.agilelab.darwin.common.Connector
6+
import it.agilelab.darwin.common.{Connector, using}
77
import org.apache.avro.Schema
88
import org.apache.avro.Schema.Parser
99

@@ -21,6 +21,12 @@ class PostgresConnector(config: Config) extends Connector(config) with PostgresC
2121

2222
setConnectionConfig(config)
2323

24+
// DDL used both by createTable() and by tableCreationHint(): creates the table only if it is
// missing, with a bigint primary key `id` and a non-null text column `schema`.
private val CREATE_TABLE_STMT: String =
  s"""CREATE TABLE IF NOT EXISTS $TABLE_NAME (
     |id bigint NOT NULL PRIMARY KEY,
     |schema text NOT NULL
     |)""".stripMargin
29+
2430
override def fullLoad(): Seq[(Long, Schema)] = {
2531
val connection = getConnection
2632
var schemas: Seq[(Long, Schema)] = Seq.empty[(Long, Schema)]
@@ -58,4 +64,18 @@ class PostgresConnector(config: Config) extends Connector(config) with PostgresC
5864
connection.close
5965
}
6066
}
67+
68+
/**
  * Creates the configured table by executing `CREATE_TABLE_STMT`, which is a
  * `CREATE TABLE IF NOT EXISTS` statement and therefore does nothing when the table is already there.
  *
  * Both the statement and the connection are closed before returning, whether or not
  * the execution succeeds.
  */
override def createTable(): Unit = {
  using(getConnection) { conn =>
    // Close the Statement explicitly instead of relying on the connection being closed.
    using(conn.createStatement()) { stmt =>
      stmt.executeUpdate(CREATE_TABLE_STMT)
    }
  }
}
73+
74+
/**
  * Returns whether the configured table exists.
  *
  * Uses PostgreSQL's `to_regclass`, which resolves the name with the same rules as the
  * `CREATE TABLE` statement (case folding, optional schema qualification) and yields NULL
  * when no such relation exists.
  * NOTE(review): `to_regclass` requires PostgreSQL 9.4 or later and matches any relation
  * with that name (views and sequences included) - confirm this is acceptable.
  *
  * @return true if a relation named `TABLE_NAME` exists, false otherwise
  */
override def tableExists(): Boolean = {
  // The name is embedded as a SQL string literal, so single quotes must be doubled.
  val quotedName = TABLE_NAME.toString.replace("'", "''")
  using(getConnection) { conn =>
    using(conn.createStatement()) { stmt =>
      using(stmt.executeQuery(s"SELECT to_regclass('$quotedName') IS NOT NULL")) { rs =>
        rs.next() && rs.getBoolean(1)
      }
    }
  }
}
75+
76+
/**
  * Builds the message telling the user which SQL statement creates the table.
  *
  * @return an introductory line followed by `CREATE_TABLE_STMT` and a trailing newline
  */
override def tableCreationHint(): String =
  s"To create table perform the following sql query:\n$CREATE_TABLE_STMT\n"
6181
}

0 commit comments

Comments
 (0)