Hi Dave,
Use the following statements to create the table:
-- Switch to the kalturadw schema so the table below is created there.
USE kalturadw;

-- dwh_entry_plays_views: one row per entry_id carrying a plays counter and a
-- views counter. updated_at is filled in on insert and refreshed by MySQL on
-- every update of the row (DEFAULT / ON UPDATE CURRENT_TIMESTAMP).
CREATE TABLE dwh_entry_plays_views (
    entry_id   VARCHAR(255),   -- primary key, therefore implicitly NOT NULL in MySQL
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    plays      INT(11),
    views      INT(11),
    PRIMARY KEY (entry_id),
    -- Secondary index on the change timestamp
    -- (presumably used to pick up recently changed rows - confirm with the consumer).
    INDEX (updated_at)
);
As for the stored procedure, this is the version I last used with CE 5 (again, about five years ago, but I believe it should still work for you):
DELIMITER $$

USE `kalturadw`$$

DROP PROCEDURE IF EXISTS `get_data_for_operational`$$

-- get_data_for_operational(p_sync_type)
--
-- Looks up the row of kalturadw_ds.operational_syncs whose operational_sync_name
-- equals p_sync_type, records the current time in the kalturadw_ds.parameters row
-- identified by execution_start_time_parameter_id, then builds and runs a dynamic
-- SELECT that returns one row per dim.<group_column> with the configured
-- aggregation phrase.
--
-- The dynamic query joins (comma joins, conditions in WHERE):
--   <aggregation_table> aggr
--   <bridge_table> bridge        (only when bridge_table is not NULL)
--   <entity_table> dim
--   kalturadw_ds.parameters p    (row with id = last_execution_parameter_id)
-- and keeps only dim rows whose operational_measures_updated_at is later than
-- p.date_value.
--
-- Parameter:
--   p_sync_type  name of the sync definition to run (operational_sync_name).
-- Result:
--   the result set of the generated SELECT.
CREATE DEFINER=`etl`@`localhost` PROCEDURE `get_data_for_operational`(p_sync_type VARCHAR(55))
BEGIN
    -- Settings read from kalturadw_ds.operational_syncs.
    DECLARE v_group_column VARCHAR(1024);
    DECLARE v_entity_table VARCHAR(1024);
    DECLARE v_aggregation_phrase VARCHAR(1024);
    DECLARE v_aggregation_table VARCHAR(1024);
    DECLARE v_bridge_entity VARCHAR(1024);
    DECLARE v_bridge_table VARCHAR(1024);
    DECLARE v_last_execution_parameter_id INT;
    DECLARE v_execution_start_time_parameter_id INT;
    -- Start time of this run, captured before any other work is done.
    DECLARE v_execution_start_time DATETIME DEFAULT NOW();

    -- Load the definition of the requested sync.
    SELECT
        group_column,
        entity_table,
        aggregation_phrase,
        aggregation_table,
        bridge_entity,
        bridge_table,
        last_execution_parameter_id,
        execution_start_time_parameter_id
    INTO
        v_group_column,
        v_entity_table,
        v_aggregation_phrase,
        v_aggregation_table,
        v_bridge_entity,
        v_bridge_table,
        v_last_execution_parameter_id,
        v_execution_start_time_parameter_id
    FROM kalturadw_ds.operational_syncs
    WHERE operational_sync_name = p_sync_type;

    -- Persist the start time of this run in the parameters table.
    UPDATE kalturadw_ds.parameters
    SET date_value = v_execution_start_time
    WHERE id = v_execution_start_time_parameter_id;

    -- Assemble the aggregation query. Table and column names cannot be bound as
    -- statement parameters, so the text is concatenated from the configuration
    -- values loaded above.
    SET @s = CONCAT(
        'SELECT dim.', v_group_column, ', ', v_aggregation_phrase,
        ' FROM ', v_aggregation_table, ' aggr, ',
        -- Optional bridge table between the aggregation and the dimension.
        CASE
            WHEN v_bridge_table IS NULL THEN ''
            ELSE CONCAT(v_bridge_table, ' bridge, ')
        END,
        v_entity_table, ' dim, kalturadw_ds.parameters p',
        ' WHERE aggr.',
        -- Without a bridge entity: aggr.<group_column> = dim.<group_column>.
        -- With one: aggr.<bridge_entity> = bridge.<bridge_entity>
        --           AND bridge.<group_column> = dim.<group_column>.
        CASE
            WHEN v_bridge_entity IS NULL THEN v_group_column
            ELSE CONCAT(v_bridge_entity, ' = bridge.', v_bridge_entity, ' AND bridge.', v_group_column)
        END,
        ' = dim.', v_group_column,
        ' AND dim.operational_measures_updated_at > p.date_value AND p.id = ', v_last_execution_parameter_id,
        ' GROUP BY dim.', v_group_column
    );

    -- Run the generated statement and release it.
    PREPARE stmt FROM @s;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END$$

DELIMITER ;
This version of updateEntryPlaysViews.php should work with CE 5:
<?php
/**
 * updateEntryPlaysViews.php
 *
 * Reads lines from standard input, each holding three fields in the order
 * "entryId plays views", separated by a tab (or by a single space when the
 * line contains no tab). For every entry whose stored plays/views differ from
 * the supplied values it:
 *   1. updates the `views` and `plays` columns of the `entry` table with a
 *      direct SQL UPDATE (so the entry's updated-at is not touched), and
 *   2. writes a SphinxLog row containing the Sphinx save SQL for the entry.
 *
 * Blank lines are skipped, malformed lines and unknown entry ids are logged
 * and skipped, and processing continues until end of input.
 */
define ('ROOT_DIR','/monsoon/opt/kaltura/app');
require_once(ROOT_DIR.'/api_v3/bootstrap.php');
require_once(ROOT_DIR . '/alpha/config/kConfLocal.php');
require_once(ROOT_DIR . '/infra/bootstrap_base.php');
require_once(ROOT_DIR . '/infra/KAutoloader.php');
KAutoloader::addClassPath(KAutoloader::buildPath(ROOT_DIR, "vendor", "propel", "*"));
KAutoloader::addClassPath(KAutoloader::buildPath(ROOT_DIR, "plugins", "*"));
KAutoloader::addClassPath(KAutoloader::buildPath(ROOT_DIR, "infra", "*"));
KAutoloader::setClassMapFilePath(kConf::get("cache_root_path") . '/deploy/classMap.cache');
KAutoloader::register();
// The require_once calls below repeat files requested above; require_once
// does not include a file a second time, so they are kept only for safety.
require_once(ROOT_DIR.DIRECTORY_SEPARATOR.DIRECTORY_SEPARATOR."infra".DIRECTORY_SEPARATOR."bootstrap_base.php");
require_once (ROOT_DIR.DIRECTORY_SEPARATOR.'alpha'.DIRECTORY_SEPARATOR.'config'.DIRECTORY_SEPARATOR.'kConf.php');
// Autoloader
require_once(ROOT_DIR. DIRECTORY_SEPARATOR."infra".DIRECTORY_SEPARATOR."KAutoloader.php");

$f = fopen("php://stdin", "r");
$count = 0;
$sphinxMgr = new kSphinxSearchManager();
$dbConf = kConf::getDB();
DbManager::setConfig($dbConf);
DbManager::initialize();
$connection = Propel::getConnection();
// NOTE(review): every SphinxLog row is written with this fixed partner id,
// regardless of the entry being processed - confirm this is intended for
// your installation.
$partnerId=100;

// Read until EOF. Testing the trimmed line for truthiness would stop the run
// at the first blank line (or a line containing just "0").
while (($line = fgets($f)) !== false) {
	$s = trim($line);
	if ($s === '') {
		continue;
	}

	// Prefer tab as the separator; fall back to a single space.
	$sep = (strpos($s, "\t") !== false) ? "\t" : " ";
	$fields = explode($sep, $s);
	if (count($fields) < 3) {
		KalturaLog::err('Skipping malformed input line [' . $s . ']');
		continue;
	}
	list($entryId, $plays, $views) = $fields;

	myPartnerUtils::resetAllFilters();
	entryPeer::setDefaultCriteriaFilter();
	$entry = entryPeer::retrieveByPK($entryId);
	if (is_null($entry)) {
		KalturaLog::err('Couldn\'t find entry [' . $entryId . ']');
		continue;
	}

	if ($entry->getViews() != $views || $entry->getPlays() != $plays) {
		// Set the new values on the in-memory object only (it is never saved
		// through Propel) so that the Sphinx SQL generated below reflects them.
		$entry->setViews($views);
		$entry->setPlays($plays);
		try {
			// update entry without setting the updated at
			// Values come from stdin, so they are bound as parameters instead
			// of being interpolated into the SQL text.
			$updateSql = "UPDATE entry SET views = :views, plays = :plays WHERE id = :id";
			$stmt = $connection->prepare($updateSql);
			$stmt->execute(array(
				':views' => $views,
				':plays' => $plays,
				':id'    => $entryId,
			));
			$affectedRows = $stmt->rowCount();
			KalturaLog::log("AffectedRows: ". $affectedRows);
			// Logged only after the UPDATE has actually been executed.
			KalturaLog::debug('Successfully saved entry [' . $entryId . ']');

			// update sphinx log directly
			$sql = $sphinxMgr->getSphinxSaveSql($entry, false);
			$sphinxLog = new SphinxLog();
			$sphinxLog->setEntryId($entryId);
			$sphinxLog->setPartnerId($partnerId);
			$sphinxLog->setSql($sql);
			$sphinxLog->save(myDbHelper::getConnection(myDbHelper::DB_HELPER_CONN_SPHINX_LOG));
		} catch (Exception $e) {
			// Best effort: report the failure and carry on with the next line.
			KalturaLog::err($e->getMessage());
		}
	}

	$count++;
	// Empty the entry instance pool every 500 processed entries.
	if ($count % 500 === 0) {
		entryPeer::clearInstancePool();
	}
}

fclose($f);
?>