diff --git a/src/BoardQueueElem.hs b/src/BoardQueueElem.hs index d91bf16..f4ab1f0 100644 --- a/src/BoardQueueElem.hs +++ b/src/BoardQueueElem.hs @@ -13,4 +13,4 @@ data BoardQueueElem = BoardQueueElem instance Ord BoardQueueElem where (<=) :: BoardQueueElem -> BoardQueueElem -> Bool - a <= b = last_modified a >= last_modified b + a <= b = last_modified a <= last_modified b diff --git a/src/Common b/src/Common index 88b5f0d..f8653a3 160000 --- a/src/Common +++ b/src/Common @@ -1 +1 @@ -Subproject commit 88b5f0df7ea5e83a65a6c6153f197da7cd1c6217 +Subproject commit f8653a30b7ee95d23a47eef870fd03b09f65ea4d diff --git a/src/Sync.hs b/src/Sync.hs index fdb3e49..b1ae07a 100644 --- a/src/Sync.hs +++ b/src/Sync.hs @@ -1,4 +1,6 @@ {-# LANGUAGE RecordWildCards #-} +{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-} +{-# HLINT ignore "Use tuple-section" #-} module Sync where @@ -6,6 +8,12 @@ import System.Exit (exitFailure) import qualified Data.Map as Map import qualified Data.Set as Set import Data.Maybe (mapMaybe) +import Control.Concurrent.QSem +import Control.Concurrent.STM.TVar +import Control.Concurrent.STM (atomically) +import Control.Concurrent (threadDelay, forkFinally) +import System.Random (StdGen, getStdGen) + -- import Control.Monad.Trans.Except (ExceptT, runExceptT) import qualified Common.Server.ConsumerSettings as S @@ -29,15 +37,66 @@ consumerSettingsToPartialJSONSettings S.ConsumerJSONSettings {..} = , site_url = undefined } + +threadMain :: QE.BoardQueueElem -> IO () +threadMain board_elem = do + putStrLn $ Board.pathpart $ QE.board board_elem + + +mainLoop :: S.ConsumerJSONSettings -> PQ.Queue QE.BoardQueueElem -> IO () +mainLoop csmr_settings pq = do + sem <- newQSem (S.sync_max_concurrent_workers csmr_settings) + pqvar <- newTVarIO pq + stdGen <- getStdGen + + loop sem stdGen pqvar + + where + loop :: QSem -> StdGen -> TVar (PQ.Queue QE.BoardQueueElem) -> IO () + loop sem stdGen pqvar = do + waitQSem sem -- make sure we don't have too many threads running + + (m_board_elem, stdGen_) <- atomically $ do + pq_a <- readTVar pqvar + + if Set.null pq_a + then + return (Nothing, stdGen) + else do + let (i, stdGen_) = PQ.selectSkewedIndex (Set.size pq) stdGen + + let (board_elem, pq_b) = PQ.take i pq_a + + writeTVar pqvar pq_b + + return (Just board_elem, stdGen_) + + _ <- case m_board_elem of + Nothing -> return undefined + Just board_elem -> do + forkFinally (threadMain board_elem) $ \threadResult -> do + case threadResult of + Left e -> print e + _ -> return () + + atomically $ modifyTVar' pqvar (PQ.put board_elem) + + signalQSem sem + + threadDelay (S.sync_loop_timeout_microseconds csmr_settings) + + loop sem stdGen_ pqvar + + syncWebsites :: S.ConsumerJSONSettings -> IO () -syncWebsites consumer_settings = do +syncWebsites csmr_settings = do putStrLn "Starting channel web synchronization." - let json_settings = consumerSettingsToPartialJSONSettings consumer_settings + let json_settings = consumerSettingsToPartialJSONSettings csmr_settings sitesResult <- Client.getAllSites json_settings - sites <- mapM (flip Lib.ensureSiteExists sitesResult . Lib.toClientSettings consumer_settings) (S.websites consumer_settings) + sites <- mapM (flip Lib.ensureSiteExists sitesResult . Lib.toClientSettings csmr_settings) (S.websites csmr_settings) print sites @@ -100,7 +159,7 @@ syncWebsites consumer_settings = do existing_board_info boards <- Lib.createArchivesForNewBoards - (Lib.toClientSettings consumer_settings site_settings) + (Lib.toClientSettings csmr_settings site_settings) (Set.fromList $ S.boards site_settings) ((Map.!) boards_per_site s_id) s_id @@ -108,7 +167,7 @@ syncWebsites consumer_settings = do return (site, existing_boards ++ boards) ) - (S.websites consumer_settings) + (S.websites csmr_settings) let site_and_board_list = concatMap (\(a, bs) -> map (\b -> (a, b)) bs) site_and_board_list_ @@ -127,8 +186,11 @@ syncWebsites consumer_settings = do let pq :: PQ.Queue QE.BoardQueueElem = Set.fromList queue_elems + putStrLn "PQ:" print pq + mainLoop csmr_settings pq + -- we have our boards last modified timestamps -- get list of boards per site