diff --git a/sql/initialize.sql b/sql/initialize.sql index c94969a..7d1c40f 100644 --- a/sql/initialize.sql +++ b/sql/initialize.sql @@ -17,6 +17,7 @@ DROP TABLE IF EXISTS threads CASCADE; DROP TABLE IF EXISTS posts CASCADE; DROP TABLE IF EXISTS attachments CASCADE; DROP TYPE IF EXISTS catalog_grid_result CASCADE; +DROP TYPE IF EXISTS post_key CASCADE; DROP FUNCTION IF EXISTS update_post_body_search_index; DROP FUNCTION IF EXISTS fetch_top_threads; DROP FUNCTION IF EXISTS fetch_catalog; @@ -170,6 +171,8 @@ $$ LANGUAGE sql; */ + +-- Deprecated: this doesn't insert local_idx, seems better to just get all the posts in application CREATE OR REPLACE FUNCTION insert_posts_and_return_ids(new_posts posts[]) RETURNS TABLE (post_id bigint, board_post_id bigint, thread_id bigint) AS $$ WITH @@ -199,6 +202,21 @@ $$ LANGUAGE sql; -- 1:21 for full db (nothing inserted) +CREATE TYPE post_key AS + ( thread_id bigint + , board_post_id bigint + ); + +CREATE OR REPLACE FUNCTION get_posts(board_posts post_key[]) +RETURNS SETOF posts AS $$ + + SELECT * + FROM posts + WHERE (thread_id, board_post_id) IN (SELECT thread_id, board_post_id FROM unnest(board_posts)) + +$$ LANGUAGE sql; + + CREATE OR REPLACE FUNCTION fetch_top_threads( p_start_time TIMESTAMPTZ, lookback INT DEFAULT 10000 @@ -345,6 +363,7 @@ REVOKE EXECUTE ON FUNCTION fetch_top_threads FROM PUBLIC; REVOKE EXECUTE ON FUNCTION fetch_catalog FROM PUBLIC; REVOKE EXECUTE ON FUNCTION search_posts FROM PUBLIC; REVOKE EXECUTE ON FUNCTION update_post_body_search_index FROM PUBLIC; +REVOKE EXECUTE ON FUNCTION get_posts FROM PUBLIC; CREATE ROLE chan_archive_anon nologin; GRANT CONNECT ON DATABASE chan_archives TO chan_archive_anon; @@ -356,6 +375,7 @@ GRANT SELECT ON attachments TO chan_archive_anon; GRANT EXECUTE ON FUNCTION fetch_catalog TO chan_archive_anon; GRANT EXECUTE ON FUNCTION fetch_top_threads TO chan_archive_anon; GRANT EXECUTE ON FUNCTION search_posts TO chan_archive_anon; +GRANT EXECUTE ON FUNCTION get_posts TO chan_archive_anon; -- GRANT usage, select ON SEQUENCE sites_site_id_seq TO chan_archive_anon; -- GRANT usage, select ON SEQUENCE boards_board_id_seq TO chan_archive_anon; @@ -374,6 +394,7 @@ GRANT EXECUTE ON FUNCTION insert_posts_and_return_ids TO chan_archiver; GRANT EXECUTE ON FUNCTION fetch_top_threads TO chan_archiver; GRANT EXECUTE ON FUNCTION fetch_catalog TO chan_archiver; GRANT EXECUTE ON FUNCTION search_posts TO chan_archiver; +GRANT EXECUTE ON FUNCTION get_posts TO chan_archiver; GRANT usage, select ON SEQUENCE sites_site_id_seq TO chan_archiver; GRANT usage, select ON SEQUENCE boards_board_id_seq TO chan_archiver; GRANT usage, select ON SEQUENCE threads_thread_id_seq TO chan_archiver; diff --git a/sql/make_get_posts.sql b/sql/make_get_posts.sql new file mode 100644 index 0000000..6c8f980 --- /dev/null +++ b/sql/make_get_posts.sql @@ -0,0 +1,25 @@ +BEGIN TRANSACTION; + +DROP TYPE IF EXISTS post_key CASCADE; + +-- DROP FUNCTION IF EXISTS get_posts; + +CREATE TYPE post_key AS + ( thread_id bigint + , board_post_id bigint + ); + +CREATE OR REPLACE FUNCTION get_posts(board_posts post_key[]) +RETURNS SETOF posts AS $$ + + SELECT * + FROM posts + WHERE (thread_id, board_post_id) IN (SELECT thread_id, board_post_id FROM unnest(board_posts)) + +$$ LANGUAGE sql; + +GRANT EXECUTE ON FUNCTION get_posts TO chan_archive_anon; +GRANT EXECUTE ON FUNCTION get_posts TO chan_archiver; +REVOKE EXECUTE ON FUNCTION get_posts FROM PUBLIC; + +COMMIT; diff --git a/src/Backfill.hs b/src/Backfill.hs index 246a9f3..bb57ebb 100644 --- a/src/Backfill.hs +++ b/src/Backfill.hs @@ -18,7 +18,8 @@ import System.Directory , createDirectoryIfMissing ) import System.FilePath ((), (<.>), takeExtension) -import Data.List (find, isSuffixOf, foldl') +import Data.List (find, isSuffixOf, foldl', sortBy) +import Data.Ord (comparing) import qualified Data.Set as Set import Data.Set (Set) import Data.Time.Clock.POSIX (posixSecondsToUTCTime) @@ -200,9 +201,12 @@ readPosts settings board thread = do backupDir = backup_read_root settings Boards.pathpart board +apiPostToPostKey :: Threads.Thread -> JSONPosts.Post -> Client.PostId +apiPostToPostKey thread post = Client.PostId (Threads.thread_id thread) (JSONPosts.no post) + -- Convert Post to DbPost -apiPostToArchivePost :: Threads.Thread -> JSONPosts.Post -> Posts.Post -apiPostToArchivePost thread post = +apiPostToArchivePost :: Int -> Threads.Thread -> JSONPosts.Post -> Posts.Post +apiPostToArchivePost local_idx thread post = Posts.Post { Posts.post_id = Nothing , Posts.board_post_id = JSONPosts.no post @@ -213,6 +217,7 @@ apiPostToArchivePost thread post = , Posts.email = JSONPosts.email post , Posts.thread_id = Threads.thread_id thread , Posts.embed = JSONPosts.embed post + , Posts.local_idx = local_idx } -- | A version of 'concatMap' that works with a monadic predicate. @@ -230,18 +235,19 @@ concatMapM op = foldr f (pure []) pure $ x_ ++ xs_ -setPostIdInPosts - :: [(Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post)] - -> [ Client.PostId ] +addPostsToTuples + :: [(Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post)] + -> [ Posts.Post ] -> [(Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post)] -setPostIdInPosts tuples ids = map f ids +addPostsToTuples tuples posts = map f posts where - post_map :: Map.Map (Int64, Int64) (Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post) - post_map = Map.fromList (map (\(a, b, c, i, j) -> ((Posts.thread_id j, Posts.board_post_id j), (a, b, c, i, j))) tuples) + post_map :: Map.Map (Int64, Int64) (Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post) + post_map = Map.fromList (map (\(a, b, c, d) -> ((Threads.thread_id c, JSONPosts.no d), (a, b, c, d))) tuples) - f :: Client.PostId -> (Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post) - f (Client.PostId { Client.post_id = asdf1, Client.thread_id = asdf2, Client.board_post_id = asdf3 }) = - (\(a, b, c, i, j) -> (a, b, c, i, j { Posts.post_id = Just asdf1 })) (post_map Map.! (asdf2, asdf3)) + f :: Posts.Post -> (Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post) + f new_post = + (\(a, b, c, d) -> (a, b, c, d, new_post)) + (post_map Map.! (Posts.thread_id new_post, Posts.board_post_id new_post)) fileToAttachment :: Int -> Posts.Post -> JS.File -> At.Attachment @@ -520,6 +526,57 @@ processFiles settings tuples = do -- perfect just means that our posts have ids, in Map.insert pid (x : l) accMap +createNewPosts + :: JSONSettings + -> [ (Threads.Thread, JSONPosts.Post, Client.PostId) ] + -> IO [ Posts.Post ] +createNewPosts settings tuples = do + existing_post_results <- Client.getPosts settings (map (\(_, _, c) -> c) tuples) + existing_posts <- either handleError return existing_post_results + + let existing_set :: Set (Int64, Int64) = Set.fromList (map (\x -> (Posts.thread_id x, Posts.board_post_id x)) existing_posts) + + let to_insert_list :: [ (Threads.Thread, JSONPosts.Post, Client.PostId) ] = + sortBy (comparing $ \(_, _, p) -> Client.board_post_id p) $ + determineNew tuples existing_set + + -- Map of thread_id to the largest local_idx value (which would be the number of the last post in the thread) + let local_idx :: Map.Map Int64 Int = foldr (Map.unionWith max) Map.empty $ + map (\x -> Map.singleton (Posts.thread_id x) (Posts.local_idx x)) existing_posts + + let insert_posts = fst $ foldl' foldFn ([], local_idx) to_insert_list + + -- posts to insert are the posts that are not in existing_posts + -- so we create a Set (thread_id, board_post_id) ✓ + -- then check every tuples against the set and the ones not in the set get added to a to_insert_list ✓ + -- also for every tuples we need to compute a local_idx + -- so we create a Map index_map from thread_id to local_idx ✓ + -- - for existing_posts + -- - need to compare posts already in the map with another post and keep the max local_idx ✓ + -- to get the new local_idx, we must order the to_insert_list by board_post_id, and look up each entry ✓ + + posts_result <- Client.postPosts settings insert_posts + new_posts <- either handleError return posts_result + return $ existing_posts ++ new_posts + + where + handleError err = print err >> exitFailure + + determineNew :: [(Threads.Thread, JSONPosts.Post, Client.PostId)] -> Set (Int64, Int64) -> [(Threads.Thread, JSONPosts.Post, Client.PostId)] + determineNew ts existing_set = filter (\(_, _, c) -> Set.notMember (Client.thread_id c, Client.board_post_id c) existing_set) ts + + foldFn :: ([Posts.Post], Map.Map Int64 Int) -> (Threads.Thread, JSONPosts.Post, Client.PostId) -> ([Posts.Post], Map.Map Int64 Int) + foldFn (posts, idx_map) (t, p, c) = + case Map.lookup thread_id idx_map of + Nothing -> ((post 1) : posts, Map.insert thread_id 1 idx_map) + Just i -> ((post (i + 1)) : posts, Map.insert thread_id (i + 1) idx_map) + + where + post :: Int -> Posts.Post + post i = apiPostToArchivePost i t p + + thread_id = Client.thread_id c + processBoard :: JSONSettings -> Sites.Site -> Boards.Board -> IO () processBoard settings site board = do let catalogPath = backupDir "catalog.json" @@ -535,23 +592,21 @@ processBoard settings site board = do all_posts_on_board :: [(Threads.Thread, [ JSONPosts.Post ])] <- mapM (readPosts settings board) all_threads_for_board - let tuples :: [(Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post, Posts.Post)] = concatMap - (\(t, posts) -> map (\p -> (site, board, t, p, apiPostToArchivePost t p)) posts) + + let tuples :: [(Sites.Site, Boards.Board, Threads.Thread, JSONPosts.Post)] = concatMap + (\(t, posts) -> map (\p -> (site, board, t, p)) posts) all_posts_on_board - posts_result <- Client.postPosts settings (map (\(_, _, _, _, d) -> d) tuples) + posts_result :: [ Posts.Post ] <- createNewPosts settings (map (\(_, _, c, d) -> (c, d, apiPostToPostKey c d)) tuples) - case posts_result of - Left err -> print err - Right (new_ids :: [ Client.PostId ]) -> do - putStrLn "Sum of post_ids:" - print $ sum $ map Client.post_id new_ids - putStrLn "Sum of board_post_ids:" - print $ sum $ map Client.board_post_id new_ids + putStrLn "Sum of post_ids:" + print $ sum $ map (fromJust . Posts.post_id) posts_result + putStrLn "Sum of board_post_ids:" + print $ sum $ map Posts.board_post_id posts_result - let perfect_post_pairs = setPostIdInPosts tuples new_ids + let perfect_post_pairs = addPostsToTuples tuples posts_result - processFiles settings perfect_post_pairs + processFiles settings perfect_post_pairs Left errMsg -> putStrLn $ "Failed to parse the JSON file in directory: " diff --git a/src/Common b/src/Common index 1b08b3a..aed106f 160000 --- a/src/Common +++ b/src/Common @@ -1 +1 @@ -Subproject commit 1b08b3ac079c6032e1090e79a8074d4cfd9ef24f +Subproject commit aed106f40e24125d9057d3a9dda934a41c6d68a0 diff --git a/src/DataClient.hs b/src/DataClient.hs index 969dbf4..be013fe 100644 --- a/src/DataClient.hs +++ b/src/DataClient.hs @@ -12,6 +12,7 @@ module DataClient , postBoards , getThreads , postThreads + , getPosts , postPosts , getAttachments , postAttachments @@ -26,6 +27,7 @@ import qualified Data.ByteString.Lazy.Char8 as LC8 import Data.List (intercalate) import Data.Aeson ( eitherDecode + , ToJSON , FromJSON , (.=) , object @@ -42,11 +44,12 @@ import qualified Common.AttachmentType as Attachments import qualified Common.PostsType as Posts import Common.Network.HttpClient + data PostId = PostId - { post_id :: Int64 - , board_post_id :: Int64 - , thread_id :: Int64 - } deriving (Show, Generic, FromJSON) + { board_post_id :: Int64 + , thread_id :: Int64 + } deriving (Show, Generic, ToJSON) + getSiteBoards :: T.JSONSettings -> Int -> IO (Either HttpError [ Boards.Board ]) getSiteBoards settings site_id_ = get settings path >>= return . eitherDecodeResponse @@ -103,6 +106,7 @@ postThreads settings threads = getAllSites :: T.JSONSettings -> IO (Either HttpError [ Sites.Site ]) getAllSites settings = get settings "/sites" >>= return . eitherDecodeResponse + getThreads :: T.JSONSettings -> Int -> [ Int ] -> IO (Either HttpError [ Threads.Thread ]) getThreads settings board_id board_thread_ids = get settings path >>= return . eitherDecodeResponse @@ -155,15 +159,39 @@ postAttachments settings attachments = do payload = encode attachments +-- | Function to handle each chunk. +getPostsChunk :: T.JSONSettings -> [ PostId ] -> IO (Either HttpError [Posts.Post]) +getPostsChunk settings chunk = + post settings "/rpc/get_posts" payload False >>= return . eitherDecodeResponse + + where + payload = encode $ object [ "board_posts" .= chunk ] + + +getPosts :: T.JSONSettings -> [ PostId ] -> IO (Either HttpError [Posts.Post]) +getPosts settings xs = do + results <- forM (chunkList chunkSize xs) (getPostsChunk settings) + return $ combineResults results + where + chunkSize = 1000 + + postPosts :: T.JSONSettings -> [ Posts.Post ] - -> IO (Either HttpError [ PostId ]) -postPosts settings posts = - post settings "/rpc/insert_posts_and_return_ids" payload True >>= return . eitherDecodeResponse + -> IO (Either HttpError [ Posts.Post ]) +postPosts settings posts = do + post settings "/posts" payload True >>= return . eitherDecodeResponse where - payload = encode $ object [ "new_posts" .= posts ] + payload = encode posts + + +-- Old type: +-- postPosts +-- :: T.JSONSettings +-- -> [ Posts.Post ] +-- -> IO (Either HttpError [ PostId ]) eitherDecodeResponse :: (FromJSON a) => Either HttpError LBS.ByteString -> Either HttpError a