Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import org.geotools.geometry.jts._
import org.geotools.jdbc.JDBCDataStore
import org.geotools.referencing.CRS
import org.geotools.util.factory.Hints
import org.locationtech.geomesa.gt.partition.postgis.dialect.PartitionedPostgisDialect.SftUserData
import org.locationtech.geomesa.gt.partition.postgis.dialect.PartitionedPostgisDialect.{SftUserData, getIndexedColumns}
import org.locationtech.geomesa.gt.partition.postgis.dialect.filter.SplitFilterVisitor
import org.locationtech.geomesa.gt.partition.postgis.dialect.functions.{LogCleaner, TruncateToPartition, TruncateToTenMinutes}
import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures._
Expand Down Expand Up @@ -217,9 +217,19 @@ class PartitionedPostgisDialect(store: JDBCDataStore, grants: Seq[RoleName] = Se
UserDataTable.read(cx, schemaName, sft.getTypeName).foreach { case (k, v) => sft.getUserData.put(k, v) }

// populate flags on indexed attributes
getIndexedColumns(cx, sft.getTypeName).foreach { attribute =>
Option(sft.getDescriptor(attribute)).foreach(_.getUserData.put(AttributeOptions.OptIndex, "true"))
}
getIndexedColumns(cx, sft.getTypeName)
.recover {
case NonFatal(throwable) =>
logger.warn(s"SimpleFeatureType: ${sft.getTypeName} could not load attributes with indices", throwable)
List.empty
}
.get
.foreach { attribute =>
Try(sft.getDescriptor(attribute)).fold(
throwable => logger.warn(s"SimpleFeatureType: ${sft.getTypeName} could not load attribute descriptor by name: $attribute", throwable),
_.getUserData.put(AttributeOptions.OptIndex, "true")
)
}
}

override def preDropTable(schemaName: String, sft: SimpleFeatureType, cx: Connection): Unit = {
Expand Down Expand Up @@ -355,40 +365,9 @@ class PartitionedPostgisDialect(store: JDBCDataStore, grants: Seq[RoleName] = Se
"text"
}
}

/**
* Gets a list of indexed columns for the given type
*
* @param cx connection
* @param typeName feature type name
* @return
*/
private def getIndexedColumns(cx: Connection, typeName: String): Seq[String] = {
val attributesWithIndicesSql =
s"""select distinct(att.attname) as indexed_attribute_name
|from pg_class obj
|join pg_index idx on idx.indrelid = obj.oid
|join pg_attribute att on att.attrelid = obj.oid and att.attnum = any(idx.indkey)
|join pg_views v on v.viewname = ?
|where obj.relname = concat(?, ${PartitionedTableSuffix.quoted})
|order by att.attname;""".stripMargin
try {
WithClose(cx.prepareStatement(attributesWithIndicesSql)) { statement =>
statement.setString(1, typeName)
statement.setString(2, typeName)
WithClose(statement.executeQuery()) { rs =>
Iterator.continually(rs).takeWhile(_.next()).map(_.getString(1)).toList
}
}
} catch {
case NonFatal(e) =>
logger.warn(s"Error loading attributes with indices for schema $typeName:", e)
Seq.empty
}
}
}

object PartitionedPostgisDialect extends Conversions {
object PartitionedPostgisDialect extends Conversions with StrictLogging {

private val IgnoredTables = Seq("pg_stat_statements", "pg_stat_statements_info")

Expand Down Expand Up @@ -490,4 +469,29 @@ object PartitionedPostgisDialect extends Conversions {
def getCoordinateDimensions: Option[Int] =
Option(d.getUserData.get(Hints.COORDINATE_DIMENSION)).map(int)
}

/**
* Get a list of indexed columns for the given SimpleFeatureType
*
* @param cx connection
* @param typeName feature type name
* @return a sequence of SimpleFeatureType attribute names which have an index
*/
def getIndexedColumns(cx: Connection, typeName: String): Try[List[String]] = {
val attributesWithIndicesSql =
s"""select distinct(att.attname) as indexed_attribute_name
|from pg_class obj
|join pg_index idx on idx.indrelid = obj.oid
|join pg_attribute att on att.attrelid = obj.oid and att.attnum = any(idx.indkey)
|where obj.relname = concat(?, ${PartitionedTableSuffix.quoted})
|order by att.attname;""".stripMargin
Try {
WithClose(cx.prepareStatement(attributesWithIndicesSql)) { statement =>
statement.setString(1, typeName)
WithClose(statement.executeQuery()) { rs =>
Iterator.continually(rs).takeWhile(_.next()).map(_.getString(1)).toList
}
}
}
}
}
Loading