Create outer loop for continuous sync

This commit is contained in:
towards-a-new-leftypol 2025-02-05 20:14:25 -05:00
parent 3ba873b984
commit d19e12e6de
3 changed files with 69 additions and 7 deletions

View File

@ -13,4 +13,4 @@ data BoardQueueElem = BoardQueueElem
instance Ord BoardQueueElem where instance Ord BoardQueueElem where
(<=) :: BoardQueueElem -> BoardQueueElem -> Bool (<=) :: BoardQueueElem -> BoardQueueElem -> Bool
a <= b = last_modified a >= last_modified b a <= b = last_modified a <= last_modified b

@ -1 +1 @@
Subproject commit 88b5f0df7ea5e83a65a6c6153f197da7cd1c6217 Subproject commit f8653a30b7ee95d23a47eef870fd03b09f65ea4d

View File

@ -1,4 +1,6 @@
{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE RecordWildCards #-}
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
{-# HLINT ignore "Use tuple-section" #-}
module Sync where module Sync where
@ -6,6 +8,12 @@ import System.Exit (exitFailure)
import qualified Data.Map as Map import qualified Data.Map as Map
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Maybe (mapMaybe) import Data.Maybe (mapMaybe)
import Control.Concurrent.QSem
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM (atomically)
import Control.Concurrent (threadDelay, forkFinally)
import System.Random (StdGen, getStdGen)
-- import Control.Monad.Trans.Except (ExceptT, runExceptT) -- import Control.Monad.Trans.Except (ExceptT, runExceptT)
import qualified Common.Server.ConsumerSettings as S import qualified Common.Server.ConsumerSettings as S
@ -29,15 +37,66 @@ consumerSettingsToPartialJSONSettings S.ConsumerJSONSettings {..} =
, site_url = undefined , site_url = undefined
} }
threadMain :: QE.BoardQueueElem -> IO ()
threadMain board_elem = do
putStrLn $ Board.pathpart $ QE.board board_elem
mainLoop :: S.ConsumerJSONSettings -> PQ.Queue QE.BoardQueueElem -> IO ()
mainLoop csmr_settings pq = do
sem <- newQSem (S.sync_max_concurrent_workers csmr_settings)
pqvar <- newTVarIO pq
stdGen <- getStdGen
loop sem stdGen pqvar
where
loop :: QSem -> StdGen -> TVar (PQ.Queue QE.BoardQueueElem) -> IO ()
loop sem stdGen pqvar = do
waitQSem sem -- make sure we don't have too many threads running
(m_board_elem, stdGen_) <- atomically $ do
pq_a <- readTVar pqvar
if Set.null pq_a
then
return (Nothing, stdGen)
else do
let (i, stdGen_) = PQ.selectSkewedIndex (Set.size pq) stdGen
let (board_elem, pq_b) = PQ.take i pq_a
writeTVar pqvar pq_b
return (Just board_elem, stdGen_)
_ <- case m_board_elem of
Nothing -> return undefined
Just board_elem -> do
forkFinally (threadMain board_elem) $ \threadResult -> do
case threadResult of
Left e -> print e
_ -> return ()
atomically $ modifyTVar' pqvar (PQ.put board_elem)
signalQSem sem
threadDelay (S.sync_loop_timeout_microseconds csmr_settings)
loop sem stdGen_ pqvar
syncWebsites :: S.ConsumerJSONSettings -> IO () syncWebsites :: S.ConsumerJSONSettings -> IO ()
syncWebsites consumer_settings = do syncWebsites csmr_settings = do
putStrLn "Starting channel web synchronization." putStrLn "Starting channel web synchronization."
let json_settings = consumerSettingsToPartialJSONSettings consumer_settings let json_settings = consumerSettingsToPartialJSONSettings csmr_settings
sitesResult <- Client.getAllSites json_settings sitesResult <- Client.getAllSites json_settings
sites <- mapM (flip Lib.ensureSiteExists sitesResult . Lib.toClientSettings consumer_settings) (S.websites consumer_settings) sites <- mapM (flip Lib.ensureSiteExists sitesResult . Lib.toClientSettings csmr_settings) (S.websites csmr_settings)
print sites print sites
@ -100,7 +159,7 @@ syncWebsites consumer_settings = do
existing_board_info existing_board_info
boards <- Lib.createArchivesForNewBoards boards <- Lib.createArchivesForNewBoards
(Lib.toClientSettings consumer_settings site_settings) (Lib.toClientSettings csmr_settings site_settings)
(Set.fromList $ S.boards site_settings) (Set.fromList $ S.boards site_settings)
((Map.!) boards_per_site s_id) ((Map.!) boards_per_site s_id)
s_id s_id
@ -108,7 +167,7 @@ syncWebsites consumer_settings = do
return (site, existing_boards ++ boards) return (site, existing_boards ++ boards)
) )
(S.websites consumer_settings) (S.websites csmr_settings)
let site_and_board_list = concatMap (\(a, bs) -> map (\b -> (a, b)) bs) site_and_board_list_ let site_and_board_list = concatMap (\(a, bs) -> map (\b -> (a, b)) bs) site_and_board_list_
@ -127,8 +186,11 @@ syncWebsites consumer_settings = do
let pq :: PQ.Queue QE.BoardQueueElem = Set.fromList queue_elems let pq :: PQ.Queue QE.BoardQueueElem = Set.fromList queue_elems
putStrLn "PQ:"
print pq print pq
mainLoop csmr_settings pq
-- we have our boards last modified timestamps -- we have our boards last modified timestamps
-- get list of boards per site -- get list of boards per site